Use chunkManager rather than minio.Client in datanode garbage collection (#18092)

Signed-off-by: xingzhao <xing.zhao@zilliz.com>

Co-authored-by: xingzhao <xing.zhao@zilliz.com>
This commit is contained in:
xing.zhao 2022-07-22 22:10:28 +08:00 committed by GitHub
parent 8a0ae2f30a
commit d90313a1a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 211 additions and 138 deletions

View File

@ -17,7 +17,6 @@
package datacoord
import (
"context"
"path"
"sync"
"time"
@ -39,12 +38,11 @@ const (
// GcOption garbage collection options
type GcOption struct {
cli *minio.Client // OSS client
enabled bool // enable switch
checkInterval time.Duration // each interval
missingTolerance time.Duration // key missing in meta tolerace time
dropTolerance time.Duration // dropped segment related key tolerance time
bucketName string
cli storage.ChunkManager // client
enabled bool // enable switch
checkInterval time.Duration // each interval
missingTolerance time.Duration // key missing in meta tolerance time
dropTolerance time.Duration // dropped segment related key tolerance time
rootPath string
}
@ -128,20 +126,21 @@ func (gc *garbageCollector) scan() {
var removedKeys []string
for _, prefix := range prefixes {
for info := range gc.option.cli.ListObjects(context.TODO(), gc.option.bucketName, minio.ListObjectsOptions{
Prefix: prefix,
Recursive: true,
}) {
infoKeys, modTimes, err := gc.option.cli.ListWithPrefix(prefix, true)
if err != nil {
log.Error("gc listWithPrefix error", zap.String("error", err.Error()))
}
for i, infoKey := range infoKeys {
total++
_, has := filesMap[info.Key]
_, has := filesMap[infoKey]
if has {
valid++
continue
}
segmentID, err := storage.ParseSegmentIDByBinlog(info.Key)
segmentID, err := storage.ParseSegmentIDByBinlog(infoKey)
if err != nil {
log.Error("parse segment id error", zap.String("infoKey", info.Key), zap.Error(err))
log.Error("parse segment id error", zap.String("infoKey", infoKey), zap.Error(err))
continue
}
if gc.segRefer.HasSegmentLock(segmentID) {
@ -150,12 +149,16 @@ func (gc *garbageCollector) scan() {
}
missing++
// not found in meta, check last modified time exceeds tolerance duration
if time.Since(info.LastModified) > gc.option.missingTolerance {
if err != nil {
log.Error("get modified time error", zap.String("infoKey", infoKey))
continue
}
if time.Since(modTimes[i]) > gc.option.missingTolerance {
// ignore error since it could be cleaned up next time
removedKeys = append(removedKeys, info.Key)
err = gc.option.cli.RemoveObject(context.TODO(), gc.option.bucketName, info.Key, minio.RemoveObjectOptions{})
removedKeys = append(removedKeys, infoKey)
err = gc.option.cli.Remove(infoKey)
if err != nil {
log.Error("failed to remove object", zap.String("infoKey", info.Key), zap.Error(err))
log.Error("failed to remove object", zap.String("infoKey", infoKey), zap.Error(err))
}
}
}
@ -204,10 +207,17 @@ func getLogs(sinfo *SegmentInfo) []*datapb.Binlog {
func (gc *garbageCollector) removeLogs(logs []*datapb.Binlog) bool {
delFlag := true
for _, l := range logs {
err := gc.option.cli.RemoveObject(context.TODO(), gc.option.bucketName, l.GetLogPath(), minio.RemoveObjectOptions{})
errResp := minio.ToErrorResponse(err)
if errResp.Code != "" && errResp.Code != "NoSuchKey" {
delFlag = false
err := gc.option.cli.Remove(l.GetLogPath())
if err != nil {
switch err.(type) {
case minio.ErrorResponse:
errResp := minio.ToErrorResponse(err)
if errResp.Code != "" && errResp.Code != "NoSuchKey" {
delFlag = false
}
default:
delFlag = false
}
}
}
return delFlag

View File

@ -28,6 +28,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/minio/minio-go/v7"
@ -61,7 +62,6 @@ func Test_garbageCollector_basic(t *testing.T) {
checkInterval: time.Millisecond * 10,
missingTolerance: time.Hour * 24,
dropTolerance: time.Hour * 24,
bucketName: bucketName,
rootPath: rootPath,
})
gc.start()
@ -79,7 +79,6 @@ func Test_garbageCollector_basic(t *testing.T) {
checkInterval: time.Millisecond * 10,
missingTolerance: time.Hour * 24,
dropTolerance: time.Hour * 24,
bucketName: bucketName,
rootPath: rootPath,
})
assert.NotPanics(t, func() {
@ -141,16 +140,15 @@ func Test_garbageCollector_scan(t *testing.T) {
checkInterval: time.Minute * 30,
missingTolerance: time.Hour * 24,
dropTolerance: time.Hour * 24,
bucketName: bucketName,
rootPath: rootPath,
})
gc.segRefer = segReferManager
gc.scan()
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, insertLogPrefix), inserts)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, statsLogPrefix), stats)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, deltaLogPrefix), delta)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), inserts)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, statsLogPrefix), stats)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, deltaLogPrefix), delta)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others)
err = gc.segRefer.ReleaseSegmentsLock(1, 1)
assert.NoError(t, err)
@ -164,15 +162,14 @@ func Test_garbageCollector_scan(t *testing.T) {
checkInterval: time.Minute * 30,
missingTolerance: time.Hour * 24,
dropTolerance: time.Hour * 24,
bucketName: bucketName,
rootPath: rootPath,
})
gc.scan()
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, insertLogPrefix), inserts)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, statsLogPrefix), stats)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, deltaLogPrefix), delta)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), inserts)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, statsLogPrefix), stats)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, deltaLogPrefix), delta)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others)
gc.close()
})
@ -191,15 +188,14 @@ func Test_garbageCollector_scan(t *testing.T) {
checkInterval: time.Minute * 30,
missingTolerance: time.Hour * 24,
dropTolerance: time.Hour * 24,
bucketName: bucketName,
rootPath: rootPath,
})
gc.start()
gc.scan()
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, insertLogPrefix), inserts)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, statsLogPrefix), stats)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, deltaLogPrefix), delta)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), inserts)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, statsLogPrefix), stats)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, deltaLogPrefix), delta)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others)
gc.close()
})
@ -221,14 +217,13 @@ func Test_garbageCollector_scan(t *testing.T) {
checkInterval: time.Minute * 30,
missingTolerance: time.Hour * 24,
dropTolerance: 0,
bucketName: bucketName,
rootPath: rootPath,
})
gc.clearEtcd()
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, insertLogPrefix), inserts[1:])
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, statsLogPrefix), stats[1:])
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, deltaLogPrefix), delta[1:])
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), inserts[1:])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, statsLogPrefix), stats[1:])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, deltaLogPrefix), delta[1:])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others)
gc.close()
})
@ -239,27 +234,26 @@ func Test_garbageCollector_scan(t *testing.T) {
checkInterval: time.Minute * 30,
missingTolerance: 0,
dropTolerance: 0,
bucketName: bucketName,
rootPath: rootPath,
})
gc.start()
gc.scan()
gc.clearEtcd()
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, insertLogPrefix), []string{})
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, statsLogPrefix), []string{})
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, deltaLogPrefix), []string{})
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), []string{})
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, statsLogPrefix), []string{})
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, deltaLogPrefix), []string{})
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others)
gc.close()
})
cleanupOSS(cli, bucketName, rootPath)
cleanupOSS(cli.Client, bucketName, rootPath)
}
// initialize unit test OSS env
func initUtOSSEnv(bucket, root string, n int) (cli *minio.Client, inserts []string, stats []string, delta []string, other []string, err error) {
func initUtOSSEnv(bucket, root string, n int) (mcm *storage.MinioChunkManager, inserts []string, stats []string, delta []string, other []string, err error) {
Params.Init()
cli, err = minio.New(Params.MinioCfg.Address, &minio.Options{
cli, err := minio.New(Params.MinioCfg.Address, &minio.Options{
Creds: credentials.NewStaticV4(Params.MinioCfg.AccessKeyID, Params.MinioCfg.SecretAccessKey, ""),
Secure: Params.MinioCfg.UseSSL,
})
@ -319,7 +313,11 @@ func initUtOSSEnv(bucket, root string, n int) (cli *minio.Client, inserts []stri
}
other = append(other, info.Key)
}
return cli, inserts, stats, delta, other, nil
mcm = &storage.MinioChunkManager{
Client: cli,
}
mcm.SetVar(context.TODO(), bucket)
return mcm, inserts, stats, delta, other, nil
}
func cleanupOSS(cli *minio.Client, bucket, root string) {

View File

@ -28,7 +28,6 @@ import (
"time"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
@ -42,6 +41,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/funcutil"
@ -353,39 +353,40 @@ func (s *Server) stopCompactionTrigger() {
}
func (s *Server) initGarbageCollection() error {
var cli *minio.Client
var cli storage.ChunkManager
var err error
if Params.DataCoordCfg.EnableGarbageCollection {
var creds *credentials.Credentials
if Params.MinioCfg.UseIAM {
creds = credentials.NewIAM(Params.MinioCfg.IAMEndpoint)
} else {
creds = credentials.NewStaticV4(Params.MinioCfg.AccessKeyID, Params.MinioCfg.SecretAccessKey, "")
}
// TODO: We call minio.New in different places with same procedures to call several functions.
// We should abstract this to a focade function to avoid applying changes to only one place.
cli, err = minio.New(Params.MinioCfg.Address, &minio.Options{
Creds: creds,
Secure: Params.MinioCfg.UseSSL,
})
if err != nil {
return fmt.Errorf("failed to create minio client: %v", err)
}
// retry times shall be two, just to prevent
// 1. bucket not exists
// 2. bucket is created by other componnent
// 3. datacoord try to create but failed with bucket already exists error
err = retry.Do(s.ctx, getCheckBucketFn(cli), retry.Attempts(2))
if Params.CommonCfg.StorageType == "minio" {
chunkManagerFactory := storage.NewChunkManagerFactory("local", "minio",
storage.RootPath(Params.LocalStorageCfg.Path),
storage.Address(Params.MinioCfg.Address),
storage.AccessKeyID(Params.MinioCfg.AccessKeyID),
storage.SecretAccessKeyID(Params.MinioCfg.SecretAccessKey),
storage.UseSSL(Params.MinioCfg.UseSSL),
storage.BucketName(Params.MinioCfg.BucketName),
storage.UseIAM(Params.MinioCfg.UseIAM),
storage.IAMEndpoint(Params.MinioCfg.IAMEndpoint),
storage.CreateBucket(true))
cli, err = chunkManagerFactory.NewVectorStorageChunkManager(s.ctx)
if err != nil {
log.Error("minio chunk manager init failed", zap.String("error", err.Error()))
return err
}
log.Info("minio chunk manager init success", zap.String("bucketname", Params.MinioCfg.BucketName))
} else if Params.CommonCfg.StorageType == "local" {
chunkManagerFactory := storage.NewChunkManagerFactory("local", "local",
storage.RootPath(Params.LocalStorageCfg.Path))
cli, err = chunkManagerFactory.NewVectorStorageChunkManager(s.ctx)
if err != nil {
log.Error("local chunk manager init failed", zap.String("error", err.Error()))
return err
}
log.Info("local chunk manager init success")
}
s.garbageCollector = newGarbageCollector(s.meta, s.segReferManager, GcOption{
cli: cli,
enabled: Params.DataCoordCfg.EnableGarbageCollection,
bucketName: Params.MinioCfg.BucketName,
rootPath: Params.MinioCfg.RootPath,
cli: cli,
enabled: Params.DataCoordCfg.EnableGarbageCollection,
rootPath: Params.MinioCfg.RootPath,
checkInterval: Params.DataCoordCfg.GCInterval,
missingTolerance: Params.DataCoordCfg.GCMissingTolerance,

View File

@ -3008,7 +3008,7 @@ func Test_initGarbageCollection(t *testing.T) {
Params.MinioCfg.Address = "host:9000:bad"
err := server.initGarbageCollection()
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to create minio client")
assert.Contains(t, err.Error(), "too many colons in address")
})
// mock CheckBucketFn
@ -3027,6 +3027,12 @@ func Test_initGarbageCollection(t *testing.T) {
t.Run("iam_ok", func(t *testing.T) {
Params.MinioCfg.UseIAM = true
err := server.initGarbageCollection()
assert.Error(t, err)
assert.Contains(t, err.Error(), "404 Not Found")
})
t.Run("local storage init", func(t *testing.T) {
Params.CommonCfg.StorageType = "local"
err := server.initGarbageCollection()
assert.NoError(t, err)
})
}

View File

@ -109,7 +109,7 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() {
case <-ticker.C:
prefix := Params.IndexNodeCfg.IndexStorageRootPath + "/"
// list dir first
keys, err := gc.chunkManager.ListWithPrefix(prefix, false)
keys, _, err := gc.chunkManager.ListWithPrefix(prefix, false)
if err != nil {
log.Error("IndexCoord garbageCollector recycleUnusedIndexFiles list keys from chunk manager failed", zap.Error(err))
continue
@ -146,7 +146,7 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() {
for _, file := range indexInfo.IndexFilePaths {
filesMap[file] = true
}
files, err := gc.chunkManager.ListWithPrefix(key, true)
files, _, err := gc.chunkManager.ListWithPrefix(key, true)
if err != nil {
log.Warn("IndexCoord garbageCollector recycleUnusedIndexFiles list files failed",
zap.Int64("buildID", buildID), zap.String("prefix", key), zap.Error(err))

View File

@ -40,8 +40,8 @@ func TestGarbageCollector_Start(t *testing.T) {
removeWithPrefix: func(s string) error {
return nil
},
listWithPrefix: func(s string, recursive bool) ([]string, error) {
return []string{}, nil
listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) {
return []string{}, []time.Time{}, nil
},
remove: func(s string) error {
return nil
@ -82,11 +82,11 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
removeWithPrefix: func(s string) error {
return fmt.Errorf("error")
},
listWithPrefix: func(s string, recursive bool) ([]string, error) {
listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) {
if !recursive {
return []string{"a/b/1/", "a/b/2/"}, nil
return []string{"a/b/1/", "a/b/2/"}, []time.Time{{}, {}}, nil
}
return []string{"a/b/1/c", "a/b/2/d"}, nil
return []string{"a/b/1/c", "a/b/2/d"}, []time.Time{{}, {}}, nil
},
remove: func(s string) error {
return nil
@ -129,11 +129,11 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
removeWithPrefix: func(s string) error {
return nil
},
listWithPrefix: func(s string, recursive bool) ([]string, error) {
listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) {
if !recursive {
return nil, fmt.Errorf("error")
return nil, nil, fmt.Errorf("error")
}
return []string{"a/b/1/c", "a/b/2/d"}, nil
return []string{"a/b/1/c", "a/b/2/d"}, []time.Time{{}, {}}, nil
},
remove: func(s string) error {
return nil
@ -176,11 +176,11 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
removeWithPrefix: func(s string) error {
return nil
},
listWithPrefix: func(s string, recursive bool) ([]string, error) {
listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) {
if !recursive {
return []string{"a/b/c/"}, nil
return []string{"a/b/c/"}, []time.Time{{}}, nil
}
return []string{"a/b/1/c", "a/b/2/d"}, nil
return []string{"a/b/1/c", "a/b/2/d"}, []time.Time{{}, {}}, nil
},
remove: func(s string) error {
return nil
@ -223,11 +223,11 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
removeWithPrefix: func(s string) error {
return nil
},
listWithPrefix: func(s string, recursive bool) ([]string, error) {
listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) {
if !recursive {
return []string{"a/b/1/"}, nil
return []string{"a/b/1/"}, []time.Time{{}}, nil
}
return nil, fmt.Errorf("error")
return nil, nil, fmt.Errorf("error")
},
remove: func(s string) error {
return nil
@ -270,11 +270,11 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
removeWithPrefix: func(s string) error {
return nil
},
listWithPrefix: func(s string, recursive bool) ([]string, error) {
listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) {
if !recursive {
return []string{"a/b/1/"}, nil
return []string{"a/b/1/"}, []time.Time{{}}, nil
}
return []string{"a/b/1/c"}, nil
return []string{"a/b/1/c"}, []time.Time{{}}, nil
},
remove: func(s string) error {
return fmt.Errorf("error")
@ -318,11 +318,11 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
removeWithPrefix: func(s string) error {
return nil
},
listWithPrefix: func(s string, recursive bool) ([]string, error) {
listWithPrefix: func(s string, recursive bool) ([]string, []time.Time, error) {
if !recursive {
return []string{"a/b/1/"}, nil
return []string{"a/b/1/"}, []time.Time{{}}, nil
}
return []string{"a/b/1/c"}, nil
return []string{"a/b/1/c"}, []time.Time{{}}, nil
},
remove: func(s string) error {
return fmt.Errorf("error")

View File

@ -20,6 +20,7 @@ import (
"context"
"errors"
"strconv"
"time"
"github.com/milvus-io/milvus/internal/kv"
@ -421,7 +422,7 @@ type chunkManagerMock struct {
storage.ChunkManager
removeWithPrefix func(string) error
listWithPrefix func(string, bool) ([]string, error)
listWithPrefix func(string, bool) ([]string, []time.Time, error)
remove func(string) error
}
@ -429,7 +430,7 @@ func (cmm *chunkManagerMock) RemoveWithPrefix(prefix string) error {
return cmm.removeWithPrefix(prefix)
}
func (cmm *chunkManagerMock) ListWithPrefix(prefix string, recursive bool) ([]string, error) {
func (cmm *chunkManagerMock) ListWithPrefix(prefix string, recursive bool) ([]string, []time.Time, error) {
return cmm.listWithPrefix(prefix, recursive)
}

View File

@ -25,9 +25,12 @@ import (
"path"
"path/filepath"
"strings"
"time"
"go.uber.org/zap"
"golang.org/x/exp/mmap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/errorutil"
)
@ -159,9 +162,10 @@ func (lcm *LocalChunkManager) MultiRead(filePaths []string) ([][]byte, error) {
return results, el
}
func (lcm *LocalChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, error) {
func (lcm *LocalChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, []time.Time, error) {
var filePaths []string
var modTimes []time.Time
if recursive {
var filePaths []string
absPrefix := path.Join(lcm.localPath, prefix)
dir := filepath.Dir(absPrefix)
err := filepath.Walk(dir, func(filePath string, f os.FileInfo, err error) error {
@ -171,16 +175,37 @@ func (lcm *LocalChunkManager) ListWithPrefix(prefix string, recursive bool) ([]s
return nil
})
if err != nil {
return nil, err
return nil, nil, err
}
return filePaths, nil
for _, filePath := range filePaths {
modTime, err2 := lcm.getModTime(filePath)
if err2 != nil {
return filePaths, nil, err2
}
modTimes = append(modTimes, modTime)
}
return filePaths, modTimes, nil
}
absPrefix := path.Join(lcm.localPath, prefix+"*")
return filepath.Glob(absPrefix)
absPaths, err := filepath.Glob(absPrefix)
if err != nil {
return nil, nil, err
}
for _, absPath := range absPaths {
filePaths = append(filePaths, strings.TrimPrefix(absPath, lcm.localPath))
}
for _, filePath := range filePaths {
modTime, err2 := lcm.getModTime(filePath)
if err2 != nil {
return filePaths, nil, err2
}
modTimes = append(modTimes, modTime)
}
return filePaths, modTimes, nil
}
func (lcm *LocalChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) {
filePaths, err := lcm.ListWithPrefix(prefix, true)
filePaths, _, err := lcm.ListWithPrefix(prefix, true)
if err != nil {
return nil, nil, err
}
@ -252,9 +277,20 @@ func (lcm *LocalChunkManager) MultiRemove(filePaths []string) error {
}
func (lcm *LocalChunkManager) RemoveWithPrefix(prefix string) error {
filePaths, err := lcm.ListWithPrefix(prefix, true)
filePaths, _, err := lcm.ListWithPrefix(prefix, true)
if err != nil {
return err
}
return lcm.MultiRemove(filePaths)
}
// getModTime returns the last-modification time of the file at filePath.
// filePath is joined onto lcm.localPath before os.Stat is called. When the
// stat fails, the failure is logged and the zero time.Time is returned
// together with the error.
func (lcm *LocalChunkManager) getModTime(filePath string) (time.Time, error) {
	// The parameter is intentionally not called "filepath" so it does not
	// shadow the path/filepath package imported by this file.
	absPath := path.Join(lcm.localPath, filePath)
	fi, err := os.Stat(absPath)
	if err != nil {
		// Attach the underlying error so the cause of the failure is visible in the log.
		log.Error("stat fileinfo error", zap.String("relative filepath", filePath), zap.Error(err))
		return time.Time{}, err
	}
	return fi.ModTime(), nil
}

View File

@ -392,14 +392,16 @@ func TestLocalCM(t *testing.T) {
assert.NoError(t, err)
pathPrefix := path.Join(testPrefix, "a")
r, err := testCM.ListWithPrefix(pathPrefix, true)
r, m, err := testCM.ListWithPrefix(pathPrefix, true)
assert.NoError(t, err)
assert.Equal(t, len(r), 2)
assert.Equal(t, len(m), 2)
testCM.RemoveWithPrefix(testPrefix)
r, err = testCM.ListWithPrefix(pathPrefix, true)
r, m, err = testCM.ListWithPrefix(pathPrefix, true)
assert.NoError(t, err)
assert.Equal(t, len(r), 0)
assert.Equal(t, len(m), 0)
})
t.Run("test ListWithPrefix", func(t *testing.T) {
@ -425,27 +427,31 @@ func TestLocalCM(t *testing.T) {
err = testCM.Write(key, value)
assert.NoError(t, err)
dirs, err := testCM.ListWithPrefix(testPrefix+"/", false)
dirs, mods, err := testCM.ListWithPrefix(testPrefix+"/", false)
assert.Nil(t, err)
fmt.Println(dirs)
assert.Equal(t, 3, len(dirs))
assert.Equal(t, 3, len(mods))
testPrefix2 := path.Join(testPrefix, "a")
dirs, err = testCM.ListWithPrefix(testPrefix2, false)
dirs, mods, err = testCM.ListWithPrefix(testPrefix2, false)
assert.Nil(t, err)
assert.Equal(t, 2, len(dirs))
assert.Equal(t, 2, len(mods))
dirs, err = testCM.ListWithPrefix(testPrefix2, false)
dirs, mods, err = testCM.ListWithPrefix(testPrefix2, false)
assert.Nil(t, err)
assert.Equal(t, 2, len(dirs))
assert.Equal(t, 2, len(mods))
err = testCM.RemoveWithPrefix(testPrefix)
assert.NoError(t, err)
dirs, err = testCM.ListWithPrefix(testPrefix, false)
dirs, mods, err = testCM.ListWithPrefix(testPrefix, false)
assert.NoError(t, err)
fmt.Println(dirs)
// dir still exist
assert.Equal(t, 1, len(dirs))
assert.Equal(t, 1, len(mods))
})
}

View File

@ -23,15 +23,15 @@ import (
"fmt"
"io"
"io/ioutil"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"go.uber.org/zap"
"golang.org/x/exp/mmap"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/errorutil"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"go.uber.org/zap"
"golang.org/x/exp/mmap"
)
// MinioChunkManager is responsible for read and write data stored in minio.
@ -106,6 +106,12 @@ func newMinioChunkManagerWithConfig(ctx context.Context, c *config) (*MinioChunk
return mcm, nil
}
// SetVar overwrites the context and the bucket name held by mcm with the
// supplied values.
func (mcm *MinioChunkManager) SetVar(ctx context.Context, bucketName string) {
	mcm.bucketName, mcm.ctx = bucketName, ctx
}
// Path returns the path of minio data if exists.
func (mcm *MinioChunkManager) Path(filePath string) (string, error) {
exist, err := mcm.Exist(filePath)
@ -219,7 +225,7 @@ func (mcm *MinioChunkManager) MultiRead(keys []string) ([][]byte, error) {
}
func (mcm *MinioChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) {
objectsKeys, err := mcm.ListWithPrefix(prefix, true)
objectsKeys, _, err := mcm.ListWithPrefix(prefix, true)
if err != nil {
return nil, nil, err
}
@ -299,16 +305,18 @@ func (mcm *MinioChunkManager) RemoveWithPrefix(prefix string) error {
return nil
}
func (mcm *MinioChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, error) {
func (mcm *MinioChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, []time.Time, error) {
objects := mcm.Client.ListObjects(mcm.ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: recursive})
var objectsKeys []string
var modTimes []time.Time
for object := range objects {
if object.Err != nil {
log.Warn("failed to list with prefix", zap.String("prefix", prefix), zap.Error(object.Err))
return nil, object.Err
return nil, nil, object.Err
}
objectsKeys = append(objectsKeys, object.Key)
modTimes = append(modTimes, object.LastModified)
}
return objectsKeys, nil
return objectsKeys, modTimes, nil
}

View File

@ -453,9 +453,10 @@ func TestMinIOCM(t *testing.T) {
assert.NoError(t, err)
pathPrefix := path.Join(testPrefix, "a")
r, err := testCM.ListWithPrefix(pathPrefix, true)
r, m, err := testCM.ListWithPrefix(pathPrefix, true)
assert.NoError(t, err)
assert.Equal(t, len(r), 2)
assert.Equal(t, len(m), 2)
key = path.Join(testPrefix, "b", "b", "b")
err = testCM.Write(key, value)
@ -468,23 +469,26 @@ func TestMinIOCM(t *testing.T) {
key = path.Join(testPrefix, "bc", "a", "b")
err = testCM.Write(key, value)
assert.NoError(t, err)
dirs, err := testCM.ListWithPrefix(testPrefix+"/", false)
dirs, mods, err := testCM.ListWithPrefix(testPrefix+"/", false)
assert.NoError(t, err)
assert.Equal(t, 3, len(dirs))
assert.Equal(t, 3, len(mods))
dirs, err = testCM.ListWithPrefix(path.Join(testPrefix, "b"), false)
dirs, mods, err = testCM.ListWithPrefix(path.Join(testPrefix, "b"), false)
assert.NoError(t, err)
assert.Equal(t, 2, len(dirs))
assert.Equal(t, 2, len(mods))
testCM.RemoveWithPrefix(testPrefix)
r, err = testCM.ListWithPrefix(pathPrefix, false)
r, m, err = testCM.ListWithPrefix(pathPrefix, false)
assert.NoError(t, err)
assert.Equal(t, len(r), 0)
assert.Equal(t, len(m), 0)
// test wrong prefix
b := make([]byte, 2048)
pathWrong := path.Join(testPrefix, string(b))
_, err = testCM.ListWithPrefix(pathWrong, true)
_, _, err = testCM.ListWithPrefix(pathWrong, true)
assert.Error(t, err)
})
}

View File

@ -13,6 +13,7 @@ package storage
import (
"io"
"time"
"golang.org/x/exp/mmap"
)
@ -41,7 +42,7 @@ type ChunkManager interface {
Reader(filePath string) (FileReader, error)
// MultiRead reads @filePath and returns content.
MultiRead(filePaths []string) ([][]byte, error)
ListWithPrefix(prefix string, recursive bool) ([]string, error)
ListWithPrefix(prefix string, recursive bool) ([]string, []time.Time, error)
// ReadWithPrefix reads files with same @prefix and returns contents.
ReadWithPrefix(prefix string) ([]string, [][]byte, error)
Mmap(filePath string) (*mmap.ReaderAt, error)

View File

@ -20,6 +20,7 @@ import (
"errors"
"io"
"sync"
"time"
"go.uber.org/zap"
"golang.org/x/exp/mmap"
@ -216,7 +217,7 @@ func (vcm *VectorChunkManager) MultiRead(filePaths []string) ([][]byte, error) {
}
func (vcm *VectorChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) {
filePaths, err := vcm.ListWithPrefix(prefix, true)
filePaths, _, err := vcm.ListWithPrefix(prefix, true)
if err != nil {
return nil, nil, err
}
@ -227,7 +228,7 @@ func (vcm *VectorChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte
return filePaths, results, nil
}
func (vcm *VectorChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, error) {
func (vcm *VectorChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, []time.Time, error) {
return vcm.vectorStorage.ListWithPrefix(prefix, recursive)
}
@ -312,7 +313,7 @@ func (vcm *VectorChunkManager) RemoveWithPrefix(prefix string) error {
return err
}
if vcm.cacheEnable {
filePaths, err := vcm.ListWithPrefix(prefix, true)
filePaths, _, err := vcm.ListWithPrefix(prefix, true)
if err != nil {
return err
}

View File

@ -7,6 +7,7 @@ import (
"encoding/json"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
@ -58,8 +59,8 @@ func (mc *MockChunkManager) MultiRead(filePaths []string) ([][]byte, error) {
return nil, nil
}
func (mc *MockChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, error) {
return nil, nil
func (mc *MockChunkManager) ListWithPrefix(prefix string, recursive bool) ([]string, []time.Time, error) {
return nil, nil, nil
}
func (mc *MockChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) {