From d90313a1a8b8eacc272512d442a98e644df325f5 Mon Sep 17 00:00:00 2001 From: "xing.zhao" <34002927+EricStarer@users.noreply.github.com> Date: Fri, 22 Jul 2022 22:10:28 +0800 Subject: [PATCH] Use chunkManager rather than minio.Client in datanode garbage collection (#18092) Signed-off-by: xingzhao Co-authored-by: xingzhao --- internal/datacoord/garbage_collector.go | 54 ++++++++++------- internal/datacoord/garbage_collector_test.go | 60 +++++++++---------- internal/datacoord/server.go | 55 ++++++++--------- internal/datacoord/server_test.go | 8 ++- internal/indexcoord/garbage_collector.go | 4 +- internal/indexcoord/garbage_collector_test.go | 40 ++++++------- internal/indexcoord/index_coord_mock.go | 5 +- internal/storage/local_chunk_manager.go | 50 +++++++++++++--- internal/storage/local_chunk_manager_test.go | 18 ++++-- internal/storage/minio_chunk_manager.go | 26 +++++--- internal/storage/minio_chunk_manager_test.go | 14 +++-- internal/storage/types.go | 3 +- internal/storage/vector_chunk_manager.go | 7 ++- .../util/importutil/import_wrapper_test.go | 5 +- 14 files changed, 211 insertions(+), 138 deletions(-) diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index 6109ddaa84..843057b511 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -17,7 +17,6 @@ package datacoord import ( - "context" "path" "sync" "time" @@ -39,12 +38,11 @@ const ( // GcOption garbage collection options type GcOption struct { - cli *minio.Client // OSS client - enabled bool // enable switch - checkInterval time.Duration // each interval - missingTolerance time.Duration // key missing in meta tolerace time - dropTolerance time.Duration // dropped segment related key tolerance time - bucketName string + cli storage.ChunkManager // client + enabled bool // enable switch + checkInterval time.Duration // each interval + missingTolerance time.Duration // key missing in meta tolerance time + dropTolerance time.Duration // dropped segment related key tolerance time rootPath string } @@ -128,20 +126,21 @@ func (gc *garbageCollector) scan() { var removedKeys []string for _, prefix := range prefixes { - for info := range gc.option.cli.ListObjects(context.TODO(), gc.option.bucketName, minio.ListObjectsOptions{ - Prefix: prefix, - Recursive: true, - }) { + infoKeys, modTimes, err := gc.option.cli.ListWithPrefix(prefix, true) + if err != nil { + log.Error("gc listWithPrefix error", zap.String("error", err.Error())) + } + for i, infoKey := range infoKeys { total++ - _, has := filesMap[info.Key] + _, has := filesMap[infoKey] if has { valid++ continue } - segmentID, err := storage.ParseSegmentIDByBinlog(info.Key) + segmentID, err := storage.ParseSegmentIDByBinlog(infoKey) if err != nil { - log.Error("parse segment id error", zap.String("infoKey", info.Key), zap.Error(err)) + log.Error("parse segment id error", zap.String("infoKey", infoKey), zap.Error(err)) continue } if gc.segRefer.HasSegmentLock(segmentID) { @@ -150,12 +149,16 @@ func (gc *garbageCollector) scan() { } missing++ // not found in meta, check last modified time exceeds tolerance duration - if time.Since(info.LastModified) > gc.option.missingTolerance { + if err != nil { + log.Error("get modified time error", zap.String("infoKey", infoKey)) + continue + } + if time.Since(modTimes[i]) > gc.option.missingTolerance { // ignore error since it could be cleaned up next time - removedKeys = append(removedKeys, info.Key) - err = gc.option.cli.RemoveObject(context.TODO(), gc.option.bucketName, info.Key, minio.RemoveObjectOptions{}) + removedKeys = append(removedKeys, infoKey) + err = gc.option.cli.Remove(infoKey) if err != nil { - log.Error("failed to remove object", zap.String("infoKey", info.Key), zap.Error(err)) + log.Error("failed to remove object", zap.String("infoKey", infoKey), zap.Error(err)) } } } @@ -204,10 +207,17 @@ func getLogs(sinfo *SegmentInfo) []*datapb.Binlog { func (gc *garbageCollector) removeLogs(logs []*datapb.Binlog) bool { delFlag := true for _, l := range logs { - err := gc.option.cli.RemoveObject(context.TODO(), gc.option.bucketName, l.GetLogPath(), minio.RemoveObjectOptions{}) - errResp := minio.ToErrorResponse(err) - if errResp.Code != "" && errResp.Code != "NoSuchKey" { - delFlag = false + err := gc.option.cli.Remove(l.GetLogPath()) + if err != nil { + switch err.(type) { + case minio.ErrorResponse: + errResp := minio.ToErrorResponse(err) + if errResp.Code != "" && errResp.Code != "NoSuchKey" { + delFlag = false + } + default: + delFlag = false + } } } return delFlag diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 1ebf05b0e0..4af181e10d 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -28,6 +28,7 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/minio/minio-go/v7" @@ -61,7 +62,6 @@ func Test_garbageCollector_basic(t *testing.T) { checkInterval: time.Millisecond * 10, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, - bucketName: bucketName, rootPath: rootPath, }) gc.start() @@ -79,7 +79,6 @@ func Test_garbageCollector_basic(t *testing.T) { checkInterval: time.Millisecond * 10, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, - bucketName: bucketName, rootPath: rootPath, }) assert.NotPanics(t, func() { @@ -141,16 +140,15 @@ func Test_garbageCollector_scan(t *testing.T) { checkInterval: time.Minute * 30, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, - bucketName: bucketName, rootPath: rootPath, }) gc.segRefer = segReferManager gc.scan() - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, insertLogPrefix), inserts) - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, statsLogPrefix), stats) - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, deltaLogPrefix), delta) - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), inserts) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, statsLogPrefix), stats) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, deltaLogPrefix), delta) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others) err = gc.segRefer.ReleaseSegmentsLock(1, 1) assert.NoError(t, err) @@ -164,15 +162,14 @@ func Test_garbageCollector_scan(t *testing.T) { checkInterval: time.Minute * 30, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, - bucketName: bucketName, rootPath: rootPath, }) gc.scan() - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, insertLogPrefix), inserts) - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, statsLogPrefix), stats) - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, deltaLogPrefix), delta) - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), inserts) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, statsLogPrefix), stats) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, deltaLogPrefix), delta) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others) gc.close() }) @@ -191,15 +188,14 @@ func Test_garbageCollector_scan(t *testing.T) { checkInterval: time.Minute * 30, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, - bucketName: bucketName, rootPath: rootPath, }) gc.start() gc.scan() - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, insertLogPrefix), inserts) - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, statsLogPrefix), stats) - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, deltaLogPrefix), delta) - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), inserts) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, statsLogPrefix), stats) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, deltaLogPrefix), delta) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others) gc.close() }) @@ -221,14 +217,13 @@ func Test_garbageCollector_scan(t *testing.T) { checkInterval: time.Minute * 30, missingTolerance: time.Hour * 24, dropTolerance: 0, - bucketName: bucketName, rootPath: rootPath, }) gc.clearEtcd() - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, insertLogPrefix), inserts[1:]) - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, statsLogPrefix), stats[1:]) - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, deltaLogPrefix), delta[1:]) - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), inserts[1:]) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, statsLogPrefix), stats[1:]) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, deltaLogPrefix), delta[1:]) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others) gc.close() }) @@ -239,27 +234,26 @@ func Test_garbageCollector_scan(t *testing.T) { checkInterval: time.Minute * 30, missingTolerance: 0, dropTolerance: 0, - bucketName: bucketName, rootPath: rootPath, }) gc.start() gc.scan() gc.clearEtcd() - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, insertLogPrefix), []string{}) - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, statsLogPrefix), []string{}) - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, deltaLogPrefix), []string{}) - validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), []string{}) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, statsLogPrefix), []string{}) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, deltaLogPrefix), []string{}) + validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others) gc.close() }) - cleanupOSS(cli, bucketName, rootPath) + cleanupOSS(cli.Client, bucketName, rootPath) } // initialize unit test sso env -func initUtOSSEnv(bucket, root string, n int) (cli *minio.Client, inserts []string, stats []string, delta []string, other []string, err error) { +func initUtOSSEnv(bucket, root string, n int) (mcm *storage.MinioChunkManager, inserts []string, stats []string, delta []string, other []string, err error) { Params.Init() - cli, err = minio.New(Params.MinioCfg.Address, &minio.Options{ + cli, err := minio.New(Params.MinioCfg.Address, &minio.Options{ Creds: credentials.NewStaticV4(Params.MinioCfg.AccessKeyID, Params.MinioCfg.SecretAccessKey, ""), Secure: Params.MinioCfg.UseSSL, }) @@ -319,7 +313,11 @@ func initUtOSSEnv(bucket, root string, n int) (cli *minio.Client, inserts []stri } other = append(other, info.Key) } - return cli, inserts, stats, delta, other, nil + mcm = &storage.MinioChunkManager{ + Client: cli, + } + mcm.SetVar(context.TODO(), bucket) + return mcm, inserts, stats, delta, other, nil } func cleanupOSS(cli *minio.Client, bucket, root string) { diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 7b1fb631a9..967b901abf 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -28,7 +28,6 @@ import ( "time" "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -42,6 +41,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/funcutil" @@ -353,39 +353,40 @@ func (s *Server) stopCompactionTrigger() { } func (s *Server) initGarbageCollection() error { - var cli *minio.Client + var cli storage.ChunkManager var err error - if Params.DataCoordCfg.EnableGarbageCollection { - var creds *credentials.Credentials - if Params.MinioCfg.UseIAM { - creds = credentials.NewIAM(Params.MinioCfg.IAMEndpoint) - } else { - creds = credentials.NewStaticV4(Params.MinioCfg.AccessKeyID, Params.MinioCfg.SecretAccessKey, "") - } - // TODO: We call minio.New in different places with same procedures to call several functions. - // We should abstract this to a focade function to avoid applying changes to only one place. - cli, err = minio.New(Params.MinioCfg.Address, &minio.Options{ - Creds: creds, - Secure: Params.MinioCfg.UseSSL, - }) - if err != nil { - return fmt.Errorf("failed to create minio client: %v", err) - } - // retry times shall be two, just to prevent - // 1. bucket not exists - // 2. bucket is created by other componnent - // 3. datacoord try to create but failed with bucket already exists error - err = retry.Do(s.ctx, getCheckBucketFn(cli), retry.Attempts(2)) + if Params.CommonCfg.StorageType == "minio" { + chunkManagerFactory := storage.NewChunkManagerFactory("local", "minio", + storage.RootPath(Params.LocalStorageCfg.Path), + storage.Address(Params.MinioCfg.Address), + storage.AccessKeyID(Params.MinioCfg.AccessKeyID), + storage.SecretAccessKeyID(Params.MinioCfg.SecretAccessKey), + storage.UseSSL(Params.MinioCfg.UseSSL), + storage.BucketName(Params.MinioCfg.BucketName), + storage.UseIAM(Params.MinioCfg.UseIAM), + storage.IAMEndpoint(Params.MinioCfg.IAMEndpoint), + storage.CreateBucket(true)) + cli, err = chunkManagerFactory.NewVectorStorageChunkManager(s.ctx) if err != nil { + log.Error("minio chunk manager init failed", zap.String("error", err.Error())) return err } + log.Info("minio chunk manager init success", zap.String("bucketname", Params.MinioCfg.BucketName)) + } else if Params.CommonCfg.StorageType == "local" { + chunkManagerFactory := storage.NewChunkManagerFactory("local", "local", + storage.RootPath(Params.LocalStorageCfg.Path)) + cli, err = chunkManagerFactory.NewVectorStorageChunkManager(s.ctx) + if err != nil { + log.Error("local chunk manager init failed", zap.String("error", err.Error())) + return err + } + log.Info("local chunk manager init success") } s.garbageCollector = newGarbageCollector(s.meta, s.segReferManager, GcOption{ - cli: cli, - enabled: Params.DataCoordCfg.EnableGarbageCollection, - bucketName: Params.MinioCfg.BucketName, - rootPath: Params.MinioCfg.RootPath, + cli: cli, + enabled: Params.DataCoordCfg.EnableGarbageCollection, + rootPath: Params.MinioCfg.RootPath, checkInterval: Params.DataCoordCfg.GCInterval, missingTolerance: Params.DataCoordCfg.GCMissingTolerance, diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 39e8816ec7..2241f5bf8b 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -3008,7 +3008,7 @@ func Test_initGarbageCollection(t *testing.T) { Params.MinioCfg.Address = "host:9000:bad" err := server.initGarbageCollection() assert.Error(t, err) - assert.Contains(t, err.Error(), "failed to create minio client") + assert.Contains(t, err.Error(), "too many colons in address") }) // mock CheckBucketFn @@ -3027,6 +3027,12 @@ func Test_initGarbageCollection(t *testing.T) { t.Run("iam_ok", func(t *testing.T) { Params.MinioCfg.UseIAM = true err := server.initGarbageCollection() + assert.Error(t, err) + assert.Contains(t, err.Error(), "404 Not Found") + }) + t.Run("local storage init", func(t *testing.T) { + Params.CommonCfg.StorageType = "local" + err := server.initGarbageCollection() assert.NoError(t, err) }) } diff --git a/internal/indexcoord/garbage_collector.go b/internal/indexcoord/garbage_collector.go index a144c6ba37..df6d387360 100644 --- a/internal/indexcoord/garbage_collector.go +++ b/internal/indexcoord/garbage_collector.go @@ -109,7 +109,7 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() { case <-ticker.C: prefix := Params.IndexNodeCfg.IndexStorageRootPath + "/" // list dir first - keys, err := gc.chunkManager.ListWithPrefix(prefix, false) + keys, _, err := gc.chunkManager.ListWithPrefix(prefix, false) if err != nil { log.Error("IndexCoord garbageCollector recycleUnusedIndexFiles list keys from chunk manager failed", zap.Error(err)) continue @@ -146,7 +146,7 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() { for _, file := range indexInfo.IndexFilePaths { filesMap[file] = true } - files, err := gc.chunkManager.ListWithPrefix(key, true) + files, _, err := gc.chunkManager.ListWithPrefix(key, true) if err != nil { log.Warn("IndexCoord garbageCollector recycleUnusedIndexFiles list files failed", zap.Int64("buildID", buildID), zap.String("prefix", key), zap.Error(err)) diff --git a/internal/indexcoord/garbage_collector_test.go b/internal/indexcoord/garbage_collector_test.go index 761a563688..7b2a3db2e8 100644 --- a/internal/indexcoord/garbage_collector_test.go +++ b/internal/indexcoord/garbage_collector_test.go @@ -40,8 +40,8 @@ func TestGarbageCollector_Start(t *testing.T) { removeWithPrefix: func(s string) error { return nil }, - listWithPrefix: func(s string, recursive bool) ([]string, error) { - return []string{}, nil + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { + return []string{}, []time.Time{}, nil }, remove: func(s string) error { return nil @@ -82,11 +82,11 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) { removeWithPrefix: func(s string) error { return fmt.Errorf("error") }, - listWithPrefix: func(s string, recursive bool) ([]string, error) { + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { if !recursive { - return []string{"a/b/1/", "a/b/2/"}, nil + return []string{"a/b/1/", "a/b/2/"}, []time.Time{{}, {}}, nil } - return []string{"a/b/1/c", "a/b/2/d"}, nil + return []string{"a/b/1/c", "a/b/2/d"}, []time.Time{{}, {}}, nil }, remove: func(s string) error { return nil @@ -129,11 +129,11 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) { removeWithPrefix: func(s string) error { return nil }, - listWithPrefix: func(s string, recursive bool) ([]string, error) { + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { if !recursive { - return nil, fmt.Errorf("error") + return nil, nil, fmt.Errorf("error") } - return []string{"a/b/1/c", "a/b/2/d"}, nil + return []string{"a/b/1/c", "a/b/2/d"}, []time.Time{{}, {}}, nil }, remove: func(s string) error { return nil @@ -176,11 +176,11 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) { removeWithPrefix: func(s string) error { return nil }, - listWithPrefix: func(s string, recursive bool) ([]string, error) { + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { if !recursive { - return []string{"a/b/c/"}, nil + return []string{"a/b/c/"}, []time.Time{{}}, nil } - return []string{"a/b/1/c", "a/b/2/d"}, nil + return []string{"a/b/1/c", "a/b/2/d"}, []time.Time{{}, {}}, nil }, remove: func(s string) error { return nil @@ -223,11 +223,11 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) { removeWithPrefix: func(s string) error { return nil }, - listWithPrefix: func(s string, recursive bool) ([]string, error) { + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { if !recursive { - return []string{"a/b/1/"}, nil + return []string{"a/b/1/"}, []time.Time{{}}, nil } - return nil, fmt.Errorf("error") + return nil, nil, fmt.Errorf("error") }, remove: func(s string) error { return nil @@ -270,11 +270,11 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) { removeWithPrefix: func(s string) error { return nil }, - listWithPrefix: func(s string, recursive bool) ([]string, error) { + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { if !recursive { - return []string{"a/b/1/"}, nil + return []string{"a/b/1/"}, []time.Time{{}}, nil } - return []string{"a/b/1/c"}, nil + return []string{"a/b/1/c"}, []time.Time{{}}, nil }, remove: func(s string) error { return fmt.Errorf("error") @@ -318,11 +318,11 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) { removeWithPrefix: func(s string) error { return nil }, - listWithPrefix: func(s string, recursive bool) ([]string, error) { + listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) { if !recursive { - return []string{"a/b/1/"}, nil + return []string{"a/b/1/"}, []time.Time{{}}, nil } - return []string{"a/b/1/c"}, nil + return []string{"a/b/1/c"}, []time.Time{{}}, nil }, remove: func(s string) error { return fmt.Errorf("error") diff --git a/internal/indexcoord/index_coord_mock.go b/internal/indexcoord/index_coord_mock.go index 9845458e29..cf83db6dd1 100644 --- a/internal/indexcoord/index_coord_mock.go +++ b/internal/indexcoord/index_coord_mock.go @@ -20,6 +20,7 @@ import ( "context" "errors" "strconv" + "time" "github.com/milvus-io/milvus/internal/kv" @@ -421,7 +422,7 @@ type chunkManagerMock struct { storage.ChunkManager removeWithPrefix func(string) error - listWithPrefix func(string, bool) ([]string, error) + listWithPrefix func(string, bool) ([]string, []time.Time, error) remove func(string) error } @@ -429,7 +430,7 @@ func (cmm *chunkManagerMock) RemoveWithPrefix(prefix string) error { return cmm.removeWithPrefix(prefix) } -func (cmm *chunkManagerMock) ListWithPrefix(prefix string, recursive bool) ([]string, error) { +func (cmm *chunkManagerMock) ListWithPrefix(prefix string, recursive bool) ([]string, []time.Time, error) { return cmm.listWithPrefix(prefix, recursive) } diff --git a/internal/storage/local_chunk_manager.go b/internal/storage/local_chunk_manager.go index 6d8cb5bcac..98fc32a923 100644 --- a/internal/storage/local_chunk_manager.go +++ b/internal/storage/local_chunk_manager.go @@ -25,9 +25,12 @@ import ( "path" "path/filepath" "strings" + "time" + "go.uber.org/zap" "golang.org/x/exp/mmap" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/errorutil" ) @@ -159,9 +162,10 @@ func (lcm *LocalChunkManager) MultiRead(filePaths []string) ([][]byte, error) { return results, el } -func (lcm *LocalChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, error) { +func (lcm *LocalChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, []time.Time, error) { + var filePaths []string + var modTimes []time.Time if recursive { - var filePaths []string absPrefix := path.Join(lcm.localPath, prefix) dir := filepath.Dir(absPrefix) err := filepath.Walk(dir, func(filePath string, f os.FileInfo, err error) error { @@ -171,16 +175,37 @@ func (lcm *LocalChunkManager) ListWithPrefix(prefix string, recursive bool) ([]s return nil }) if err != nil { - return nil, err + return nil, nil, err } - return filePaths, nil + for _, filePath := range filePaths { + modTime, err2 := lcm.getModTime(filePath) + if err2 != nil { + return filePaths, nil, err2 + } + modTimes = append(modTimes, modTime) + } + return filePaths, modTimes, nil } absPrefix := path.Join(lcm.localPath, prefix+"*") - return filepath.Glob(absPrefix) + absPaths, err := filepath.Glob(absPrefix) + if err != nil { + return nil, nil, err + } + for _, absPath := range absPaths { + filePaths = append(filePaths, strings.TrimPrefix(absPath, lcm.localPath)) + } + for _, filePath := range filePaths { + modTime, err2 := lcm.getModTime(filePath) + if err2 != nil { + return filePaths, nil, err2 + } + modTimes = append(modTimes, modTime) + } + return filePaths, modTimes, nil } func (lcm *LocalChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) { - filePaths, err := lcm.ListWithPrefix(prefix, true) + filePaths, _, err := lcm.ListWithPrefix(prefix, true) if err != nil { return nil, nil, err } @@ -252,9 +277,20 @@ func (lcm *LocalChunkManager) MultiRemove(filePaths []string) error { } func (lcm *LocalChunkManager) RemoveWithPrefix(prefix string) error { - filePaths, err := lcm.ListWithPrefix(prefix, true) + filePaths, _, err := lcm.ListWithPrefix(prefix, true) if err != nil { return err } return lcm.MultiRemove(filePaths) } + +func (lcm *LocalChunkManager) getModTime(filepath string) (time.Time, error) { + absPath := path.Join(lcm.localPath, filepath) + fi, err := os.Stat(absPath) + if err != nil { + log.Error("stat fileinfo error", zap.String("relative filepath", filepath)) + return time.Time{}, err + } + + return fi.ModTime(), nil +} diff --git a/internal/storage/local_chunk_manager_test.go b/internal/storage/local_chunk_manager_test.go index b434879e53..4fbe2e54c0 100644 --- a/internal/storage/local_chunk_manager_test.go +++ b/internal/storage/local_chunk_manager_test.go @@ -392,14 +392,16 @@ func TestLocalCM(t *testing.T) { assert.NoError(t, err) pathPrefix := path.Join(testPrefix, "a") - r, err := testCM.ListWithPrefix(pathPrefix, true) + r, m, err := testCM.ListWithPrefix(pathPrefix, true) assert.NoError(t, err) assert.Equal(t, len(r), 2) + assert.Equal(t, len(m), 2) testCM.RemoveWithPrefix(testPrefix) - r, err = testCM.ListWithPrefix(pathPrefix, true) + r, m, err = testCM.ListWithPrefix(pathPrefix, true) assert.NoError(t, err) assert.Equal(t, len(r), 0) + assert.Equal(t, len(m), 0) }) t.Run("test ListWithPrefix", func(t *testing.T) { @@ -425,27 +427,31 @@ func TestLocalCM(t *testing.T) { err = testCM.Write(key, value) assert.NoError(t, err) - dirs, err := testCM.ListWithPrefix(testPrefix+"/", false) + dirs, mods, err := testCM.ListWithPrefix(testPrefix+"/", false) assert.Nil(t, err) fmt.Println(dirs) assert.Equal(t, 3, len(dirs)) + assert.Equal(t, 3, len(mods)) testPrefix2 := path.Join(testPrefix, "a") - dirs, err = testCM.ListWithPrefix(testPrefix2, false) + dirs, mods, err = testCM.ListWithPrefix(testPrefix2, false) assert.Nil(t, err) assert.Equal(t, 2, len(dirs)) + assert.Equal(t, 2, len(mods)) - dirs, err = testCM.ListWithPrefix(testPrefix2, false) + dirs, mods, err = testCM.ListWithPrefix(testPrefix2, false) assert.Nil(t, err) assert.Equal(t, 2, len(dirs)) + assert.Equal(t, 2, len(mods)) err = testCM.RemoveWithPrefix(testPrefix) assert.NoError(t, err) - dirs, err = testCM.ListWithPrefix(testPrefix, false) + dirs, mods, err = testCM.ListWithPrefix(testPrefix, false) assert.NoError(t, err) fmt.Println(dirs) // dir still exist assert.Equal(t, 1, len(dirs)) + assert.Equal(t, 1, len(mods)) }) } diff --git a/internal/storage/minio_chunk_manager.go b/internal/storage/minio_chunk_manager.go index aa16675a5c..5760c581e4 100644 --- a/internal/storage/minio_chunk_manager.go +++ b/internal/storage/minio_chunk_manager.go @@ -23,15 +23,15 @@ import ( "fmt" "io" "io/ioutil" - - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" - "go.uber.org/zap" - "golang.org/x/exp/mmap" + "time" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/errorutil" "github.com/milvus-io/milvus/internal/util/retry" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "go.uber.org/zap" + "golang.org/x/exp/mmap" ) // MinioChunkManager is responsible for read and write data stored in minio. @@ -106,6 +106,12 @@ func newMinioChunkManagerWithConfig(ctx context.Context, c *config) (*MinioChunk return mcm, nil } +// SetVar set the variable value of mcm +func (mcm *MinioChunkManager) SetVar(ctx context.Context, bucketName string) { + mcm.ctx = ctx + mcm.bucketName = bucketName +} + // Path returns the path of minio data if exists. func (mcm *MinioChunkManager) Path(filePath string) (string, error) { exist, err := mcm.Exist(filePath) @@ -219,7 +225,7 @@ func (mcm *MinioChunkManager) MultiRead(keys []string) ([][]byte, error) { } func (mcm *MinioChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) { - objectsKeys, err := mcm.ListWithPrefix(prefix, true) + objectsKeys, _, err := mcm.ListWithPrefix(prefix, true) if err != nil { return nil, nil, err } @@ -299,16 +305,18 @@ func (mcm *MinioChunkManager) RemoveWithPrefix(prefix string) error { return nil } -func (mcm *MinioChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, error) { +func (mcm *MinioChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, []time.Time, error) { objects := mcm.Client.ListObjects(mcm.ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: recursive}) var objectsKeys []string + var modTimes []time.Time for object := range objects { if object.Err != nil { log.Warn("failed to list with prefix", zap.String("prefix", prefix), zap.Error(object.Err)) - return nil, object.Err + return nil, nil, object.Err } objectsKeys = append(objectsKeys, object.Key) + modTimes = append(modTimes, object.LastModified) } - return objectsKeys, nil + return objectsKeys, modTimes, nil } diff --git a/internal/storage/minio_chunk_manager_test.go b/internal/storage/minio_chunk_manager_test.go index 54be783afb..0409a984ca 100644 --- a/internal/storage/minio_chunk_manager_test.go +++ b/internal/storage/minio_chunk_manager_test.go @@ -453,9 +453,10 @@ func TestMinIOCM(t *testing.T) { assert.NoError(t, err) pathPrefix := path.Join(testPrefix, "a") - r, err := testCM.ListWithPrefix(pathPrefix, true) + r, m, err := testCM.ListWithPrefix(pathPrefix, true) assert.NoError(t, err) assert.Equal(t, len(r), 2) + assert.Equal(t, len(m), 2) key = path.Join(testPrefix, "b", "b", "b") err = testCM.Write(key, value) @@ -468,23 +469,26 @@ func TestMinIOCM(t *testing.T) { key = path.Join(testPrefix, "bc", "a", "b") err = testCM.Write(key, value) assert.NoError(t, err) - dirs, err := testCM.ListWithPrefix(testPrefix+"/", false) + dirs, mods, err := testCM.ListWithPrefix(testPrefix+"/", false) assert.NoError(t, err) assert.Equal(t, 3, len(dirs)) + assert.Equal(t, 3, len(mods)) - dirs, err = testCM.ListWithPrefix(path.Join(testPrefix, "b"), false) + dirs, mods, err = testCM.ListWithPrefix(path.Join(testPrefix, "b"), false) assert.NoError(t, err) assert.Equal(t, 2, len(dirs)) + assert.Equal(t, 2, len(mods)) testCM.RemoveWithPrefix(testPrefix) - r, err = testCM.ListWithPrefix(pathPrefix, false) + r, m, err = testCM.ListWithPrefix(pathPrefix, false) assert.NoError(t, err) assert.Equal(t, len(r), 0) + assert.Equal(t, len(m), 0) // test wrong prefix b := make([]byte, 2048) pathWrong := path.Join(testPrefix, string(b)) - _, err = testCM.ListWithPrefix(pathWrong, true) + _, _, err = testCM.ListWithPrefix(pathWrong, true) assert.Error(t, err) }) } diff --git a/internal/storage/types.go b/internal/storage/types.go index aefc9bfcb8..b8b34c7190 100644 --- a/internal/storage/types.go +++ b/internal/storage/types.go @@ -13,6 +13,7 @@ package storage import ( "io" + "time" "golang.org/x/exp/mmap" ) @@ -41,7 +42,7 @@ type ChunkManager interface { Reader(filePath string) (FileReader, error) // MultiRead reads @filePath and returns content. MultiRead(filePaths []string) ([][]byte, error) - ListWithPrefix(prefix string, recursive bool) ([]string, error) + ListWithPrefix(prefix string, recursive bool) ([]string, []time.Time, error) // ReadWithPrefix reads files with same @prefix and returns contents. ReadWithPrefix(prefix string) ([]string, [][]byte, error) Mmap(filePath string) (*mmap.ReaderAt, error) diff --git a/internal/storage/vector_chunk_manager.go b/internal/storage/vector_chunk_manager.go index dbbcab3cd4..23adc0e3d7 100644 --- a/internal/storage/vector_chunk_manager.go +++ b/internal/storage/vector_chunk_manager.go @@ -20,6 +20,7 @@ import ( "errors" "io" "sync" + "time" "go.uber.org/zap" "golang.org/x/exp/mmap" @@ -216,7 +217,7 @@ func (vcm *VectorChunkManager) MultiRead(filePaths []string) ([][]byte, error) { } func (vcm *VectorChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) { - filePaths, err := vcm.ListWithPrefix(prefix, true) + filePaths, _, err := vcm.ListWithPrefix(prefix, true) if err != nil { return nil, nil, err } @@ -227,7 +228,7 @@ func (vcm *VectorChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte return filePaths, results, nil } -func (vcm *VectorChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, error) { +func (vcm *VectorChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, []time.Time, error) { return vcm.vectorStorage.ListWithPrefix(prefix, recursive) } @@ -312,7 +313,7 @@ func (vcm *VectorChunkManager) RemoveWithPrefix(prefix string) error { return err } if vcm.cacheEnable { - filePaths, err := vcm.ListWithPrefix(prefix, true) + filePaths, _, err := vcm.ListWithPrefix(prefix, true) if err != nil { return err } diff --git a/internal/util/importutil/import_wrapper_test.go b/internal/util/importutil/import_wrapper_test.go index 8359dd18d0..e6d29f34f3 100644 --- a/internal/util/importutil/import_wrapper_test.go +++ b/internal/util/importutil/import_wrapper_test.go @@ -7,6 +7,7 @@ import ( "encoding/json" "strconv" "testing" + "time" "github.com/stretchr/testify/assert" "go.uber.org/zap" @@ -58,8 +59,8 @@ func (mc *MockChunkManager) MultiRead(filePaths []string) ([][]byte, error) { return nil, nil } -func (mc *MockChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, error) { - return nil, nil +func (mc *MockChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, []time.Time, error) { + return nil, nil, nil } func (mc *MockChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) {