enhance: use WalkWithPrefix api for oss, enable piplined file gc (#31740)

issue: #19095,#29655,#31718

- Change `ListWithPrefix` to `WalkWithPrefix` of OOS into a pipeline
mode.

- File garbage collection is performed in other goroutine.

- Segment Index Recycle clean index file too.

---------

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
chyezh 2024-04-25 20:41:27 +08:00 committed by GitHub
parent f06509bf97
commit 2586c2f1b3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 1012 additions and 1808 deletions

View File

@ -102,6 +102,7 @@ minio:
region: # Specify minio storage system location region
useVirtualHost: false # Whether use virtual host mode for bucket
requestTimeoutMs: 10000 # minio timeout for request time in milliseconds
listObjectsMaxKeys: 0 # The maximum number of objects requested per batch in minio ListObjects rpc, 0 means using oss client by default, decrease these configration if ListObjects timeout
# Milvus supports four MQ: rocksmq(based on RockDB), natsmq(embedded nats-server), Pulsar and Kafka.
# You can change your mq by setting mq.type field.
@ -443,7 +444,7 @@ dataCoord:
enableGarbageCollection: true
gc:
interval: 3600 # gc interval in seconds
missingTolerance: 3600 # file meta missing tolerance duration in seconds, default to 1hr
missingTolerance: 86400 # file meta missing tolerance duration in seconds, default to 24hr(1d)
dropTolerance: 10800 # file belongs to dropped entity tolerance duration in seconds. 3600
removeConcurrent: 32 # number of concurrent goroutines to remove dropped s3 objects
scanInterval: 168 # garbage collection scan residue interval in hours

View File

@ -20,18 +20,17 @@ import (
"context"
"fmt"
"path"
"sort"
"strings"
"sync"
"time"
"github.com/minio/minio-go/v7"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
@ -53,12 +52,15 @@ type GcOption struct {
dropTolerance time.Duration // dropped segment related key tolerance time
scanInterval time.Duration // interval for scan residue for interupted log wrttien
removeLogPool *conc.Pool[struct{}]
removeObjectPool *conc.Pool[struct{}]
}
// garbageCollector handles garbage files in object storage
// which could be dropped collection remanent or data node failure traces
type garbageCollector struct {
ctx context.Context
cancel context.CancelFunc
option GcOption
meta *meta
handler Handler
@ -66,7 +68,6 @@ type garbageCollector struct {
startOnce sync.Once
stopOnce sync.Once
wg sync.WaitGroup
closeCh chan struct{}
cmdCh chan gcCmd
pauseUntil atomic.Time
}
@ -84,12 +85,14 @@ func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageColl
zap.Duration("scanInterval", opt.scanInterval),
zap.Duration("missingTolerance", opt.missingTolerance),
zap.Duration("dropTolerance", opt.dropTolerance))
opt.removeLogPool = conc.NewPool[struct{}](Params.DataCoordCfg.GCRemoveConcurrent.GetAsInt(), conc.WithExpiryDuration(time.Minute))
opt.removeObjectPool = conc.NewPool[struct{}](Params.DataCoordCfg.GCRemoveConcurrent.GetAsInt(), conc.WithExpiryDuration(time.Minute))
ctx, cancel := context.WithCancel(context.Background())
return &garbageCollector{
ctx: ctx,
cancel: cancel,
meta: meta,
handler: handler,
option: opt,
closeCh: make(chan struct{}),
cmdCh: make(chan gcCmd),
}
}
@ -102,8 +105,7 @@ func (gc *garbageCollector) start() {
return
}
gc.startOnce.Do(func() {
gc.wg.Add(1)
go gc.work()
gc.work(gc.ctx)
})
}
}
@ -146,26 +148,35 @@ func (gc *garbageCollector) Resume(ctx context.Context) error {
}
// work contains actual looping check logic
func (gc *garbageCollector) work() {
defer gc.wg.Done()
ticker := time.NewTicker(gc.option.checkInterval)
defer ticker.Stop()
scanTicker := time.NewTicker(gc.option.scanInterval)
defer scanTicker.Stop()
func (gc *garbageCollector) work(ctx context.Context) {
// TODO: fast cancel for gc when closing.
// Run gc tasks in parallel.
gc.wg.Add(3)
go func() {
defer gc.wg.Done()
gc.runRecycleTaskWithPauser(ctx, "meta", gc.option.checkInterval, func(ctx context.Context) {
gc.recycleDroppedSegments(ctx)
gc.recycleUnusedIndexes(ctx)
gc.recycleUnusedSegIndexes(ctx)
})
}()
go func() {
defer gc.wg.Done()
gc.runRecycleTaskWithPauser(ctx, "orphan", gc.option.scanInterval, func(ctx context.Context) {
gc.recycleUnusedBinlogFiles(ctx)
gc.recycleUnusedIndexFiles(ctx)
})
}()
go func() {
defer gc.wg.Done()
gc.startControlLoop(ctx)
}()
}
// startControlLoop start a control loop for garbageCollector.
func (gc *garbageCollector) startControlLoop(ctx context.Context) {
for {
select {
case <-ticker.C:
if time.Now().Before(gc.pauseUntil.Load()) {
log.Info("garbage collector paused", zap.Time("until", gc.pauseUntil.Load()))
continue
}
gc.clearEtcd()
gc.recycleUnusedIndexes()
gc.recycleUnusedSegIndexes()
gc.recycleUnusedIndexFiles()
case <-scanTicker.C:
log.Info("Garbage collector start to scan interrupted write residue")
gc.scan()
case cmd := <-gc.cmdCh:
switch cmd.cmdType {
case datapb.GcCommand_Pause:
@ -182,112 +193,175 @@ func (gc *garbageCollector) work() {
log.Info("garbage collection resumed")
}
close(cmd.done)
case <-gc.closeCh:
log.Warn("garbage collector quit")
case <-gc.ctx.Done():
log.Warn("garbage collector control loop quit")
return
}
}
}
// runRecycleTaskWithPauser is a helper function to create a task with pauser
func (gc *garbageCollector) runRecycleTaskWithPauser(ctx context.Context, name string, interval time.Duration, task func(ctx context.Context)) {
logger := log.With(zap.String("gcType", name)).With(zap.Duration("interval", interval))
timer := time.NewTimer(interval)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-timer.C:
if time.Now().Before(gc.pauseUntil.Load()) {
logger.Info("garbage collector paused", zap.Time("until", gc.pauseUntil.Load()))
continue
}
logger.Info("garbage collector recycle task start...")
start := time.Now()
task(ctx)
logger.Info("garbage collector recycle task done", zap.Duration("timeCost", time.Since(start)))
}
}
}
// close stop the garbage collector.
func (gc *garbageCollector) close() {
gc.stopOnce.Do(func() {
close(gc.closeCh)
gc.cancel()
gc.wg.Wait()
})
}
// scan load meta file info and compares OSS keys
// recycleUnusedBinlogFiles load meta file info and compares OSS keys
// if missing found, performs gc cleanup
func (gc *garbageCollector) scan() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func (gc *garbageCollector) recycleUnusedBinlogFiles(ctx context.Context) {
start := time.Now()
log := log.With(zap.String("gcName", "recycleUnusedBinlogFiles"), zap.Time("startAt", start))
log.Info("start recycleUnusedBinlogFiles...")
defer func() { log.Info("recycleUnusedBinlogFiles done", zap.Duration("timeCost", time.Since(start))) }()
var (
total = 0
valid = 0
missing = 0
)
getMetaMap := func() (typeutil.UniqueSet, typeutil.Set[string]) {
segmentMap := typeutil.NewUniqueSet()
filesMap := typeutil.NewSet[string]()
segments := gc.meta.GetAllSegmentsUnsafe()
for _, segment := range segments {
cloned := segment.Clone()
binlog.DecompressBinLogs(cloned.SegmentInfo)
segmentMap.Insert(segment.GetID())
for _, log := range getLogs(cloned) {
filesMap.Insert(log.GetLogPath())
}
}
return segmentMap, filesMap
type scanTask struct {
prefix string
checker func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool
label string
}
scanTasks := []scanTask{
{
prefix: path.Join(gc.option.cli.RootPath(), common.SegmentInsertLogPath),
checker: func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool {
return segment != nil
},
label: metrics.InsertFileLabel,
},
{
prefix: path.Join(gc.option.cli.RootPath(), common.SegmentStatslogPath),
checker: func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool {
logID, err := binlog.GetLogIDFromBingLogPath(objectInfo.FilePath)
if err != nil {
log.Warn("garbageCollector find dirty stats log", zap.String("filePath", objectInfo.FilePath), zap.Error(err))
return false
}
return segment != nil && segment.IsStatsLogExists(logID)
},
label: metrics.StatFileLabel,
},
{
prefix: path.Join(gc.option.cli.RootPath(), common.SegmentDeltaLogPath),
checker: func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool {
logID, err := binlog.GetLogIDFromBingLogPath(objectInfo.FilePath)
if err != nil {
log.Warn("garbageCollector find dirty dleta log", zap.String("filePath", objectInfo.FilePath), zap.Error(err))
return false
}
return segment != nil && segment.IsDeltaLogExists(logID)
},
label: metrics.DeleteFileLabel,
},
}
// walk only data cluster related prefixes
prefixes := make([]string, 0, 3)
prefixes = append(prefixes, path.Join(gc.option.cli.RootPath(), common.SegmentInsertLogPath))
prefixes = append(prefixes, path.Join(gc.option.cli.RootPath(), common.SegmentStatslogPath))
prefixes = append(prefixes, path.Join(gc.option.cli.RootPath(), common.SegmentDeltaLogPath))
labels := []string{metrics.InsertFileLabel, metrics.StatFileLabel, metrics.DeleteFileLabel}
var removedKeys []string
for idx, prefix := range prefixes {
startTs := time.Now()
infoKeys, modTimes, err := gc.option.cli.ListWithPrefix(ctx, prefix, true)
if err != nil {
log.Error("failed to list files with prefix",
zap.String("prefix", prefix),
zap.Error(err),
)
}
cost := time.Since(startTs)
segmentMap, filesMap := getMetaMap()
metrics.GarbageCollectorListLatency.
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), labels[idx]).
Observe(float64(cost.Milliseconds()))
log.Info("gc scan finish list object", zap.String("prefix", prefix), zap.Duration("time spent", cost), zap.Int("keys", len(infoKeys)))
for i, infoKey := range infoKeys {
total++
_, has := filesMap[infoKey]
if has {
valid++
continue
}
segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), infoKey)
if err != nil {
missing++
log.Warn("parse segment id error",
zap.String("infoKey", infoKey),
zap.Error(err))
continue
}
if strings.Contains(prefix, common.SegmentInsertLogPath) &&
segmentMap.Contain(segmentID) {
valid++
continue
}
// not found in meta, check last modified time exceeds tolerance duration
if time.Since(modTimes[i]) > gc.option.missingTolerance {
// ignore error since it could be cleaned up next time
removedKeys = append(removedKeys, infoKey)
err = gc.option.cli.Remove(ctx, infoKey)
if err != nil {
missing++
log.Error("failed to remove object",
zap.String("infoKey", infoKey),
zap.Error(err))
}
}
}
for _, task := range scanTasks {
gc.recycleUnusedBinLogWithChecker(ctx, task.prefix, task.label, task.checker)
}
metrics.GarbageCollectorRunCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Add(1)
log.Info("scan file to do garbage collection",
}
// recycleUnusedBinLogWithChecker scans the prefix and checks the path with checker.
// GC the file if checker returns false.
func (gc *garbageCollector) recycleUnusedBinLogWithChecker(ctx context.Context, prefix string, label string, checker func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool) {
logger := log.With(zap.String("prefix", prefix))
logger.Info("garbageCollector recycleUnusedBinlogFiles start", zap.String("prefix", prefix))
lastFilePath := ""
total := 0
valid := 0
unexpectedFailure := atomic.NewInt32(0)
removed := atomic.NewInt32(0)
start := time.Now()
futures := make([]*conc.Future[struct{}], 0)
err := gc.option.cli.WalkWithPrefix(ctx, prefix, true, func(chunkInfo *storage.ChunkObjectInfo) bool {
total++
lastFilePath = chunkInfo.FilePath
// Check file tolerance first to avoid unnecessary operation.
if time.Since(chunkInfo.ModifyTime) <= gc.option.missingTolerance {
logger.Info("garbageCollector recycleUnusedBinlogFiles skip file since it is not expired", zap.String("filePath", chunkInfo.FilePath), zap.Time("modifyTime", chunkInfo.ModifyTime))
return true
}
// Parse segmentID from file path.
// TODO: Does all files in the same segment have the same segmentID?
segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), chunkInfo.FilePath)
if err != nil {
unexpectedFailure.Inc()
logger.Warn("garbageCollector recycleUnusedBinlogFiles parse segment id error",
zap.String("filePath", chunkInfo.FilePath),
zap.Error(err))
return true
}
segment := gc.meta.GetSegment(segmentID)
if checker(chunkInfo, segment) {
valid++
logger.Info("garbageCollector recycleUnusedBinlogFiles skip file since it is valid", zap.String("filePath", chunkInfo.FilePath), zap.Int64("segmentID", segmentID))
return true
}
// ignore error since it could be cleaned up next time
file := chunkInfo.FilePath
future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
logger := logger.With(zap.String("file", file))
logger.Info("garbageCollector recycleUnusedBinlogFiles remove file...")
if err = gc.option.cli.Remove(ctx, file); err != nil {
log.Warn("garbageCollector recycleUnusedBinlogFiles remove file failed", zap.Error(err))
unexpectedFailure.Inc()
return struct{}{}, err
}
log.Info("garbageCollector recycleUnusedBinlogFiles remove file success")
removed.Inc()
return struct{}{}, nil
})
futures = append(futures, future)
return true
})
// Wait for all remove tasks done.
if err := conc.BlockOnAll(futures...); err != nil {
// error is logged, and can be ignored here.
logger.Warn("some task failure in remove object pool", zap.Error(err))
}
cost := time.Since(start)
logger.Info("garbageCollector recycleUnusedBinlogFiles done",
zap.Int("total", total),
zap.Int("valid", valid),
zap.Int("missing", missing),
zap.Strings("removedKeys", removedKeys))
zap.Int("unexpectedFailure", int(unexpectedFailure.Load())),
zap.Int("removed", int(removed.Load())),
zap.String("lastFilePath", lastFilePath),
zap.Duration("cost", cost),
zap.Error(err))
metrics.GarbageCollectorFileScanDuration.
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), label).
Observe(float64(cost.Milliseconds()))
}
func (gc *garbageCollector) checkDroppedSegmentGC(segment *SegmentInfo,
@ -330,7 +404,13 @@ func (gc *garbageCollector) checkDroppedSegmentGC(segment *SegmentInfo,
return true
}
func (gc *garbageCollector) clearEtcd() {
// recycleDroppedSegments scans all segments and remove those dropped segments from meta and oss.
func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
start := time.Now()
log := log.With(zap.String("gcName", "recycleDroppedSegments"), zap.Time("startAt", start))
log.Info("start clear dropped segments...")
defer func() { log.Info("clear dropped segments done", zap.Duration("timeCost", time.Since(start))) }()
all := gc.meta.SelectSegments(func(si *SegmentInfo) bool { return true })
drops := make(map[int64]*SegmentInfo, 0)
compactTo := make(map[int64]*SegmentInfo)
@ -367,42 +447,40 @@ func (gc *garbageCollector) clearEtcd() {
channelCPs[channel] = pos.GetTimestamp()
}
dropIDs := lo.Keys(drops)
sort.Slice(dropIDs, func(i, j int) bool {
return dropIDs[i] < dropIDs[j]
})
log.Info("start to GC segments", zap.Int("drop_num", len(dropIDs)))
for _, segmentID := range dropIDs {
segment, ok := drops[segmentID]
if !ok {
log.Warn("segmentID is not in drops", zap.Int64("segmentID", segmentID))
continue
log.Info("start to GC segments", zap.Int("drop_num", len(drops)))
for segmentID, segment := range drops {
if ctx.Err() != nil {
// process canceled, stop.
return
}
log := log.With(zap.Int64("segmentID", segmentID))
segInsertChannel := segment.GetInsertChannel()
if !gc.checkDroppedSegmentGC(segment, compactTo[segment.GetID()], indexedSet, channelCPs[segInsertChannel]) {
continue
}
logs := getLogs(segment)
log.Info("GC segment", zap.Int64("segmentID", segment.GetID()),
zap.Int("insert_logs", len(segment.GetBinlogs())),
log.Info("GC segment start...", zap.Int("insert_logs", len(segment.GetBinlogs())),
zap.Int("delta_logs", len(segment.GetDeltalogs())),
zap.Int("stats_logs", len(segment.GetStatslogs())))
if gc.removeLogs(logs) {
err := gc.meta.DropSegment(segment.GetID())
if err != nil {
log.Info("GC segment meta failed to drop segment", zap.Int64("segment id", segment.GetID()), zap.Error(err))
} else {
log.Info("GC segment meta drop semgent", zap.Int64("segment id", segment.GetID()))
}
if err := gc.removeObjectFiles(ctx, logs); err != nil {
log.Warn("GC segment remove logs failed", zap.Error(err))
continue
}
if err := gc.meta.DropSegment(segment.GetID()); err != nil {
log.Warn("GC segment meta failed to drop segment", zap.Error(err))
continue
}
log.Info("GC segment meta drop segment done")
if segList := gc.meta.GetSegmentsByChannel(segInsertChannel); len(segList) == 0 &&
!gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) {
log.Info("empty channel found during gc, manually cleanup channel checkpoints", zap.String("vChannel", segInsertChannel))
// TODO: remove channel checkpoint may be lost, need to be handled before segment GC?
if err := gc.meta.DropChannelCheckpoint(segInsertChannel); err != nil {
log.Info("failed to drop channel check point during segment garbage collection", zap.String("vchannel", segInsertChannel), zap.Error(err))
log.Warn("failed to drop channel check point during segment garbage collection", zap.String("vchannel", segInsertChannel), zap.Error(err))
}
}
}
@ -413,156 +491,210 @@ func (gc *garbageCollector) isExpire(dropts Timestamp) bool {
return time.Since(droptime) > gc.option.dropTolerance
}
func getLogs(sinfo *SegmentInfo) []*datapb.Binlog {
var logs []*datapb.Binlog
func getLogs(sinfo *SegmentInfo) map[string]struct{} {
logs := make(map[string]struct{})
for _, flog := range sinfo.GetBinlogs() {
logs = append(logs, flog.GetBinlogs()...)
for _, l := range flog.GetBinlogs() {
logs[l.GetLogPath()] = struct{}{}
}
}
for _, flog := range sinfo.GetStatslogs() {
logs = append(logs, flog.GetBinlogs()...)
for _, l := range flog.GetBinlogs() {
logs[l.GetLogPath()] = struct{}{}
}
}
for _, flog := range sinfo.GetDeltalogs() {
logs = append(logs, flog.GetBinlogs()...)
for _, l := range flog.GetBinlogs() {
logs[l.GetLogPath()] = struct{}{}
}
}
return logs
}
func (gc *garbageCollector) removeLogs(logs []*datapb.Binlog) bool {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var w sync.WaitGroup
w.Add(len(logs))
for _, l := range logs {
tmpLog := l
gc.option.removeLogPool.Submit(func() (struct{}, error) {
defer w.Done()
select {
case <-ctx.Done():
return struct{}{}, nil
default:
err := gc.option.cli.Remove(ctx, tmpLog.GetLogPath())
if err != nil {
switch err.(type) {
case minio.ErrorResponse:
errResp := minio.ToErrorResponse(err)
if errResp.Code != "" && errResp.Code != "NoSuchKey" {
cancel()
}
default:
cancel()
}
// removeObjectFiles remove file from oss storage, return error if any log failed to remove.
func (gc *garbageCollector) removeObjectFiles(ctx context.Context, filePaths map[string]struct{}) error {
futures := make([]*conc.Future[struct{}], 0)
for filePath := range filePaths {
filePath := filePath
future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
err := gc.option.cli.Remove(ctx, filePath)
// ignore the error Key Not Found
if err != nil {
if !errors.Is(err, merr.ErrIoKeyNotFound) {
return struct{}{}, err
}
return struct{}{}, nil
log.Info("remove log failed, key not found, may be removed at previous GC, ignore the error",
zap.String("path", filePath),
zap.Error(err))
}
return struct{}{}, nil
})
futures = append(futures, future)
}
w.Wait()
select {
case <-ctx.Done():
return false
default:
return true
}
return conc.BlockOnAll(futures...)
}
func (gc *garbageCollector) recycleUnusedIndexes() {
log.Info("start recycleUnusedIndexes")
// recycleUnusedIndexes is used to delete those indexes that is deleted by collection.
func (gc *garbageCollector) recycleUnusedIndexes(ctx context.Context) {
start := time.Now()
log := log.With(zap.String("gcName", "recycleUnusedIndexes"), zap.Time("startAt", start))
log.Info("start recycleUnusedIndexes...")
defer func() { log.Info("recycleUnusedIndexes done", zap.Duration("timeCost", time.Since(start))) }()
deletedIndexes := gc.meta.indexMeta.GetDeletedIndexes()
for _, index := range deletedIndexes {
if ctx.Err() != nil {
// process canceled.
return
}
log := log.With(zap.Int64("collectionID", index.CollectionID), zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID))
if err := gc.meta.indexMeta.RemoveIndex(index.CollectionID, index.IndexID); err != nil {
log.Warn("remove index on collection fail", zap.Int64("collectionID", index.CollectionID),
zap.Int64("indexID", index.IndexID), zap.Error(err))
log.Warn("remove index on collection fail", zap.Error(err))
continue
}
log.Info("remove index on collection done")
}
}
func (gc *garbageCollector) recycleUnusedSegIndexes() {
// recycleUnusedSegIndexes remove the index of segment if index is deleted or segment itself is deleted.
func (gc *garbageCollector) recycleUnusedSegIndexes(ctx context.Context) {
start := time.Now()
log := log.With(zap.String("gcName", "recycleUnusedSegIndexes"), zap.Time("startAt", start))
log.Info("start recycleUnusedSegIndexes...")
defer func() { log.Info("recycleUnusedSegIndexes done", zap.Duration("timeCost", time.Since(start))) }()
segIndexes := gc.meta.indexMeta.GetAllSegIndexes()
for _, segIdx := range segIndexes {
if ctx.Err() != nil {
// process canceled.
return
}
// 1. segment belongs to is deleted.
// 2. index is deleted.
if gc.meta.GetSegment(segIdx.SegmentID) == nil || !gc.meta.indexMeta.IsIndexExist(segIdx.CollectionID, segIdx.IndexID) {
if err := gc.meta.indexMeta.RemoveSegmentIndex(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.IndexID, segIdx.BuildID); err != nil {
log.Warn("delete index meta from etcd failed, wait to retry", zap.Int64("buildID", segIdx.BuildID),
zap.Int64("segmentID", segIdx.SegmentID), zap.Int64("nodeID", segIdx.NodeID), zap.Error(err))
indexFiles := gc.getAllIndexFilesOfIndex(segIdx)
log := log.With(zap.Int64("collectionID", segIdx.CollectionID),
zap.Int64("partitionID", segIdx.PartitionID),
zap.Int64("segmentID", segIdx.SegmentID),
zap.Int64("indexID", segIdx.IndexID),
zap.Int64("buildID", segIdx.BuildID),
zap.Int64("nodeID", segIdx.NodeID),
zap.Int("indexFiles", len(indexFiles)))
log.Info("GC Segment Index file start...")
// Remove index files first.
if err := gc.removeObjectFiles(ctx, indexFiles); err != nil {
log.Warn("fail to remove index files for index", zap.Error(err))
continue
}
log.Info("index meta recycle success", zap.Int64("buildID", segIdx.BuildID),
zap.Int64("segmentID", segIdx.SegmentID))
// Remove meta from index meta.
if err := gc.meta.indexMeta.RemoveSegmentIndex(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.IndexID, segIdx.BuildID); err != nil {
log.Warn("delete index meta from etcd failed, wait to retry", zap.Error(err))
continue
}
log.Info("index meta recycle success")
}
}
}
// recycleUnusedIndexFiles is used to delete those index files that no longer exist in the meta.
func (gc *garbageCollector) recycleUnusedIndexFiles() {
log.Info("start recycleUnusedIndexFiles")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
startTs := time.Now()
func (gc *garbageCollector) recycleUnusedIndexFiles(ctx context.Context) {
start := time.Now()
log := log.With(zap.String("gcName", "recycleUnusedIndexFiles"), zap.Time("startAt", start))
log.Info("start recycleUnusedIndexFiles...")
prefix := path.Join(gc.option.cli.RootPath(), common.SegmentIndexPath) + "/"
// list dir first
keys, _, err := gc.option.cli.ListWithPrefix(ctx, prefix, false)
if err != nil {
log.Warn("garbageCollector recycleUnusedIndexFiles list keys from chunk manager failed", zap.Error(err))
return
}
log.Info("recycleUnusedIndexFiles, finish list object", zap.Duration("time spent", time.Since(startTs)), zap.Int("build ids", len(keys)))
for _, key := range keys {
log.Debug("indexFiles keys", zap.String("key", key))
keyCount := 0
err := gc.option.cli.WalkWithPrefix(ctx, prefix, false, func(indexPathInfo *storage.ChunkObjectInfo) bool {
key := indexPathInfo.FilePath
keyCount++
logger := log.With(zap.String("prefix", prefix), zap.String("key", key))
buildID, err := parseBuildIDFromFilePath(key)
if err != nil {
log.Warn("garbageCollector recycleUnusedIndexFiles parseIndexFileKey", zap.String("key", key), zap.Error(err))
continue
logger.Warn("garbageCollector recycleUnusedIndexFiles parseIndexFileKey", zap.Error(err))
return true
}
log.Info("garbageCollector will recycle index files", zap.Int64("buildID", buildID))
logger = logger.With(zap.Int64("buildID", buildID))
logger.Info("garbageCollector will recycle index files")
canRecycle, segIdx := gc.meta.indexMeta.CleanSegmentIndex(buildID)
if !canRecycle {
// Even if the index is marked as deleted, the index file will not be recycled, wait for the next gc,
// and delete all index files about the buildID at one time.
log.Info("garbageCollector can not recycle index files", zap.Int64("buildID", buildID))
continue
logger.Info("garbageCollector can not recycle index files")
return true
}
if segIdx == nil {
// buildID no longer exists in meta, remove all index files
log.Info("garbageCollector recycleUnusedIndexFiles find meta has not exist, remove index files",
zap.Int64("buildID", buildID))
logger.Info("garbageCollector recycleUnusedIndexFiles find meta has not exist, remove index files")
err = gc.option.cli.RemoveWithPrefix(ctx, key)
if err != nil {
log.Warn("garbageCollector recycleUnusedIndexFiles remove index files failed",
zap.Int64("buildID", buildID), zap.String("prefix", key), zap.Error(err))
continue
logger.Warn("garbageCollector recycleUnusedIndexFiles remove index files failed", zap.Error(err))
return true
}
log.Info("garbageCollector recycleUnusedIndexFiles remove index files success",
zap.Int64("buildID", buildID), zap.String("prefix", key))
continue
logger.Info("garbageCollector recycleUnusedIndexFiles remove index files success")
return true
}
filesMap := make(map[string]struct{})
for _, fileID := range segIdx.IndexFileKeys {
filepath := metautil.BuildSegmentIndexFilePath(gc.option.cli.RootPath(), segIdx.BuildID, segIdx.IndexVersion,
segIdx.PartitionID, segIdx.SegmentID, fileID)
filesMap[filepath] = struct{}{}
}
files, _, err := gc.option.cli.ListWithPrefix(ctx, key, true)
if err != nil {
log.Warn("garbageCollector recycleUnusedIndexFiles list files failed",
zap.Int64("buildID", buildID), zap.String("prefix", key), zap.Error(err))
continue
}
log.Info("recycle index files", zap.Int64("buildID", buildID), zap.Int("meta files num", len(filesMap)),
zap.Int("chunkManager files num", len(files)))
deletedFilesNum := 0
for _, file := range files {
filesMap := gc.getAllIndexFilesOfIndex(segIdx)
logger.Info("recycle index files", zap.Int("meta files num", len(filesMap)))
deletedFilesNum := atomic.NewInt32(0)
fileNum := 0
futures := make([]*conc.Future[struct{}], 0)
err = gc.option.cli.WalkWithPrefix(ctx, key, true, func(indexFile *storage.ChunkObjectInfo) bool {
fileNum++
file := indexFile.FilePath
if _, ok := filesMap[file]; !ok {
if err = gc.option.cli.Remove(ctx, file); err != nil {
log.Warn("garbageCollector recycleUnusedIndexFiles remove file failed",
zap.Int64("buildID", buildID), zap.String("file", file), zap.Error(err))
continue
}
deletedFilesNum++
future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
logger := logger.With(zap.String("file", file))
logger.Info("garbageCollector recycleUnusedIndexFiles remove file...")
if err := gc.option.cli.Remove(ctx, file); err != nil {
logger.Warn("garbageCollector recycleUnusedIndexFiles remove file failed", zap.Error(err))
return struct{}{}, err
}
deletedFilesNum.Inc()
logger.Info("garbageCollector recycleUnusedIndexFiles remove file success")
return struct{}{}, nil
})
futures = append(futures, future)
}
return true
})
// Wait for all remove tasks done.
if err := conc.BlockOnAll(futures...); err != nil {
// error is logged, and can be ignored here.
logger.Warn("some task failure in remove object pool", zap.Error(err))
}
log.Info("index files recycle success", zap.Int64("buildID", buildID),
zap.Int("delete index files num", deletedFilesNum))
logger = logger.With(zap.Int("deleteIndexFilesNum", int(deletedFilesNum.Load())), zap.Int("walkFileNum", fileNum))
if err != nil {
logger.Warn("index files recycle failed when walk with prefix", zap.Error(err))
return true
}
logger.Info("index files recycle done")
return true
})
log = log.With(zap.Duration("timeCost", time.Since(start)), zap.Int("keyCount", keyCount), zap.Error(err))
if err != nil {
log.Warn("garbageCollector recycleUnusedIndexFiles failed", zap.Error(err))
return
}
log.Info("recycleUnusedIndexFiles done")
}
// getAllIndexFilesOfIndex returns the all index files of index.
func (gc *garbageCollector) getAllIndexFilesOfIndex(segmentIndex *model.SegmentIndex) map[string]struct{} {
filesMap := make(map[string]struct{})
for _, fileID := range segmentIndex.IndexFileKeys {
filepath := metautil.BuildSegmentIndexFilePath(gc.option.cli.RootPath(), segmentIndex.BuildID, segmentIndex.IndexVersion,
segmentIndex.PartitionID, segmentIndex.SegmentID, fileID)
filesMap[filepath] = struct{}{}
}
return filesMap
}

View File

@ -51,6 +51,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -100,7 +101,8 @@ func Test_garbageCollector_basic(t *testing.T) {
})
}
func validateMinioPrefixElements(t *testing.T, cli *minio.Client, bucketName string, prefix string, elements []string) {
func validateMinioPrefixElements(t *testing.T, manager *storage.RemoteChunkManager, bucketName string, prefix string, elements []string) {
cli := manager.UnderlyingObjectStorage().(*storage.MinioObjectStorage).Client
var current []string
for info := range cli.ListObjects(context.TODO(), bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
current = append(current, info.Key)
@ -127,12 +129,12 @@ func Test_garbageCollector_scan(t *testing.T) {
missingTolerance: time.Hour * 24,
dropTolerance: time.Hour * 24,
})
gc.scan()
gc.recycleUnusedBinlogFiles(context.TODO())
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentStatslogPath), stats)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentDeltaLogPath), delta)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentStatslogPath), stats)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentDeltaLogPath), delta)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others)
gc.close()
})
@ -145,12 +147,12 @@ func Test_garbageCollector_scan(t *testing.T) {
missingTolerance: time.Hour * 24,
dropTolerance: time.Hour * 24,
})
gc.scan()
gc.recycleUnusedBinlogFiles(context.TODO())
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentStatslogPath), stats)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentDeltaLogPath), delta)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentStatslogPath), stats)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentDeltaLogPath), delta)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others)
gc.close()
})
@ -172,11 +174,11 @@ func Test_garbageCollector_scan(t *testing.T) {
dropTolerance: time.Hour * 24,
})
gc.start()
gc.scan()
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentStatslogPath), stats)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentDeltaLogPath), delta)
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others)
gc.recycleUnusedBinlogFiles(context.TODO())
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentStatslogPath), stats)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentDeltaLogPath), delta)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others)
gc.close()
})
@ -200,11 +202,11 @@ func Test_garbageCollector_scan(t *testing.T) {
missingTolerance: time.Hour * 24,
dropTolerance: 0,
})
gc.clearEtcd()
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts[1:])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentStatslogPath), stats[1:])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentDeltaLogPath), delta[1:])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others)
gc.recycleDroppedSegments(context.TODO())
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts[1:])
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentStatslogPath), stats[1:])
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentDeltaLogPath), delta[1:])
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others)
gc.close()
})
@ -218,14 +220,14 @@ func Test_garbageCollector_scan(t *testing.T) {
dropTolerance: 0,
})
gc.start()
gc.scan()
gc.clearEtcd()
gc.recycleUnusedBinlogFiles(context.TODO())
gc.recycleDroppedSegments(context.TODO())
// bad path shall remains since datacoord cannot determine file is garbage or not if path is not valid
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts[1:2])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentStatslogPath), stats[1:2])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentDeltaLogPath), delta[1:2])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts[1:2])
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentStatslogPath), stats[1:2])
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentDeltaLogPath), delta[1:2])
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others)
gc.close()
})
@ -240,22 +242,22 @@ func Test_garbageCollector_scan(t *testing.T) {
dropTolerance: 0,
})
gc.start()
gc.scan()
gc.recycleUnusedBinlogFiles(context.TODO())
// bad path shall remains since datacoord cannot determine file is garbage or not if path is not valid
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts[1:2])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentStatslogPath), stats[1:2])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, common.SegmentDeltaLogPath), delta[1:2])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others)
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentInsertLogPath), inserts[1:2])
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentStatslogPath), stats[1:2])
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, common.SegmentDeltaLogPath), delta[1:2])
validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others)
gc.close()
})
cleanupOSS(cli.Client, bucketName, rootPath)
cleanupOSS(cli, bucketName, rootPath)
}
// initialize unit test sso env
func initUtOSSEnv(bucket, root string, n int) (mcm *storage.MinioChunkManager, inserts []string, stats []string, delta []string, other []string, err error) {
func initUtOSSEnv(bucket, root string, n int) (mcm *storage.RemoteChunkManager, inserts []string, stats []string, delta []string, other []string, err error) {
paramtable.Init()
if Params.MinioCfg.UseSSL.GetAsBool() && len(Params.MinioCfg.SslCACert.GetValue()) > 0 {
@ -335,14 +337,16 @@ func initUtOSSEnv(bucket, root string, n int) (mcm *storage.MinioChunkManager, i
}
other = append(other, info.Key)
}
mcm = &storage.MinioChunkManager{
Client: cli,
}
mcm.SetVar(bucket, root)
mcm = storage.NewRemoteChunkManagerForTesting(
cli,
bucket,
root,
)
return mcm, inserts, stats, delta, other, nil
}
func cleanupOSS(cli *minio.Client, bucket, root string) {
func cleanupOSS(chunkManager *storage.RemoteChunkManager, bucket, root string) {
cli := chunkManager.UnderlyingObjectStorage().(*storage.MinioObjectStorage).Client
ch := cli.ListObjects(context.TODO(), bucket, minio.ListObjectsOptions{Prefix: root, Recursive: true})
cli.RemoveObjects(context.TODO(), bucket, ch, minio.RemoveObjectsOptions{})
cli.RemoveBucket(context.TODO(), bucket)
@ -425,7 +429,7 @@ func TestGarbageCollector_recycleUnusedIndexes(t *testing.T) {
mock.Anything,
).Return(nil)
gc := newGarbageCollector(createMetaForRecycleUnusedIndexes(catalog), nil, GcOption{})
gc.recycleUnusedIndexes()
gc.recycleUnusedIndexes(context.TODO())
})
t.Run("fail", func(t *testing.T) {
@ -436,7 +440,7 @@ func TestGarbageCollector_recycleUnusedIndexes(t *testing.T) {
mock.Anything,
).Return(errors.New("fail"))
gc := newGarbageCollector(createMetaForRecycleUnusedIndexes(catalog), nil, GcOption{})
gc.recycleUnusedIndexes()
gc.recycleUnusedIndexes(context.TODO())
})
}
@ -558,6 +562,9 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m
func TestGarbageCollector_recycleUnusedSegIndexes(t *testing.T) {
t.Run("success", func(t *testing.T) {
mockChunkManager := mocks.NewChunkManager(t)
mockChunkManager.EXPECT().RootPath().Return("root")
mockChunkManager.EXPECT().Remove(mock.Anything, mock.Anything).Return(nil)
catalog := catalogmocks.NewDataCoordCatalog(t)
catalog.On("DropSegmentIndex",
mock.Anything,
@ -566,12 +573,17 @@ func TestGarbageCollector_recycleUnusedSegIndexes(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(nil)
gc := newGarbageCollector(createMetaForRecycleUnusedSegIndexes(catalog), nil, GcOption{})
gc.recycleUnusedSegIndexes()
gc := newGarbageCollector(createMetaForRecycleUnusedSegIndexes(catalog), nil, GcOption{
cli: mockChunkManager,
})
gc.recycleUnusedSegIndexes(context.TODO())
})
t.Run("fail", func(t *testing.T) {
catalog := catalogmocks.NewDataCoordCatalog(t)
mockChunkManager := mocks.NewChunkManager(t)
mockChunkManager.EXPECT().RootPath().Return("root")
mockChunkManager.EXPECT().Remove(mock.Anything, mock.Anything).Return(nil)
catalog.On("DropSegmentIndex",
mock.Anything,
mock.Anything,
@ -579,8 +591,10 @@ func TestGarbageCollector_recycleUnusedSegIndexes(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(errors.New("fail"))
gc := newGarbageCollector(createMetaForRecycleUnusedSegIndexes(catalog), nil, GcOption{})
gc.recycleUnusedSegIndexes()
gc := newGarbageCollector(createMetaForRecycleUnusedSegIndexes(catalog), nil, GcOption{
cli: mockChunkManager,
})
gc.recycleUnusedSegIndexes(context.TODO())
})
}
@ -726,7 +740,14 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
t.Run("success", func(t *testing.T) {
cm := &mocks.ChunkManager{}
cm.EXPECT().RootPath().Return("root")
cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return([]string{"a/b/c/", "a/b/600/", "a/b/601/", "a/b/602/"}, nil, nil)
cm.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, s string, b bool, cowf storage.ChunkObjectWalkFunc) error {
for _, file := range []string{"a/b/c/", "a/b/600/", "a/b/601/", "a/b/602/"} {
cowf(&storage.ChunkObjectInfo{FilePath: file})
}
return nil
})
cm.EXPECT().RemoveWithPrefix(mock.Anything, mock.Anything).Return(nil)
cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(nil)
gc := newGarbageCollector(
@ -736,27 +757,36 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
cli: cm,
})
gc.recycleUnusedIndexFiles()
gc.recycleUnusedIndexFiles(context.TODO())
})
t.Run("list fail", func(t *testing.T) {
cm := &mocks.ChunkManager{}
cm.EXPECT().RootPath().Return("root")
cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, errors.New("error"))
cm.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, s string, b bool, cowf storage.ChunkObjectWalkFunc) error {
return errors.New("error")
})
gc := newGarbageCollector(
createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}),
nil,
GcOption{
cli: cm,
})
gc.recycleUnusedIndexFiles()
gc.recycleUnusedIndexFiles(context.TODO())
})
t.Run("remove fail", func(t *testing.T) {
cm := &mocks.ChunkManager{}
cm.EXPECT().RootPath().Return("root")
cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(errors.New("error"))
cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return([]string{"a/b/c/", "a/b/600/", "a/b/601/", "a/b/602/"}, nil, nil)
cm.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, s string, b bool, cowf storage.ChunkObjectWalkFunc) error {
for _, file := range []string{"a/b/c/", "a/b/600/", "a/b/601/", "a/b/602/"} {
cowf(&storage.ChunkObjectInfo{FilePath: file})
}
return nil
})
cm.EXPECT().RemoveWithPrefix(mock.Anything, mock.Anything).Return(nil)
gc := newGarbageCollector(
createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}),
@ -764,14 +794,20 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
GcOption{
cli: cm,
})
gc.recycleUnusedIndexFiles()
gc.recycleUnusedIndexFiles(context.TODO())
})
t.Run("remove with prefix fail", func(t *testing.T) {
cm := &mocks.ChunkManager{}
cm.EXPECT().RootPath().Return("root")
cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(errors.New("error"))
cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return([]string{"a/b/c/", "a/b/600/", "a/b/601/", "a/b/602/"}, nil, nil)
cm.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, s string, b bool, cowf storage.ChunkObjectWalkFunc) error {
for _, file := range []string{"a/b/c/", "a/b/600/", "a/b/601/", "a/b/602/"} {
cowf(&storage.ChunkObjectInfo{FilePath: file})
}
return nil
})
cm.EXPECT().RemoveWithPrefix(mock.Anything, mock.Anything).Return(errors.New("error"))
gc := newGarbageCollector(
createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}),
@ -779,7 +815,7 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
GcOption{
cli: cm,
})
gc.recycleUnusedIndexFiles()
gc.recycleUnusedIndexFiles(context.TODO())
})
}
@ -1320,7 +1356,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
cli: cm,
dropTolerance: 1,
})
gc.clearEtcd()
gc.recycleDroppedSegments(context.TODO())
/*
A B
@ -1376,7 +1412,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
})
assert.NoError(t, err)
gc.clearEtcd()
gc.recycleDroppedSegments(context.TODO())
/*
A: processed prior to C, C is not GCed yet and C is not indexed, A is not GCed in this turn
@ -1392,7 +1428,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
segD = gc.meta.GetSegment(segID + 3)
assert.Nil(t, segD)
gc.clearEtcd()
gc.recycleDroppedSegments(context.TODO())
/*
A: compacted became false due to C is GCed already, A should be GCed since dropTolernace is meet
B: compacted became false due to C is GCed already, B should be GCed since dropTolerance is meet
@ -1403,9 +1439,9 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
assert.Nil(t, segB)
}
func TestGarbageCollector_removelogs(t *testing.T) {
func TestGarbageCollector_removeObjectPool(t *testing.T) {
paramtable.Init()
cm := &mocks.ChunkManager{}
cm := mocks.NewChunkManager(t)
gc := newGarbageCollector(
nil,
nil,
@ -1413,43 +1449,37 @@ func TestGarbageCollector_removelogs(t *testing.T) {
cli: cm,
dropTolerance: 1,
})
var logs []*datapb.Binlog
logs := make(map[string]struct{})
for i := 0; i < 50; i++ {
logs = append(logs, &datapb.Binlog{
LogPath: "log" + strconv.Itoa(i),
})
logs[fmt.Sprintf("log%d", i)] = struct{}{}
}
t.Run("success", func(t *testing.T) {
call := cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(nil)
defer call.Unset()
b := gc.removeLogs(logs)
assert.True(t, b)
b := gc.removeObjectFiles(context.TODO(), logs)
assert.NoError(t, b)
})
t.Run("minio not found error", func(t *testing.T) {
call := cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(minio.ErrorResponse{
Code: "NoSuchKey",
})
t.Run("oss not found error", func(t *testing.T) {
call := cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(merr.WrapErrIoKeyNotFound("not found"))
defer call.Unset()
b := gc.removeLogs(logs)
assert.True(t, b)
b := gc.removeObjectFiles(context.TODO(), logs)
assert.NoError(t, b)
})
t.Run("minio server error", func(t *testing.T) {
call := cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(minio.ErrorResponse{
Code: "Server Error",
})
t.Run("oss server error", func(t *testing.T) {
call := cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(merr.WrapErrIoFailed("server error", errors.New("err")))
defer call.Unset()
b := gc.removeLogs(logs)
assert.False(t, b)
b := gc.removeObjectFiles(context.TODO(), logs)
assert.Error(t, b)
})
t.Run("other type error", func(t *testing.T) {
call := cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(errors.New("other error"))
defer call.Unset()
b := gc.removeLogs(logs)
assert.False(t, b)
b := gc.removeObjectFiles(context.TODO(), logs)
assert.Error(t, b)
})
}
@ -1459,7 +1489,7 @@ type GarbageCollectorSuite struct {
bucketName string
rootPath string
cli *storage.MinioChunkManager
cli *storage.RemoteChunkManager
inserts []string
stats []string
delta []string
@ -1481,7 +1511,7 @@ func (s *GarbageCollectorSuite) SetupTest() {
}
func (s *GarbageCollectorSuite) TearDownTest() {
cleanupOSS(s.cli.Client, s.bucketName, s.rootPath)
cleanupOSS(s.cli, s.bucketName, s.rootPath)
}
func (s *GarbageCollectorSuite) TestPauseResume() {

View File

@ -438,7 +438,7 @@ func ListBinlogsAndGroupBySegment(ctx context.Context, cm storage.ChunkManager,
}
insertPrefix := importFile.GetPaths()[0]
segmentInsertPaths, _, err := cm.ListWithPrefix(ctx, insertPrefix, false)
segmentInsertPaths, _, err := storage.ListAllChunkWithPrefix(ctx, cm, insertPrefix, false)
if err != nil {
return nil, err
}
@ -450,7 +450,7 @@ func ListBinlogsAndGroupBySegment(ctx context.Context, cm storage.ChunkManager,
return segmentImportFiles, nil
}
deltaPrefix := importFile.GetPaths()[1]
segmentDeltaPaths, _, err := cm.ListWithPrefix(context.Background(), deltaPrefix, false)
segmentDeltaPaths, _, err := storage.ListAllChunkWithPrefix(ctx, cm, deltaPrefix, false)
if err != nil {
return nil, err
}

View File

@ -34,6 +34,7 @@ import (
mocks2 "github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -336,8 +337,24 @@ func TestImportUtil_ListBinlogsAndGroupBySegment(t *testing.T) {
}
cm := mocks2.NewChunkManager(t)
cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil)
cm.EXPECT().ListWithPrefix(mock.Anything, deltaPrefix, mock.Anything).Return(segmentDeltaPaths, nil, nil)
cm.EXPECT().WalkWithPrefix(mock.Anything, insertPrefix, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, s string, b bool, cowf storage.ChunkObjectWalkFunc) error {
for _, p := range segmentInsertPaths {
if !cowf(&storage.ChunkObjectInfo{FilePath: p}) {
return nil
}
}
return nil
})
cm.EXPECT().WalkWithPrefix(mock.Anything, deltaPrefix, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, s string, b bool, cowf storage.ChunkObjectWalkFunc) error {
for _, p := range segmentDeltaPaths {
if !cowf(&storage.ChunkObjectInfo{FilePath: p}) {
return nil
}
}
return nil
})
file := &internalpb.ImportFile{
Id: 1,

View File

@ -216,6 +216,28 @@ func (s *SegmentsInfo) SetIsCompacting(segmentID UniqueID, isCompacting bool) {
}
}
func (s *SegmentInfo) IsDeltaLogExists(logID int64) bool {
for _, deltaLogs := range s.GetDeltalogs() {
for _, l := range deltaLogs.GetBinlogs() {
if l.GetLogID() == logID {
return true
}
}
}
return false
}
func (s *SegmentInfo) IsStatsLogExists(logID int64) bool {
for _, statsLogs := range s.GetStatslogs() {
for _, l := range statsLogs.GetBinlogs() {
if l.GetLogID() == logID {
return true
}
}
}
return false
}
// Clone deep clone the segment info and return a new instance
func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo {
info := proto.Clone(s.SegmentInfo).(*datapb.SegmentInfo)

View File

@ -96,3 +96,49 @@ func TestCompactionTo(t *testing.T) {
assert.True(t, ok)
assert.Nil(t, s)
}
func TestIsDeltaLogExists(t *testing.T) {
segment := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{
LogID: 1,
},
{
LogID: 2,
},
},
},
},
},
}
assert.True(t, segment.IsDeltaLogExists(1))
assert.True(t, segment.IsDeltaLogExists(2))
assert.False(t, segment.IsDeltaLogExists(3))
assert.False(t, segment.IsDeltaLogExists(0))
}
func TestIsStatsLogExists(t *testing.T) {
segment := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
Statslogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{
LogID: 1,
},
{
LogID: 2,
},
},
},
},
},
}
assert.True(t, segment.IsStatsLogExists(1))
assert.True(t, segment.IsStatsLogExists(2))
assert.False(t, segment.IsStatsLogExists(3))
assert.False(t, segment.IsStatsLogExists(0))
}

View File

@ -1568,7 +1568,7 @@ func TestImportV2(t *testing.T) {
// list binlog failed
cm := mocks2.NewChunkManager(t)
cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, mockErr)
cm.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(mockErr)
s.meta = &meta{chunkManager: cm}
resp, err = s.ImportV2(ctx, &internalpb.ImportRequestInternal{
Files: []*internalpb.ImportFile{

View File

@ -136,14 +136,8 @@ func (c *mockChunkmgr) MultiRead(ctx context.Context, filePaths []string) ([][]b
return nil, errNotImplErr
}
func (c *mockChunkmgr) ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error) {
// TODO
return nil, nil, errNotImplErr
}
func (c *mockChunkmgr) ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error) {
// TODO
return nil, nil, errNotImplErr
func (c *mockChunkmgr) WalkWithPrefix(ctx context.Context, prefix string, recursive bool, walkFunc storage.ChunkObjectWalkFunc) error {
return errNotImplErr
}
func (c *mockChunkmgr) Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error) {

View File

@ -84,14 +84,7 @@ func CompressFieldBinlogs(fieldBinlogs []*datapb.FieldBinlog) error {
for _, binlog := range fieldBinlog.Binlogs {
logPath := binlog.GetLogPath()
if len(logPath) != 0 {
var logID int64
idx := strings.LastIndex(logPath, "/")
if idx == -1 {
return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("invalid binlog path: %s", logPath))
}
var err error
logPathStr := logPath[(idx + 1):]
logID, err = strconv.ParseInt(logPathStr, 10, 64)
logID, err := GetLogIDFromBingLogPath(logPath)
if err != nil {
return err
}
@ -184,3 +177,19 @@ func buildLogPath(binlogType storage.BinlogType, collectionID, partitionID, segm
// should not happen
return "", merr.WrapErrParameterInvalidMsg("invalid binlog type")
}
// GetLogIDFromBingLogPath get log id from binlog path
func GetLogIDFromBingLogPath(logPath string) (int64, error) {
var logID int64
idx := strings.LastIndex(logPath, "/")
if idx == -1 {
return 0, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("invalid binlog path: %s", logPath))
}
var err error
logPathStr := logPath[(idx + 1):]
logID, err = strconv.ParseInt(logPathStr, 10, 64)
if err != nil {
return 0, err
}
return logID, nil
}

View File

@ -10,8 +10,6 @@ import (
mock "github.com/stretchr/testify/mock"
storage "github.com/milvus-io/milvus/internal/storage"
time "time"
)
// ChunkManager is an autogenerated mock type for the ChunkManager type
@ -80,71 +78,6 @@ func (_c *ChunkManager_Exist_Call) RunAndReturn(run func(context.Context, string
return _c
}
// ListWithPrefix provides a mock function with given fields: ctx, prefix, recursive
func (_m *ChunkManager) ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error) {
ret := _m.Called(ctx, prefix, recursive)
var r0 []string
var r1 []time.Time
var r2 error
if rf, ok := ret.Get(0).(func(context.Context, string, bool) ([]string, []time.Time, error)); ok {
return rf(ctx, prefix, recursive)
}
if rf, ok := ret.Get(0).(func(context.Context, string, bool) []string); ok {
r0 = rf(ctx, prefix, recursive)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]string)
}
}
if rf, ok := ret.Get(1).(func(context.Context, string, bool) []time.Time); ok {
r1 = rf(ctx, prefix, recursive)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).([]time.Time)
}
}
if rf, ok := ret.Get(2).(func(context.Context, string, bool) error); ok {
r2 = rf(ctx, prefix, recursive)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// ChunkManager_ListWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListWithPrefix'
type ChunkManager_ListWithPrefix_Call struct {
*mock.Call
}
// ListWithPrefix is a helper method to define mock.On call
// - ctx context.Context
// - prefix string
// - recursive bool
func (_e *ChunkManager_Expecter) ListWithPrefix(ctx interface{}, prefix interface{}, recursive interface{}) *ChunkManager_ListWithPrefix_Call {
return &ChunkManager_ListWithPrefix_Call{Call: _e.mock.On("ListWithPrefix", ctx, prefix, recursive)}
}
func (_c *ChunkManager_ListWithPrefix_Call) Run(run func(ctx context.Context, prefix string, recursive bool)) *ChunkManager_ListWithPrefix_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(bool))
})
return _c
}
func (_c *ChunkManager_ListWithPrefix_Call) Return(_a0 []string, _a1 []time.Time, _a2 error) *ChunkManager_ListWithPrefix_Call {
_c.Call.Return(_a0, _a1, _a2)
return _c
}
func (_c *ChunkManager_ListWithPrefix_Call) RunAndReturn(run func(context.Context, string, bool) ([]string, []time.Time, error)) *ChunkManager_ListWithPrefix_Call {
_c.Call.Return(run)
return _c
}
// Mmap provides a mock function with given fields: ctx, filePath
func (_m *ChunkManager) Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error) {
ret := _m.Called(ctx, filePath)
@ -506,70 +439,6 @@ func (_c *ChunkManager_ReadAt_Call) RunAndReturn(run func(context.Context, strin
return _c
}
// ReadWithPrefix provides a mock function with given fields: ctx, prefix
func (_m *ChunkManager) ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error) {
ret := _m.Called(ctx, prefix)
var r0 []string
var r1 [][]byte
var r2 error
if rf, ok := ret.Get(0).(func(context.Context, string) ([]string, [][]byte, error)); ok {
return rf(ctx, prefix)
}
if rf, ok := ret.Get(0).(func(context.Context, string) []string); ok {
r0 = rf(ctx, prefix)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]string)
}
}
if rf, ok := ret.Get(1).(func(context.Context, string) [][]byte); ok {
r1 = rf(ctx, prefix)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).([][]byte)
}
}
if rf, ok := ret.Get(2).(func(context.Context, string) error); ok {
r2 = rf(ctx, prefix)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// ChunkManager_ReadWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReadWithPrefix'
type ChunkManager_ReadWithPrefix_Call struct {
*mock.Call
}
// ReadWithPrefix is a helper method to define mock.On call
// - ctx context.Context
// - prefix string
func (_e *ChunkManager_Expecter) ReadWithPrefix(ctx interface{}, prefix interface{}) *ChunkManager_ReadWithPrefix_Call {
return &ChunkManager_ReadWithPrefix_Call{Call: _e.mock.On("ReadWithPrefix", ctx, prefix)}
}
func (_c *ChunkManager_ReadWithPrefix_Call) Run(run func(ctx context.Context, prefix string)) *ChunkManager_ReadWithPrefix_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *ChunkManager_ReadWithPrefix_Call) Return(_a0 []string, _a1 [][]byte, _a2 error) *ChunkManager_ReadWithPrefix_Call {
_c.Call.Return(_a0, _a1, _a2)
return _c
}
func (_c *ChunkManager_ReadWithPrefix_Call) RunAndReturn(run func(context.Context, string) ([]string, [][]byte, error)) *ChunkManager_ReadWithPrefix_Call {
_c.Call.Return(run)
return _c
}
// Reader provides a mock function with given fields: ctx, filePath
func (_m *ChunkManager) Reader(ctx context.Context, filePath string) (storage.FileReader, error) {
ret := _m.Called(ctx, filePath)
@ -805,6 +674,51 @@ func (_c *ChunkManager_Size_Call) RunAndReturn(run func(context.Context, string)
return _c
}
// WalkWithPrefix provides a mock function with given fields: ctx, prefix, recursive, walkFunc
func (_m *ChunkManager) WalkWithPrefix(ctx context.Context, prefix string, recursive bool, walkFunc storage.ChunkObjectWalkFunc) error {
ret := _m.Called(ctx, prefix, recursive, walkFunc)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, bool, storage.ChunkObjectWalkFunc) error); ok {
r0 = rf(ctx, prefix, recursive, walkFunc)
} else {
r0 = ret.Error(0)
}
return r0
}
// ChunkManager_WalkWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WalkWithPrefix'
type ChunkManager_WalkWithPrefix_Call struct {
*mock.Call
}
// WalkWithPrefix is a helper method to define mock.On call
// - ctx context.Context
// - prefix string
// - recursive bool
// - walkFunc storage.ChunkObjectWalkFunc
func (_e *ChunkManager_Expecter) WalkWithPrefix(ctx interface{}, prefix interface{}, recursive interface{}, walkFunc interface{}) *ChunkManager_WalkWithPrefix_Call {
return &ChunkManager_WalkWithPrefix_Call{Call: _e.mock.On("WalkWithPrefix", ctx, prefix, recursive, walkFunc)}
}
func (_c *ChunkManager_WalkWithPrefix_Call) Run(run func(ctx context.Context, prefix string, recursive bool, walkFunc storage.ChunkObjectWalkFunc)) *ChunkManager_WalkWithPrefix_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(bool), args[3].(storage.ChunkObjectWalkFunc))
})
return _c
}
func (_c *ChunkManager_WalkWithPrefix_Call) Return(_a0 error) *ChunkManager_WalkWithPrefix_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *ChunkManager_WalkWithPrefix_Call) RunAndReturn(run func(context.Context, string, bool, storage.ChunkObjectWalkFunc) error) *ChunkManager_WalkWithPrefix_Call {
_c.Call.Return(run)
return _c
}
// Write provides a mock function with given fields: ctx, filePath, content
func (_m *ChunkManager) Write(ctx context.Context, filePath string, content []byte) error {
ret := _m.Called(ctx, filePath, content)

View File

@ -793,7 +793,7 @@ func (sd *shardDelegator) maybeReloadPartitionStats(ctx context.Context, partIDs
idPath := metautil.JoinIDPath(colID, partID)
idPath = path.Join(idPath, sd.vchannelName)
statsPathPrefix := path.Join(sd.chunkManager.RootPath(), common.PartitionStatsPath, idPath)
filePaths, _, err := sd.chunkManager.ListWithPrefix(ctx, statsPathPrefix, true)
filePaths, _, err := storage.ListAllChunkWithPrefix(ctx, sd.chunkManager, statsPathPrefix, true)
if err != nil {
log.Error("Skip initializing partition stats for failing to list files with prefix",
zap.String("statsPathPrefix", statsPathPrefix))

View File

@ -197,21 +197,20 @@ func (AzureObjectStorage *AzureObjectStorage) StatObject(ctx context.Context, bu
return *info.ContentLength, nil
}
func (AzureObjectStorage *AzureObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error) {
var objectsKeys []string
var modTimes []time.Time
func (AzureObjectStorage *AzureObjectStorage) WalkWithObjects(ctx context.Context, bucketName string, prefix string, recursive bool, walkFunc ChunkObjectWalkFunc) error {
if recursive {
pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{
Prefix: &prefix,
})
if pager.More() {
pageResp, err := pager.NextPage(context.Background())
pageResp, err := pager.NextPage(ctx)
if err != nil {
return []string{}, []time.Time{}, checkObjectStorageError(prefix, err)
return err
}
for _, blob := range pageResp.Segment.BlobItems {
objectsKeys = append(objectsKeys, *blob.Name)
modTimes = append(modTimes, *blob.Properties.LastModified)
if !walkFunc(&ChunkObjectInfo{FilePath: *blob.Name, ModifyTime: *blob.Properties.LastModified}) {
return nil
}
}
}
} else {
@ -219,21 +218,24 @@ func (AzureObjectStorage *AzureObjectStorage) ListObjects(ctx context.Context, b
Prefix: &prefix,
})
if pager.More() {
pageResp, err := pager.NextPage(context.Background())
pageResp, err := pager.NextPage(ctx)
if err != nil {
return []string{}, []time.Time{}, checkObjectStorageError(prefix, err)
return err
}
for _, blob := range pageResp.Segment.BlobItems {
objectsKeys = append(objectsKeys, *blob.Name)
modTimes = append(modTimes, *blob.Properties.LastModified)
if !walkFunc(&ChunkObjectInfo{FilePath: *blob.Name, ModifyTime: *blob.Properties.LastModified}) {
return nil
}
}
for _, blob := range pageResp.Segment.BlobPrefixes {
objectsKeys = append(objectsKeys, *blob.Name)
modTimes = append(modTimes, time.Now())
if !walkFunc(&ChunkObjectInfo{FilePath: *blob.Name, ModifyTime: time.Now()}) {
return nil
}
}
}
}
return objectsKeys, modTimes, nil
return nil
}
func (AzureObjectStorage *AzureObjectStorage) RemoveObject(ctx context.Context, bucketName, objectName string) error {

View File

@ -124,7 +124,7 @@ func TestAzureObjectStorage(t *testing.T) {
for _, test := range loadWithPrefixTests {
t.Run(test.description, func(t *testing.T) {
gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, false)
gotk, _, err := listAllObjectsWithPrefixAtBucket(ctx, testCM, config.bucketName, test.prefix, false)
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
for _, key := range gotk {
@ -177,7 +177,7 @@ func TestAzureObjectStorage(t *testing.T) {
for _, test := range insertWithPrefixTests {
t.Run(fmt.Sprintf("prefix: %s, recursive: %t", test.prefix, test.recursive), func(t *testing.T) {
gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, test.recursive)
gotk, _, err := listAllObjectsWithPrefixAtBucket(ctx, testCM, config.bucketName, test.prefix, test.recursive)
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
for _, key := range gotk {

View File

@ -49,9 +49,7 @@ func (f *ChunkManagerFactory) newChunkManager(ctx context.Context, engine string
switch engine {
case "local":
return NewLocalChunkManager(RootPath(f.config.rootPath)), nil
case "minio", "opendal":
return newMinioChunkManagerWithConfig(ctx, f.config)
case "remote":
case "remote", "minio", "opendal":
return NewRemoteChunkManager(ctx, f.config)
default:
return nil, errors.New("no chunk manager implemented with engine: " + engine)

View File

@ -133,53 +133,58 @@ func (lcm *LocalChunkManager) MultiRead(ctx context.Context, filePaths []string)
return results, el
}
func (lcm *LocalChunkManager) ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error) {
var filePaths []string
var modTimes []time.Time
func (lcm *LocalChunkManager) WalkWithPrefix(ctx context.Context, prefix string, recursive bool, walkFunc ChunkObjectWalkFunc) (err error) {
logger := log.With(zap.String("prefix", prefix), zap.Bool("recursive", recursive))
logger.Info("start walk through objects")
defer func() {
if err != nil {
logger.Warn("failed to walk through objects", zap.Error(err))
return
}
logger.Info("finish walk through objects")
}()
if recursive {
dir := filepath.Dir(prefix)
err := filepath.Walk(dir, func(filePath string, f os.FileInfo, err error) error {
return filepath.Walk(dir, func(filePath string, f os.FileInfo, err error) error {
if ctx.Err() != nil {
return ctx.Err()
}
if err != nil {
return err
}
if strings.HasPrefix(filePath, prefix) && !f.IsDir() {
filePaths = append(filePaths, filePath)
modTime, err := lcm.getModTime(filePath)
if err != nil {
return err
}
if !walkFunc(&ChunkObjectInfo{FilePath: filePath, ModifyTime: modTime}) {
return nil
}
}
return nil
})
if err != nil {
return nil, nil, err
}
for _, filePath := range filePaths {
modTime, err2 := lcm.getModTime(filePath)
if err2 != nil {
return filePaths, nil, err2
}
modTimes = append(modTimes, modTime)
}
return filePaths, modTimes, nil
}
globPaths, err := filepath.Glob(prefix + "*")
if err != nil {
return nil, nil, err
return err
}
filePaths = append(filePaths, globPaths...)
for _, filePath := range filePaths {
modTime, err2 := lcm.getModTime(filePath)
if err2 != nil {
return filePaths, nil, err2
for _, filePath := range globPaths {
if ctx.Err() != nil {
return ctx.Err()
}
modTimes = append(modTimes, modTime)
}
return filePaths, modTimes, nil
}
func (lcm *LocalChunkManager) ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error) {
filePaths, _, err := lcm.ListWithPrefix(ctx, prefix, true)
if err != nil {
return nil, nil, err
modTime, err := lcm.getModTime(filePath)
if err != nil {
return err
}
if !walkFunc(&ChunkObjectInfo{FilePath: filePath, ModifyTime: modTime}) {
return nil
}
}
result, err := lcm.MultiRead(ctx, filePaths)
return filePaths, result, err
return nil
}
// ReadAt reads specific position data of local storage if exists.
@ -246,13 +251,17 @@ func (lcm *LocalChunkManager) RemoveWithPrefix(ctx context.Context, prefix strin
log.Warn(errMsg)
return merr.WrapErrParameterInvalidMsg(errMsg)
}
filePaths, _, err := lcm.ListWithPrefix(ctx, prefix, true)
if err != nil {
var removeErr error
if err := lcm.WalkWithPrefix(ctx, prefix, true, func(chunkInfo *ChunkObjectInfo) bool {
err := lcm.MultiRemove(ctx, []string{chunkInfo.FilePath})
if err != nil {
removeErr = err
}
return true
}); err != nil {
return err
}
return lcm.MultiRemove(ctx, filePaths)
return removeErr
}
func (lcm *LocalChunkManager) getModTime(filepath string) (time.Time, error) {

View File

@ -110,7 +110,7 @@ func TestLocalCM(t *testing.T) {
for _, test := range loadWithPrefixTests {
t.Run(test.description, func(t *testing.T) {
gotk, gotv, err := testCM.ReadWithPrefix(ctx, path.Join(localPath, testLoadRoot, test.prefix))
gotk, gotv, err := readAllChunkWithPrefix(ctx, testCM, path.Join(localPath, testLoadRoot, test.prefix))
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
assert.Equal(t, len(test.expectedValue), len(gotv))
@ -447,7 +447,7 @@ func TestLocalCM(t *testing.T) {
// localPath/testPrefix/a/b
// localPath/testPrefix/a/c
pathPrefix := path.Join(localPath, testPrefix, "a")
dirs, m, err := testCM.ListWithPrefix(ctx, pathPrefix, true)
dirs, m, err := ListAllChunkWithPrefix(ctx, testCM, pathPrefix, true)
assert.NoError(t, err)
assert.Equal(t, 2, len(dirs))
assert.Equal(t, 2, len(m))
@ -459,7 +459,7 @@ func TestLocalCM(t *testing.T) {
assert.NoError(t, err)
// no file returned
dirs, m, err = testCM.ListWithPrefix(ctx, pathPrefix, true)
dirs, m, err = ListAllChunkWithPrefix(ctx, testCM, pathPrefix, true)
assert.NoError(t, err)
assert.Equal(t, 0, len(dirs))
assert.Equal(t, 0, len(m))
@ -499,7 +499,7 @@ func TestLocalCM(t *testing.T) {
// localPath/testPrefix/abd
// localPath/testPrefix/bcd
testPrefix1 := path.Join(localPath, testPrefix)
dirs, mods, err := testCM.ListWithPrefix(ctx, testPrefix1+"/", false)
dirs, mods, err := ListAllChunkWithPrefix(ctx, testCM, testPrefix1+"/", false)
assert.NoError(t, err)
assert.Equal(t, 3, len(dirs))
assert.Equal(t, 3, len(mods))
@ -513,7 +513,7 @@ func TestLocalCM(t *testing.T) {
// localPath/testPrefix/abc/deg
// localPath/testPrefix/abd
// localPath/testPrefix/bcd
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix1+"/", true)
dirs, mods, err = ListAllChunkWithPrefix(ctx, testCM, testPrefix1+"/", true)
assert.NoError(t, err)
assert.Equal(t, 4, len(dirs))
assert.Equal(t, 4, len(mods))
@ -527,7 +527,7 @@ func TestLocalCM(t *testing.T) {
// localPath/testPrefix/abc
// localPath/testPrefix/abd
testPrefix2 := path.Join(localPath, testPrefix, "a")
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix2, false)
dirs, mods, err = ListAllChunkWithPrefix(ctx, testCM, testPrefix2, false)
assert.NoError(t, err)
assert.Equal(t, 2, len(dirs))
assert.Equal(t, 2, len(mods))
@ -539,7 +539,7 @@ func TestLocalCM(t *testing.T) {
// localPath/testPrefix/abc/def
// localPath/testPrefix/abc/deg
// localPath/testPrefix/abd
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix2, true)
dirs, mods, err = ListAllChunkWithPrefix(ctx, testCM, testPrefix2, true)
assert.NoError(t, err)
assert.Equal(t, 3, len(dirs))
assert.Equal(t, 3, len(mods))
@ -555,7 +555,7 @@ func TestLocalCM(t *testing.T) {
// non-recursive find localPath/testPrefix
// return:
// localPath/testPrefix
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix1, false)
dirs, mods, err = ListAllChunkWithPrefix(ctx, testCM, testPrefix1, false)
assert.NoError(t, err)
assert.Equal(t, 1, len(dirs))
assert.Equal(t, 1, len(mods))
@ -564,7 +564,7 @@ func TestLocalCM(t *testing.T) {
// recursive find localPath/testPrefix
// return:
// localPath/testPrefix/bcd
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix1, true)
dirs, mods, err = ListAllChunkWithPrefix(ctx, testCM, testPrefix1, true)
assert.NoError(t, err)
assert.Equal(t, 1, len(dirs))
assert.Equal(t, 1, len(mods))
@ -573,7 +573,7 @@ func TestLocalCM(t *testing.T) {
// non-recursive find localPath/testPrefix/a*
// return:
// localPath/testPrefix/abc
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix2, false)
dirs, mods, err = ListAllChunkWithPrefix(ctx, testCM, testPrefix2, false)
assert.NoError(t, err)
assert.Equal(t, 1, len(dirs))
assert.Equal(t, 1, len(mods))
@ -581,7 +581,7 @@ func TestLocalCM(t *testing.T) {
// recursive find localPath/testPrefix/a*
// no file returned
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix2, true)
dirs, mods, err = ListAllChunkWithPrefix(ctx, testCM, testPrefix2, true)
assert.NoError(t, err)
assert.Equal(t, 0, len(dirs))
assert.Equal(t, 0, len(mods))
@ -593,7 +593,7 @@ func TestLocalCM(t *testing.T) {
// recursive find localPath/testPrefix
// no file returned
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix1, true)
dirs, mods, err = ListAllChunkWithPrefix(ctx, testCM, testPrefix1, true)
assert.NoError(t, err)
assert.Equal(t, 0, len(dirs))
assert.Equal(t, 0, len(mods))
@ -601,10 +601,27 @@ func TestLocalCM(t *testing.T) {
// recursive find localPath/testPrefix
// return
// localPath/testPrefix
dirs, mods, err = testCM.ListWithPrefix(ctx, testPrefix1, false)
dirs, mods, err = ListAllChunkWithPrefix(ctx, testCM, testPrefix1, false)
assert.NoError(t, err)
assert.Equal(t, 1, len(dirs))
assert.Equal(t, 1, len(mods))
assert.Contains(t, dirs, filepath.Dir(key4))
})
}
func readAllChunkWithPrefix(ctx context.Context, manager ChunkManager, prefix string) ([]string, [][]byte, error) {
var paths []string
var contents [][]byte
if err := manager.WalkWithPrefix(ctx, prefix, true, func(object *ChunkObjectInfo) bool {
paths = append(paths, object.FilePath)
content, err := manager.Read(ctx, object.FilePath)
if err != nil {
return false
}
contents = append(contents, content)
return true
}); err != nil {
return nil, nil, err
}
return paths, contents, nil
}

View File

@ -1,480 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"bytes"
"container/list"
"context"
"io"
"strings"
"time"
"github.com/cockroachdb/errors"
minio "github.com/minio/minio-go/v7"
"go.uber.org/zap"
"golang.org/x/exp/mmap"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
var CheckBucketRetryAttempts uint = 20
// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {
*minio.Client
// ctx context.Context
bucketName string
rootPath string
}
var _ ChunkManager = (*MinioChunkManager)(nil)
// NewMinioChunkManager create a new local manager object.
// Deprecated: Do not call this directly! Use factory.NewPersistentStorageChunkManager instead.
func NewMinioChunkManager(ctx context.Context, opts ...Option) (*MinioChunkManager, error) {
c := newDefaultConfig()
for _, opt := range opts {
opt(c)
}
return newMinioChunkManagerWithConfig(ctx, c)
}
func newMinioChunkManagerWithConfig(ctx context.Context, c *config) (*MinioChunkManager, error) {
minIOClient, err := newMinioClient(ctx, c)
if err != nil {
return nil, err
}
mcm := &MinioChunkManager{
Client: minIOClient,
bucketName: c.bucketName,
}
mcm.rootPath = mcm.normalizeRootPath(c.rootPath)
log.Info("minio chunk manager init success.", zap.String("bucketname", c.bucketName), zap.String("root", mcm.RootPath()))
return mcm, nil
}
// normalizeRootPath
func (mcm *MinioChunkManager) normalizeRootPath(rootPath string) string {
// no leading "/"
return strings.TrimLeft(rootPath, "/")
}
// SetVar set the variable value of mcm
func (mcm *MinioChunkManager) SetVar(bucketName string, rootPath string) {
log.Info("minio chunkmanager ", zap.String("bucketName", bucketName), zap.String("rootpath", rootPath))
mcm.bucketName = bucketName
mcm.rootPath = rootPath
}
// RootPath returns minio root path.
func (mcm *MinioChunkManager) RootPath() string {
return mcm.rootPath
}
// Path returns the path of minio data if exists.
func (mcm *MinioChunkManager) Path(ctx context.Context, filePath string) (string, error) {
exist, err := mcm.Exist(ctx, filePath)
if err != nil {
return "", err
}
if !exist {
return "", merr.WrapErrIoKeyNotFound(filePath)
}
return filePath, nil
}
// Reader returns the path of minio data if exists.
func (mcm *MinioChunkManager) Reader(ctx context.Context, filePath string) (FileReader, error) {
reader, err := mcm.getMinioObject(ctx, mcm.bucketName, filePath, minio.GetObjectOptions{})
if err != nil {
log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return nil, err
}
return reader, nil
}
func (mcm *MinioChunkManager) Size(ctx context.Context, filePath string) (int64, error) {
objectInfo, err := mcm.statMinioObject(ctx, mcm.bucketName, filePath, minio.StatObjectOptions{})
if err != nil {
log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return 0, err
}
return objectInfo.Size, nil
}
// Write writes the data to minio storage.
func (mcm *MinioChunkManager) Write(ctx context.Context, filePath string, content []byte) error {
_, err := mcm.putMinioObject(ctx, mcm.bucketName, filePath, bytes.NewReader(content), int64(len(content)), minio.PutObjectOptions{})
if err != nil {
log.Warn("failed to put object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return err
}
metrics.PersistentDataKvSize.WithLabelValues(metrics.DataPutLabel).Observe(float64(len(content)))
return nil
}
// MultiWrite saves multiple objects, the path is the key of @kvs.
// The object value is the value of @kvs.
func (mcm *MinioChunkManager) MultiWrite(ctx context.Context, kvs map[string][]byte) error {
errors := make([]error, 0, len(kvs))
for key, value := range kvs {
err := mcm.Write(ctx, key, value)
errors = append(errors, err)
}
return merr.Combine(errors...)
}
// Exist checks whether chunk is saved to minio storage.
func (mcm *MinioChunkManager) Exist(ctx context.Context, filePath string) (bool, error) {
_, err := mcm.statMinioObject(ctx, mcm.bucketName, filePath, minio.StatObjectOptions{})
if err != nil {
if errors.Is(err, merr.ErrIoKeyNotFound) {
return false, nil
}
log.Warn("failed to stat object",
zap.String("bucket", mcm.bucketName),
zap.String("path", filePath),
zap.Error(err),
)
return false, err
}
return true, nil
}
// Read reads the minio storage data if exists.
func (mcm *MinioChunkManager) Read(ctx context.Context, filePath string) ([]byte, error) {
var data []byte
err := retry.Do(ctx, func() error {
start := time.Now()
object, err := mcm.getMinioObject(ctx, mcm.bucketName, filePath, minio.GetObjectOptions{})
if err != nil {
log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return err
}
defer object.Close()
// Prefetch object data
var empty []byte
_, err = object.Read(empty)
err = checkObjectStorageError(filePath, err)
if err != nil {
log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err))
return err
}
objectInfo, err := object.Stat()
err = checkObjectStorageError(filePath, err)
if err != nil {
log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return err
}
data, err = Read(object, objectInfo.Size)
err = checkObjectStorageError(filePath, err)
if err != nil {
log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return err
}
metrics.PersistentDataKvSize.WithLabelValues(metrics.DataGetLabel).Observe(float64(objectInfo.Size))
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataGetLabel).Observe(float64(time.Since(start).Milliseconds()))
return nil
}, retry.Attempts(3), retry.RetryErr(merr.IsRetryableErr))
if err != nil {
return nil, err
}
return data, nil
}
func (mcm *MinioChunkManager) MultiRead(ctx context.Context, keys []string) ([][]byte, error) {
errors := make([]error, 0)
var objectsValues [][]byte
for _, key := range keys {
objectValue, err := mcm.Read(ctx, key)
if err != nil {
errors = append(errors, err)
}
objectsValues = append(objectsValues, objectValue)
}
return objectsValues, merr.Combine(errors...)
}
func (mcm *MinioChunkManager) ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error) {
objectsKeys, _, err := mcm.ListWithPrefix(ctx, prefix, true)
if err != nil {
return nil, nil, err
}
objectsValues, err := mcm.MultiRead(ctx, objectsKeys)
if err != nil {
return nil, nil, err
}
return objectsKeys, objectsValues, nil
}
func (mcm *MinioChunkManager) Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error) {
return nil, merr.WrapErrServiceInternal("mmap not supported for MinIO chunk manager")
}
// ReadAt reads specific position data of minio storage if exists.
func (mcm *MinioChunkManager) ReadAt(ctx context.Context, filePath string, off int64, length int64) ([]byte, error) {
if off < 0 || length < 0 {
return nil, io.EOF
}
start := time.Now()
opts := minio.GetObjectOptions{}
err := opts.SetRange(off, off+length-1)
if err != nil {
log.Warn("failed to set range", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return nil, merr.WrapErrParameterInvalidMsg("invalid range while reading %s: %v", filePath, err)
}
object, err := mcm.getMinioObject(ctx, mcm.bucketName, filePath, opts)
if err != nil {
log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return nil, err
}
defer object.Close()
data, err := Read(object, length)
if err != nil {
err = checkObjectStorageError(filePath, err)
log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return nil, err
}
metrics.PersistentDataKvSize.WithLabelValues(metrics.DataGetLabel).Observe(float64(length))
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataGetLabel).Observe(float64(time.Since(start).Milliseconds()))
return data, nil
}
// Remove deletes an object with @key.
func (mcm *MinioChunkManager) Remove(ctx context.Context, filePath string) error {
err := mcm.removeMinioObject(ctx, mcm.bucketName, filePath, minio.RemoveObjectOptions{})
if err != nil {
log.Warn("failed to remove object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return err
}
return nil
}
// MultiRemove deletes a objects with @keys.
func (mcm *MinioChunkManager) MultiRemove(ctx context.Context, keys []string) error {
var el error
for _, key := range keys {
err := mcm.Remove(ctx, key)
if err != nil {
el = merr.Combine(el, errors.Wrapf(err, "failed to remove %s", key))
}
}
return el
}
// RemoveWithPrefix removes all objects with the same prefix @prefix from minio.
func (mcm *MinioChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error {
objects := mcm.listMinioObjects(ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: true})
i := 0
maxGoroutine := 10
removeKeys := make([]string, 0, len(objects))
for object := range objects {
if object.Err != nil {
return object.Err
}
removeKeys = append(removeKeys, object.Key)
}
for i < len(removeKeys) {
runningGroup, groupCtx := errgroup.WithContext(ctx)
for j := 0; j < maxGoroutine && i < len(removeKeys); j++ {
key := removeKeys[i]
runningGroup.Go(func() error {
err := mcm.removeMinioObject(groupCtx, mcm.bucketName, key, minio.RemoveObjectOptions{})
if err != nil {
log.Warn("failed to remove object", zap.String("path", key), zap.Error(err))
return err
}
return nil
})
i++
}
if err := runningGroup.Wait(); err != nil {
return err
}
}
return nil
}
// ListWithPrefix returns objects with provided prefix.
// by default, if `recursive`=false, list object with return object with path under save level
// say minio has followinng objects: [a, ab, a/b, ab/c]
// calling `ListWithPrefix` with `prefix` = a && `recursive` = false will only returns [a, ab]
// If caller needs all objects without level limitation, `recursive` shall be true.
func (mcm *MinioChunkManager) ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error) {
// cannot use ListObjects(ctx, bucketName, Opt{Prefix:prefix, Recursive:true})
// if minio has lots of objects under the provided path
// recursive = true may timeout during the recursive browsing the objects.
// See also: https://github.com/milvus-io/milvus/issues/19095
var objectsKeys []string
var modTimes []time.Time
tasks := list.New()
tasks.PushBack(prefix)
for tasks.Len() > 0 {
e := tasks.Front()
pre := e.Value.(string)
tasks.Remove(e)
// TODO add concurrent call if performance matters
// only return current level per call
objects := mcm.listMinioObjects(ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: pre, Recursive: false})
for object := range objects {
if object.Err != nil {
log.Warn("failed to list with prefix", zap.String("bucket", mcm.bucketName), zap.String("prefix", prefix), zap.Error(object.Err))
return nil, nil, object.Err
}
// with tailing "/", object is a "directory"
if strings.HasSuffix(object.Key, "/") && recursive {
// enqueue when recursive is true
if object.Key != pre {
tasks.PushBack(object.Key)
}
continue
}
objectsKeys = append(objectsKeys, object.Key)
modTimes = append(modTimes, object.LastModified)
}
}
return objectsKeys, modTimes, nil
}
// Learn from file.ReadFile
func Read(r io.Reader, size int64) ([]byte, error) {
data := make([]byte, 0, size)
for {
n, err := r.Read(data[len(data):cap(data)])
data = data[:len(data)+n]
if err != nil {
if err == io.EOF {
err = nil
}
return data, err
}
if len(data) == cap(data) {
return data, nil
}
}
}
func (mcm *MinioChunkManager) getMinioObject(ctx context.Context, bucketName, objectName string,
opts minio.GetObjectOptions,
) (*minio.Object, error) {
start := timerecord.NewTimeRecorder("getMinioObject")
reader, err := mcm.Client.GetObject(ctx, bucketName, objectName, opts)
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataGetLabel, metrics.TotalLabel).Inc()
if err != nil {
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataGetLabel, metrics.FailLabel).Inc()
return nil, checkObjectStorageError(objectName, err)
}
if reader == nil {
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataGetLabel, metrics.FailLabel).Inc()
return nil, nil
}
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataGetLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataGetLabel, metrics.SuccessLabel).Inc()
return reader, nil
}
func (mcm *MinioChunkManager) putMinioObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64,
opts minio.PutObjectOptions,
) (minio.UploadInfo, error) {
start := timerecord.NewTimeRecorder("putMinioObject")
info, err := mcm.Client.PutObject(ctx, bucketName, objectName, reader, objectSize, opts)
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataPutLabel, metrics.TotalLabel).Inc()
if err != nil {
metrics.PersistentDataOpCounter.WithLabelValues(metrics.MetaPutLabel, metrics.FailLabel).Inc()
return info, checkObjectStorageError(objectName, err)
}
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataPutLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
metrics.PersistentDataOpCounter.WithLabelValues(metrics.MetaPutLabel, metrics.SuccessLabel).Inc()
return info, nil
}
func (mcm *MinioChunkManager) statMinioObject(ctx context.Context, bucketName, objectName string,
opts minio.StatObjectOptions,
) (minio.ObjectInfo, error) {
start := timerecord.NewTimeRecorder("statMinioObject")
info, err := mcm.Client.StatObject(ctx, bucketName, objectName, opts)
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataStatLabel, metrics.TotalLabel).Inc()
if err != nil {
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataStatLabel, metrics.FailLabel).Inc()
err = checkObjectStorageError(objectName, err)
return info, err
}
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataStatLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataStatLabel, metrics.SuccessLabel).Inc()
return info, nil
}
func (mcm *MinioChunkManager) listMinioObjects(ctx context.Context, bucketName string,
opts minio.ListObjectsOptions,
) <-chan minio.ObjectInfo {
start := timerecord.NewTimeRecorder("listMinioObjects")
res := mcm.Client.ListObjects(ctx, bucketName, opts)
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataListLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataListLabel, metrics.TotalLabel).Inc()
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataListLabel, metrics.SuccessLabel).Inc()
return res
}
func (mcm *MinioChunkManager) removeMinioObject(ctx context.Context, bucketName, objectName string,
opts minio.RemoveObjectOptions,
) error {
start := timerecord.NewTimeRecorder("removeMinioObject")
err := mcm.Client.RemoveObject(ctx, bucketName, objectName, opts)
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataRemoveLabel, metrics.TotalLabel).Inc()
if err != nil {
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataRemoveLabel, metrics.FailLabel).Inc()
return checkObjectStorageError(objectName, err)
}
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataRemoveLabel).Observe(float64(start.ElapseSpan().Milliseconds()))
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataRemoveLabel, metrics.SuccessLabel).Inc()
return nil
}

View File

@ -1,633 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"fmt"
"io"
"math/rand"
"path"
"strings"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/pkg/util/merr"
)
// TODO: NewMinioChunkManager is deprecated. Rewrite this unittest.
func newMinIOChunkManager(ctx context.Context, bucketName string, rootPath string) (*MinioChunkManager, error) {
endPoint := getMinioAddress()
accessKeyID := Params.MinioCfg.AccessKeyID.GetValue()
secretAccessKey := Params.MinioCfg.SecretAccessKey.GetValue()
useSSL := Params.MinioCfg.UseSSL.GetAsBool()
sslCACert := Params.MinioCfg.SslCACert.GetValue()
client, err := NewMinioChunkManager(ctx,
RootPath(rootPath),
Address(endPoint),
AccessKeyID(accessKeyID),
SecretAccessKeyID(secretAccessKey),
UseSSL(useSSL),
SslCACert(sslCACert),
BucketName(bucketName),
UseIAM(false),
CloudProvider("aws"),
IAMEndpoint(""),
CreateBucket(true),
UseVirtualHost(false),
Region(""),
)
return client, err
}
func getMinioAddress() string {
minioHost := Params.MinioCfg.Address.GetValue()
if strings.Contains(minioHost, ":") {
return minioHost
}
port := Params.MinioCfg.Port.GetValue()
return minioHost + ":" + port
}
func TestMinIOCMFail(t *testing.T) {
ctx := context.Background()
accessKeyID := Params.MinioCfg.AccessKeyID.GetValue()
secretAccessKey := Params.MinioCfg.SecretAccessKey.GetValue()
useSSL := Params.MinioCfg.UseSSL.GetAsBool()
sslCACert := Params.MinioCfg.SslCACert.GetValue()
client, err := NewMinioChunkManager(ctx,
Address("9.9.9.9:invalid"),
AccessKeyID(accessKeyID),
SecretAccessKeyID(secretAccessKey),
UseSSL(useSSL),
SslCACert(sslCACert),
BucketName("test"),
CreateBucket(true),
)
assert.Error(t, err)
assert.Nil(t, client)
}
func TestMinIOCM(t *testing.T) {
testBucket := Params.MinioCfg.BucketName.GetValue()
configRoot := Params.MinioCfg.RootPath.GetValue()
testMinIOKVRoot := path.Join(configRoot, fmt.Sprintf("minio-ut-%d", rand.Int()))
t.Run("test load", func(t *testing.T) {
testLoadRoot := path.Join(testMinIOKVRoot, "test_load")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testCM, err := newMinIOChunkManager(ctx, testBucket, testLoadRoot)
require.NoError(t, err)
defer testCM.RemoveWithPrefix(ctx, testLoadRoot)
assert.Equal(t, testLoadRoot, testCM.RootPath())
prepareTests := []struct {
key string
value []byte
}{
{"abc", []byte("123")},
{"abcd", []byte("1234")},
{"key_1", []byte("111")},
{"key_2", []byte("222")},
{"key_3", []byte("333")},
}
for _, test := range prepareTests {
err = testCM.Write(ctx, path.Join(testLoadRoot, test.key), test.value)
require.NoError(t, err)
}
loadTests := []struct {
isvalid bool
loadKey string
expectedValue []byte
description string
}{
{true, "abc", []byte("123"), "load valid key abc"},
{true, "abcd", []byte("1234"), "load valid key abcd"},
{true, "key_1", []byte("111"), "load valid key key_1"},
{true, "key_2", []byte("222"), "load valid key key_2"},
{true, "key_3", []byte("333"), "load valid key key_3"},
{false, "key_not_exist", []byte(""), "load invalid key key_not_exist"},
{false, "/", []byte(""), "load leading slash"},
}
for _, test := range loadTests {
t.Run(test.description, func(t *testing.T) {
if test.isvalid {
got, err := testCM.Read(ctx, path.Join(testLoadRoot, test.loadKey))
assert.NoError(t, err)
assert.Equal(t, test.expectedValue, got)
} else {
if test.loadKey == "/" {
got, err := testCM.Read(ctx, test.loadKey)
assert.Error(t, err)
assert.Empty(t, got)
return
}
got, err := testCM.Read(ctx, path.Join(testLoadRoot, test.loadKey))
assert.Error(t, err)
assert.Empty(t, got)
}
})
}
loadWithPrefixTests := []struct {
isvalid bool
prefix string
expectedValue [][]byte
description string
}{
{true, "abc", [][]byte{[]byte("123"), []byte("1234")}, "load with valid prefix abc"},
{true, "key_", [][]byte{[]byte("111"), []byte("222"), []byte("333")}, "load with valid prefix key_"},
{true, "prefix", [][]byte{}, "load with valid but not exist prefix prefix"},
}
for _, test := range loadWithPrefixTests {
t.Run(test.description, func(t *testing.T) {
gotk, gotv, err := testCM.ReadWithPrefix(ctx, path.Join(testLoadRoot, test.prefix))
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
assert.Equal(t, len(test.expectedValue), len(gotv))
assert.ElementsMatch(t, test.expectedValue, gotv)
})
}
multiLoadTests := []struct {
isvalid bool
multiKeys []string
expectedValue [][]byte
description string
}{
{false, []string{"key_1", "key_not_exist"}, [][]byte{[]byte("111"), nil}, "multiload 1 exist 1 not"},
{true, []string{"abc", "key_3"}, [][]byte{[]byte("123"), []byte("333")}, "multiload 2 exist"},
}
for _, test := range multiLoadTests {
t.Run(test.description, func(t *testing.T) {
for i := range test.multiKeys {
test.multiKeys[i] = path.Join(testLoadRoot, test.multiKeys[i])
}
if test.isvalid {
got, err := testCM.MultiRead(ctx, test.multiKeys)
assert.NoError(t, err)
assert.Equal(t, test.expectedValue, got)
} else {
got, err := testCM.MultiRead(ctx, test.multiKeys)
assert.Error(t, err)
assert.Equal(t, test.expectedValue, got)
}
})
}
})
t.Run("test MultiSave", func(t *testing.T) {
testMultiSaveRoot := path.Join(testMinIOKVRoot, "test_multisave")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testCM, err := newMinIOChunkManager(ctx, testBucket, testMultiSaveRoot)
assert.NoError(t, err)
defer testCM.RemoveWithPrefix(ctx, testMultiSaveRoot)
err = testCM.Write(ctx, path.Join(testMultiSaveRoot, "key_1"), []byte("111"))
assert.NoError(t, err)
kvs := map[string][]byte{
path.Join(testMultiSaveRoot, "key_1"): []byte("123"),
path.Join(testMultiSaveRoot, "key_2"): []byte("456"),
}
err = testCM.MultiWrite(ctx, kvs)
assert.NoError(t, err)
val, err := testCM.Read(ctx, path.Join(testMultiSaveRoot, "key_1"))
assert.NoError(t, err)
assert.Equal(t, []byte("123"), val)
})
t.Run("test Remove", func(t *testing.T) {
testRemoveRoot := path.Join(testMinIOKVRoot, "test_remove")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testCM, err := newMinIOChunkManager(ctx, testBucket, testRemoveRoot)
assert.NoError(t, err)
defer testCM.RemoveWithPrefix(ctx, testRemoveRoot)
prepareTests := []struct {
k string
v []byte
}{
{"key_1", []byte("123")},
{"key_2", []byte("456")},
{"mkey_1", []byte("111")},
{"mkey_2", []byte("222")},
{"mkey_3", []byte("333")},
{"key_prefix_1", []byte("111")},
{"key_prefix_2", []byte("222")},
{"key_prefix_3", []byte("333")},
}
for _, test := range prepareTests {
k := path.Join(testRemoveRoot, test.k)
err = testCM.Write(ctx, k, test.v)
require.NoError(t, err)
}
removeTests := []struct {
removeKey string
valueBeforeRemove []byte
description string
}{
{"key_1", []byte("123"), "remove key_1"},
{"key_2", []byte("456"), "remove key_2"},
}
for _, test := range removeTests {
t.Run(test.description, func(t *testing.T) {
k := path.Join(testRemoveRoot, test.removeKey)
v, err := testCM.Read(ctx, k)
require.NoError(t, err)
require.Equal(t, test.valueBeforeRemove, v)
err = testCM.Remove(ctx, k)
assert.NoError(t, err)
v, err = testCM.Read(ctx, k)
require.Error(t, err)
require.Empty(t, v)
})
}
multiRemoveTest := []string{
path.Join(testRemoveRoot, "mkey_1"),
path.Join(testRemoveRoot, "mkey_2"),
path.Join(testRemoveRoot, "mkey_3"),
}
lv, err := testCM.MultiRead(ctx, multiRemoveTest)
require.NoError(t, err)
require.ElementsMatch(t, [][]byte{[]byte("111"), []byte("222"), []byte("333")}, lv)
err = testCM.MultiRemove(ctx, multiRemoveTest)
assert.NoError(t, err)
for _, k := range multiRemoveTest {
v, err := testCM.Read(ctx, k)
assert.Error(t, err)
assert.Empty(t, v)
}
removeWithPrefixTest := []string{
path.Join(testRemoveRoot, "key_prefix_1"),
path.Join(testRemoveRoot, "key_prefix_2"),
path.Join(testRemoveRoot, "key_prefix_3"),
}
removePrefix := path.Join(testRemoveRoot, "key_prefix")
lv, err = testCM.MultiRead(ctx, removeWithPrefixTest)
require.NoError(t, err)
require.ElementsMatch(t, [][]byte{[]byte("111"), []byte("222"), []byte("333")}, lv)
err = testCM.RemoveWithPrefix(ctx, removePrefix)
assert.NoError(t, err)
for _, k := range removeWithPrefixTest {
v, err := testCM.Read(ctx, k)
assert.Error(t, err)
assert.Empty(t, v)
}
})
t.Run("test ReadAt", func(t *testing.T) {
testLoadPartialRoot := path.Join(testMinIOKVRoot, "load_partial")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testCM, err := newMinIOChunkManager(ctx, testBucket, testLoadPartialRoot)
require.NoError(t, err)
defer testCM.RemoveWithPrefix(ctx, testLoadPartialRoot)
key := path.Join(testLoadPartialRoot, "TestMinIOKV_LoadPartial_key")
value := []byte("TestMinIOKV_LoadPartial_value")
err = testCM.Write(ctx, key, value)
assert.NoError(t, err)
var off, length int64
var partial []byte
off, length = 1, 1
partial, err = testCM.ReadAt(ctx, key, off, length)
assert.NoError(t, err)
assert.ElementsMatch(t, partial, value[off:off+length])
off, length = 0, int64(len(value))
partial, err = testCM.ReadAt(ctx, key, off, length)
assert.NoError(t, err)
assert.ElementsMatch(t, partial, value[off:off+length])
// error case
off, length = 5, -2
_, err = testCM.ReadAt(ctx, key, off, length)
assert.Error(t, err)
off, length = -1, 2
_, err = testCM.ReadAt(ctx, key, off, length)
assert.Error(t, err)
off, length = 1, -2
_, err = testCM.ReadAt(ctx, key, off, length)
assert.Error(t, err)
err = testCM.Remove(ctx, key)
assert.NoError(t, err)
off, length = 1, 1
_, err = testCM.ReadAt(ctx, key, off, length)
assert.Error(t, err)
})
t.Run("test Size", func(t *testing.T) {
testGetSizeRoot := path.Join(testMinIOKVRoot, "get_size")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testCM, err := newMinIOChunkManager(ctx, testBucket, testGetSizeRoot)
require.NoError(t, err)
defer testCM.RemoveWithPrefix(ctx, testGetSizeRoot)
key := path.Join(testGetSizeRoot, "TestMinIOKV_GetSize_key")
value := []byte("TestMinIOKV_GetSize_value")
err = testCM.Write(ctx, key, value)
assert.NoError(t, err)
size, err := testCM.Size(ctx, key)
assert.NoError(t, err)
assert.Equal(t, size, int64(len(value)))
key2 := path.Join(testGetSizeRoot, "TestMemoryKV_GetSize_key2")
size, err = testCM.Size(ctx, key2)
assert.Error(t, err)
assert.Equal(t, int64(0), size)
})
t.Run("test Path", func(t *testing.T) {
testGetPathRoot := path.Join(testMinIOKVRoot, "get_path")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testCM, err := newMinIOChunkManager(ctx, testBucket, testGetPathRoot)
require.NoError(t, err)
defer testCM.RemoveWithPrefix(ctx, testGetPathRoot)
key := path.Join(testGetPathRoot, "TestMinIOKV_GetSize_key")
value := []byte("TestMinIOKV_GetSize_value")
err = testCM.Write(ctx, key, value)
assert.NoError(t, err)
p, err := testCM.Path(ctx, key)
assert.NoError(t, err)
assert.Equal(t, p, key)
key2 := path.Join(testGetPathRoot, "TestMemoryKV_GetSize_key2")
p, err = testCM.Path(ctx, key2)
assert.Error(t, err)
assert.Equal(t, p, "")
})
t.Run("test Mmap", func(t *testing.T) {
testMmapRoot := path.Join(testMinIOKVRoot, "mmap")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testCM, err := newMinIOChunkManager(ctx, testBucket, testMmapRoot)
require.NoError(t, err)
defer testCM.RemoveWithPrefix(ctx, testMmapRoot)
key := path.Join(testMmapRoot, "TestMinIOKV_GetSize_key")
value := []byte("TestMinIOKV_GetSize_value")
err = testCM.Write(ctx, key, value)
assert.NoError(t, err)
r, err := testCM.Mmap(ctx, key)
assert.Error(t, err)
assert.Nil(t, r)
})
t.Run("test Prefix", func(t *testing.T) {
testPrefix := path.Join(testMinIOKVRoot, "prefix")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testCM, err := newMinIOChunkManager(ctx, testBucket, testPrefix)
require.NoError(t, err)
defer testCM.RemoveWithPrefix(ctx, testPrefix)
pathB := path.Join("a", "b")
key := path.Join(testPrefix, pathB)
value := []byte("a")
err = testCM.Write(ctx, key, value)
assert.NoError(t, err)
pathC := path.Join("a", "c")
key = path.Join(testPrefix, pathC)
err = testCM.Write(ctx, key, value)
assert.NoError(t, err)
pathPrefix := path.Join(testPrefix, "a")
r, m, err := testCM.ListWithPrefix(ctx, pathPrefix, true)
assert.NoError(t, err)
assert.Equal(t, len(r), 2)
assert.Equal(t, len(m), 2)
key = path.Join(testPrefix, "b", "b", "b")
err = testCM.Write(ctx, key, value)
assert.NoError(t, err)
key = path.Join(testPrefix, "b", "a", "b")
err = testCM.Write(ctx, key, value)
assert.NoError(t, err)
key = path.Join(testPrefix, "bc", "a", "b")
err = testCM.Write(ctx, key, value)
assert.NoError(t, err)
dirs, mods, err := testCM.ListWithPrefix(ctx, testPrefix+"/", true)
assert.NoError(t, err)
assert.Equal(t, 5, len(dirs))
assert.Equal(t, 5, len(mods))
dirs, mods, err = testCM.ListWithPrefix(ctx, path.Join(testPrefix, "b"), true)
assert.NoError(t, err)
assert.Equal(t, 3, len(dirs))
assert.Equal(t, 3, len(mods))
testCM.RemoveWithPrefix(ctx, testPrefix)
r, m, err = testCM.ListWithPrefix(ctx, pathPrefix, true)
assert.NoError(t, err)
assert.Equal(t, 0, len(r))
assert.Equal(t, 0, len(m))
// test wrong prefix
b := make([]byte, 2048)
pathWrong := path.Join(testPrefix, string(b))
_, _, err = testCM.ListWithPrefix(ctx, pathWrong, true)
assert.Error(t, err)
})
t.Run("test NoSuchKey", func(t *testing.T) {
testPrefix := path.Join(testMinIOKVRoot, "nokey")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testCM, err := newMinIOChunkManager(ctx, testBucket, testPrefix)
require.NoError(t, err)
defer testCM.RemoveWithPrefix(ctx, testPrefix)
key := "a"
_, err = testCM.Read(ctx, key)
assert.Error(t, err)
assert.True(t, errors.Is(err, merr.ErrIoKeyNotFound))
_, err = testCM.ReadAt(ctx, key, 100, 1)
assert.Error(t, err)
assert.True(t, errors.Is(err, merr.ErrIoKeyNotFound))
})
}
func TestMinioChunkManager_normalizeRootPath(t *testing.T) {
type testCase struct {
input string
expected string
}
cases := []testCase{
{
input: "files",
expected: "files",
},
{
input: "files/",
expected: "files/",
},
{
input: "/files",
expected: "files",
},
{
input: "//files",
expected: "files",
},
{
input: "files/my-folder",
expected: "files/my-folder",
},
{
input: "",
expected: "",
},
}
mcm := &MinioChunkManager{}
for _, test := range cases {
t.Run(test.input, func(t *testing.T) {
assert.Equal(t, test.expected, mcm.normalizeRootPath(test.input))
})
}
}
func TestMinioChunkManager_Read(t *testing.T) {
var reader MockReader
reader.offset = new(int)
reader.value = make([]byte, 10)
reader.lastEOF = true
for i := 0; i < 10; i++ {
reader.value[i] = byte(i)
}
value, err := Read(reader, 10)
assert.Equal(t, len(value), 10)
for i := 0; i < 10; i++ {
assert.Equal(t, value[i], byte(i))
}
assert.NoError(t, err)
}
func TestMinioChunkManager_ReadEOF(t *testing.T) {
var reader MockReader
reader.offset = new(int)
reader.value = make([]byte, 10)
reader.lastEOF = false
for i := 0; i < 10; i++ {
reader.value[i] = byte(i)
}
value, err := Read(reader, 10)
assert.Equal(t, len(value), 10)
for i := 0; i < 10; i++ {
assert.Equal(t, value[i], byte(i))
}
assert.NoError(t, err)
}
type MockReader struct {
value []byte
offset *int
lastEOF bool
}
func (r MockReader) Read(p []byte) (n int, err error) {
if len(r.value) == *r.offset {
return 0, io.EOF
}
cap := len(r.value) - *r.offset
if cap < 5 {
copy(p, r.value[*r.offset:])
*r.offset = len(r.value)
if r.lastEOF {
return cap, io.EOF
}
return cap, nil
}
n = rand.Intn(5)
copy(p, r.value[*r.offset:(*r.offset+n)])
*r.offset += n
return n, nil
}

View File

@ -17,13 +17,11 @@
package storage
import (
"container/list"
"context"
"fmt"
"io"
"os"
"strings"
"time"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
@ -33,9 +31,14 @@ import (
"github.com/milvus-io/milvus/internal/storage/gcp"
"github.com/milvus-io/milvus/internal/storage/tencent"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
)
var CheckBucketRetryAttempts uint = 20
var _ ObjectStorage = (*MinioObjectStorage)(nil)
type MinioObjectStorage struct {
*minio.Client
}
@ -191,45 +194,29 @@ func (minioObjectStorage *MinioObjectStorage) StatObject(ctx context.Context, bu
return info.Size, checkObjectStorageError(objectName, err)
}
func (minioObjectStorage *MinioObjectStorage) ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error) {
var objectsKeys []string
var modTimes []time.Time
tasks := list.New()
tasks.PushBack(prefix)
for tasks.Len() > 0 {
e := tasks.Front()
pre := e.Value.(string)
tasks.Remove(e)
func (minioObjectStorage *MinioObjectStorage) WalkWithObjects(ctx context.Context, bucketName string, prefix string, recursive bool, walkFunc ChunkObjectWalkFunc) (err error) {
// if minio has lots of objects under the provided path
// recursive = true may timeout during the recursive browsing the objects.
// See also: https://github.com/milvus-io/milvus/issues/19095
// So we can change the `ListObjectsMaxKeys` to limit the max keys by batch to avoid timeout.
in := minioObjectStorage.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{
Prefix: prefix,
Recursive: recursive,
MaxKeys: paramtable.Get().MinioCfg.ListObjectsMaxKeys.GetAsInt(),
})
res := minioObjectStorage.Client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{
Prefix: pre,
Recursive: false,
})
objects := map[string]time.Time{}
for object := range res {
if object.Err != nil {
log.Warn("failed to list with prefix", zap.String("bucket", bucketName), zap.String("prefix", prefix), zap.Error(object.Err))
return []string{}, []time.Time{}, object.Err
}
objects[object.Key] = object.LastModified
for object := range in {
if object.Err != nil {
return object.Err
}
for object, lastModified := range objects {
// with tailing "/", object is a "directory"
if strings.HasSuffix(object, "/") && recursive {
// enqueue when recursive is true
if object != pre {
tasks.PushBack(object)
}
continue
}
objectsKeys = append(objectsKeys, object)
modTimes = append(modTimes, lastModified)
if !walkFunc(&ChunkObjectInfo{FilePath: object.Key, ModifyTime: object.LastModified}) {
return nil
}
}
return objectsKeys, modTimes, nil
return nil
}
func (minioObjectStorage *MinioObjectStorage) RemoveObject(ctx context.Context, bucketName, objectName string) error {
return minioObjectStorage.Client.RemoveObject(ctx, bucketName, objectName, minio.RemoveObjectOptions{})
err := minioObjectStorage.Client.RemoveObject(ctx, bucketName, objectName, minio.RemoveObjectOptions{})
return checkObjectStorageError(objectName, err)
}

View File

@ -22,6 +22,7 @@ import (
"fmt"
"io"
"testing"
"time"
"github.com/minio/minio-go/v7"
"github.com/stretchr/testify/assert"
@ -132,7 +133,7 @@ func TestMinioObjectStorage(t *testing.T) {
for _, test := range loadWithPrefixTests {
t.Run(test.description, func(t *testing.T) {
gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, false)
gotk, _, err := listAllObjectsWithPrefixAtBucket(ctx, testCM, config.bucketName, test.prefix, false)
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
for _, key := range gotk {
@ -166,7 +167,7 @@ func TestMinioObjectStorage(t *testing.T) {
for _, test := range prepareTests {
t.Run(test.key, func(t *testing.T) {
err := testCM.PutObject(ctx, config.bucketName, test.key, bytes.NewReader(test.value), int64(len(test.value)))
require.Equal(t, test.valid, err == nil)
require.Equal(t, test.valid, err == nil, err)
})
}
@ -183,7 +184,7 @@ func TestMinioObjectStorage(t *testing.T) {
for _, test := range insertWithPrefixTests {
t.Run(fmt.Sprintf("prefix: %s, recursive: %t", test.prefix, test.recursive), func(t *testing.T) {
gotk, _, err := testCM.ListObjects(ctx, config.bucketName, test.prefix, test.recursive)
gotk, _, err := listAllObjectsWithPrefixAtBucket(ctx, testCM, config.bucketName, test.prefix, test.recursive)
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
for _, key := range gotk {
@ -226,3 +227,17 @@ func TestMinioObjectStorage(t *testing.T) {
config.cloudProvider = cloudProvider
})
}
// listAllObjectsWithPrefixAtBucket is a helper function to list all objects with same @prefix at bucket by using `ListWithPrefix`.
func listAllObjectsWithPrefixAtBucket(ctx context.Context, objectStorage ObjectStorage, bucket string, prefix string, recursive bool) ([]string, []time.Time, error) {
var dirs []string
var mods []time.Time
if err := objectStorage.WalkWithObjects(ctx, bucket, prefix, recursive, func(chunkObjectInfo *ChunkObjectInfo) bool {
dirs = append(dirs, chunkObjectInfo.FilePath)
mods = append(mods, chunkObjectInfo.ModifyTime)
return true
}); err != nil {
return nil, nil, err
}
return dirs, mods, nil
}

View File

@ -21,7 +21,6 @@ import (
"context"
"io"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
@ -46,11 +45,20 @@ const (
CloudProviderTencent = "tencent"
)
// ChunkObjectWalkFunc is the callback function for walking objects.
// If return false, WalkWithObjects will stop.
// Otherwise, WalkWithObjects will continue until reach the last object.
type ChunkObjectWalkFunc func(chunkObjectInfo *ChunkObjectInfo) bool
type ObjectStorage interface {
GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error)
PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error
StatObject(ctx context.Context, bucketName, objectName string) (int64, error)
ListObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error)
// WalkWithPrefix walks all objects with prefix @prefix, and call walker for each object.
// WalkWithPrefix will stop if following conditions met:
// 1. cb return false or reach the last object, WalkWithPrefix will stop and return nil.
// 2. underlying walking failed or context canceled, WalkWithPrefix will stop and return a error.
WalkWithObjects(ctx context.Context, bucketName string, prefix string, recursive bool, walkFunc ChunkObjectWalkFunc) error
RemoveObject(ctx context.Context, bucketName, objectName string) error
}
@ -85,11 +93,26 @@ func NewRemoteChunkManager(ctx context.Context, c *config) (*RemoteChunkManager,
return mcm, nil
}
// NewRemoteChunkManagerForTesting is used for testing.
func NewRemoteChunkManagerForTesting(c *minio.Client, bucket string, rootPath string) *RemoteChunkManager {
mcm := &RemoteChunkManager{
client: &MinioObjectStorage{c},
bucketName: bucket,
rootPath: rootPath,
}
return mcm
}
// RootPath returns minio root path.
func (mcm *RemoteChunkManager) RootPath() string {
return mcm.rootPath
}
// UnderlyingObjectStorage returns the underlying object storage.
func (mcm *RemoteChunkManager) UnderlyingObjectStorage() ObjectStorage {
return mcm.client
}
// Path returns the path of minio data if exists.
func (mcm *RemoteChunkManager) Path(ctx context.Context, filePath string) (string, error) {
exist, err := mcm.Exist(ctx, filePath)
@ -184,7 +207,7 @@ func (mcm *RemoteChunkManager) Read(ctx context.Context, filePath string) ([]byt
log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return err
}
data, err = Read(object, size)
data, err = read(object, size)
err = checkObjectStorageError(filePath, err)
if err != nil {
log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
@ -214,19 +237,6 @@ func (mcm *RemoteChunkManager) MultiRead(ctx context.Context, keys []string) ([]
return objectsValues, el
}
func (mcm *RemoteChunkManager) ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error) {
objectsKeys, _, err := mcm.ListWithPrefix(ctx, prefix, true)
if err != nil {
return nil, nil, err
}
objectsValues, err := mcm.MultiRead(ctx, objectsKeys)
if err != nil {
return nil, nil, err
}
return objectsKeys, objectsValues, nil
}
func (mcm *RemoteChunkManager) Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error) {
return nil, errors.New("this method has not been implemented")
}
@ -244,7 +254,7 @@ func (mcm *RemoteChunkManager) ReadAt(ctx context.Context, filePath string, off
}
defer object.Close()
data, err := Read(object, length)
data, err := read(object, length)
err = checkObjectStorageError(filePath, err)
if err != nil {
log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
@ -278,47 +288,41 @@ func (mcm *RemoteChunkManager) MultiRemove(ctx context.Context, keys []string) e
// RemoveWithPrefix removes all objects with the same prefix @prefix from minio.
func (mcm *RemoteChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error {
removeKeys, _, err := mcm.listObjects(ctx, mcm.bucketName, prefix, true)
if err != nil {
// removeObject in parallel.
runningGroup, _ := errgroup.WithContext(ctx)
runningGroup.SetLimit(10)
err := mcm.WalkWithPrefix(ctx, prefix, true, func(object *ChunkObjectInfo) bool {
key := object.FilePath
runningGroup.Go(func() error {
err := mcm.removeObject(ctx, mcm.bucketName, key)
if err != nil {
log.Warn("failed to remove object", zap.String("path", key), zap.Error(err))
}
return err
})
return true
})
// wait all goroutines done.
if err := runningGroup.Wait(); err != nil {
return err
}
i := 0
maxGoroutine := 10
for i < len(removeKeys) {
runningGroup, groupCtx := errgroup.WithContext(ctx)
for j := 0; j < maxGoroutine && i < len(removeKeys); j++ {
key := removeKeys[i]
runningGroup.Go(func() error {
err := mcm.removeObject(groupCtx, mcm.bucketName, key)
if err != nil {
log.Warn("failed to remove object", zap.String("path", key), zap.Error(err))
return err
}
return nil
})
i++
}
if err := runningGroup.Wait(); err != nil {
return err
}
}
return nil
// return the iteration error
return err
}
// ListWithPrefix returns objects with provided prefix.
// by default, if `recursive`=false, list object with return object with path under save level
// say minio has followinng objects: [a, ab, a/b, ab/c]
// calling `ListWithPrefix` with `prefix` = a && `recursive` = false will only returns [a, ab]
// If caller needs all objects without level limitation, `recursive` shall be true.
func (mcm *RemoteChunkManager) ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error) {
// cannot use ListObjects(ctx, bucketName, Opt{Prefix:prefix, Recursive:true})
// if minio has lots of objects under the provided path
// recursive = true may timeout during the recursive browsing the objects.
// See also: https://github.com/milvus-io/milvus/issues/19095
func (mcm *RemoteChunkManager) WalkWithPrefix(ctx context.Context, prefix string, recursive bool, walkFunc ChunkObjectWalkFunc) (err error) {
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataWalkLabel, metrics.TotalLabel).Inc()
logger := log.With(zap.String("prefix", prefix), zap.Bool("recursive", recursive))
// TODO add concurrent call if performance matters
// only return current level per call
return mcm.listObjects(ctx, mcm.bucketName, prefix, recursive)
logger.Info("start walk through objects")
if err := mcm.client.WalkWithObjects(ctx, mcm.bucketName, prefix, recursive, walkFunc); err != nil {
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataWalkLabel, metrics.FailLabel).Inc()
logger.Warn("failed to walk through objects", zap.Error(err))
return err
}
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataWalkLabel, metrics.SuccessLabel).Inc()
logger.Info("finish walk through objects")
return nil
}
func (mcm *RemoteChunkManager) getObject(ctx context.Context, bucketName, objectName string,
@ -367,22 +371,6 @@ func (mcm *RemoteChunkManager) getObjectSize(ctx context.Context, bucketName, ob
return info, err
}
func (mcm *RemoteChunkManager) listObjects(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []time.Time, error) {
start := timerecord.NewTimeRecorder("listObjects")
blobNames, lastModifiedTime, err := mcm.client.ListObjects(ctx, bucketName, prefix, recursive)
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataListLabel, metrics.TotalLabel).Inc()
if err == nil {
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataListLabel).
Observe(float64(start.ElapseSpan().Milliseconds()))
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataListLabel, metrics.SuccessLabel).Inc()
} else {
log.Warn("failed to list with prefix", zap.String("bucket", mcm.bucketName), zap.String("prefix", prefix), zap.Error(err))
metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataListLabel, metrics.FailLabel).Inc()
}
return blobNames, lastModifiedTime, err
}
func (mcm *RemoteChunkManager) removeObject(ctx context.Context, bucketName, objectName string) error {
start := timerecord.NewTimeRecorder("removeObject")
@ -421,3 +409,21 @@ func checkObjectStorageError(fileName string, err error) error {
}
return merr.WrapErrIoFailed(fileName, err)
}
// Learn from file.ReadFile
func read(r io.Reader, size int64) ([]byte, error) {
data := make([]byte, 0, size)
for {
n, err := r.Read(data[len(data):cap(data)])
data = data[:len(data)+n]
if err != nil {
if err == io.EOF {
err = nil
}
return data, err
}
if len(data) == cap(data) {
return data, nil
}
}
}

View File

@ -149,7 +149,7 @@ func TestMinioChunkManager(t *testing.T) {
for _, test := range loadWithPrefixTests {
t.Run(test.description, func(t *testing.T) {
gotk, gotv, err := testCM.ReadWithPrefix(ctx, path.Join(testLoadRoot, test.prefix))
gotk, gotv, err := readAllChunkWithPrefix(ctx, testCM, path.Join(testLoadRoot, test.prefix))
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
assert.Equal(t, len(test.expectedValue), len(gotv))
@ -455,7 +455,7 @@ func TestMinioChunkManager(t *testing.T) {
assert.NoError(t, err)
pathPrefix := path.Join(testPrefix, "a")
r, m, err := testCM.ListWithPrefix(ctx, pathPrefix, true)
r, m, err := ListAllChunkWithPrefix(ctx, testCM, pathPrefix, true)
assert.NoError(t, err)
assert.Equal(t, len(r), 2)
assert.Equal(t, len(m), 2)
@ -471,18 +471,18 @@ func TestMinioChunkManager(t *testing.T) {
key = path.Join(testPrefix, "bc", "a", "b")
err = testCM.Write(ctx, key, value)
assert.NoError(t, err)
dirs, mods, err := testCM.ListWithPrefix(ctx, testPrefix+"/", true)
dirs, mods, err := ListAllChunkWithPrefix(ctx, testCM, testPrefix+"/", true)
assert.NoError(t, err)
assert.Equal(t, 5, len(dirs))
assert.Equal(t, 5, len(mods))
dirs, mods, err = testCM.ListWithPrefix(ctx, path.Join(testPrefix, "b"), true)
dirs, mods, err = ListAllChunkWithPrefix(ctx, testCM, path.Join(testPrefix, "b"), true)
assert.NoError(t, err)
assert.Equal(t, 3, len(dirs))
assert.Equal(t, 3, len(mods))
testCM.RemoveWithPrefix(ctx, testPrefix)
r, m, err = testCM.ListWithPrefix(ctx, pathPrefix, true)
r, m, err = ListAllChunkWithPrefix(ctx, testCM, pathPrefix, true)
assert.NoError(t, err)
assert.Equal(t, 0, len(r))
assert.Equal(t, 0, len(m))
@ -490,7 +490,7 @@ func TestMinioChunkManager(t *testing.T) {
// test wrong prefix
b := make([]byte, 2048)
pathWrong := path.Join(testPrefix, string(b))
_, _, err = testCM.ListWithPrefix(ctx, pathWrong, true)
_, _, err = ListAllChunkWithPrefix(ctx, testCM, pathWrong, true)
assert.Error(t, err)
})
@ -603,7 +603,7 @@ func TestAzureChunkManager(t *testing.T) {
for _, test := range loadWithPrefixTests {
t.Run(test.description, func(t *testing.T) {
gotk, gotv, err := testCM.ReadWithPrefix(ctx, path.Join(testLoadRoot, test.prefix))
gotk, gotv, err := readAllChunkWithPrefix(ctx, testCM, path.Join(testLoadRoot, test.prefix))
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
assert.Equal(t, len(test.expectedValue), len(gotv))
@ -909,7 +909,7 @@ func TestAzureChunkManager(t *testing.T) {
assert.NoError(t, err)
pathPrefix := path.Join(testPrefix, "a")
r, m, err := testCM.ListWithPrefix(ctx, pathPrefix, true)
r, m, err := ListAllChunkWithPrefix(ctx, testCM, pathPrefix, true)
assert.NoError(t, err)
assert.Equal(t, len(r), 2)
assert.Equal(t, len(m), 2)
@ -925,18 +925,18 @@ func TestAzureChunkManager(t *testing.T) {
key = path.Join(testPrefix, "bc", "a", "b")
err = testCM.Write(ctx, key, value)
assert.NoError(t, err)
dirs, mods, err := testCM.ListWithPrefix(ctx, testPrefix+"/", true)
dirs, mods, err := ListAllChunkWithPrefix(ctx, testCM, testPrefix+"/", true)
assert.NoError(t, err)
assert.Equal(t, 5, len(dirs))
assert.Equal(t, 5, len(mods))
dirs, mods, err = testCM.ListWithPrefix(ctx, path.Join(testPrefix, "b"), true)
dirs, mods, err = ListAllChunkWithPrefix(ctx, testCM, path.Join(testPrefix, "b"), true)
assert.NoError(t, err)
assert.Equal(t, 3, len(dirs))
assert.Equal(t, 3, len(mods))
testCM.RemoveWithPrefix(ctx, testPrefix)
r, m, err = testCM.ListWithPrefix(ctx, pathPrefix, true)
r, m, err = ListAllChunkWithPrefix(ctx, testCM, pathPrefix, true)
assert.NoError(t, err)
assert.Equal(t, 0, len(r))
assert.Equal(t, 0, len(m))
@ -944,7 +944,7 @@ func TestAzureChunkManager(t *testing.T) {
// test wrong prefix
b := make([]byte, 2048)
pathWrong := path.Join(testPrefix, string(b))
_, _, err = testCM.ListWithPrefix(ctx, pathWrong, true)
_, _, err = ListAllChunkWithPrefix(ctx, testCM, pathWrong, true)
assert.Error(t, err)
})

View File

@ -41,6 +41,12 @@ type FileReader interface {
io.Seeker
}
// ChunkObjectInfo is to store object info.
type ChunkObjectInfo struct {
FilePath string
ModifyTime time.Time
}
// ChunkManager is to manager chunks.
// Include Read, Write, Remove chunks.
type ChunkManager interface {
@ -62,9 +68,10 @@ type ChunkManager interface {
Reader(ctx context.Context, filePath string) (FileReader, error)
// MultiRead reads @filePath and returns content.
MultiRead(ctx context.Context, filePaths []string) ([][]byte, error)
ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error)
// ReadWithPrefix reads files with same @prefix and returns contents.
ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error)
// WalkWithPrefix list files with same @prefix and call @walkFunc for each file.
// 1. walkFunc return false or reach the last object, WalkWithPrefix will stop and return nil.
// 2. underlying walking failed or context canceled, WalkWithPrefix will stop and return a error.
WalkWithPrefix(ctx context.Context, prefix string, recursive bool, walkFunc ChunkObjectWalkFunc) error
Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error)
// ReadAt reads @filePath by offset @off, content stored in @p, return @n as the number of bytes read.
// if all bytes are read, @err is io.EOF.
@ -77,3 +84,18 @@ type ChunkManager interface {
// RemoveWithPrefix remove files with same @prefix.
RemoveWithPrefix(ctx context.Context, prefix string) error
}
// ListAllChunkWithPrefix is a helper function to list all objects with same @prefix by using `ListWithPrefix`.
// `ListWithPrefix` is more efficient way to call if you don't need all chunk at same time.
func ListAllChunkWithPrefix(ctx context.Context, manager ChunkManager, prefix string, recursive bool) ([]string, []time.Time, error) {
var dirs []string
var mods []time.Time
if err := manager.WalkWithPrefix(ctx, prefix, recursive, func(chunkInfo *ChunkObjectInfo) bool {
dirs = append(dirs, chunkInfo.FilePath)
mods = append(mods, chunkInfo.ModifyTime)
return true
}); err != nil {
return nil, nil, err
}
return dirs, mods, nil
}

View File

@ -90,7 +90,7 @@ func (r *reader) init(paths []string, tsStart, tsEnd uint64) error {
if len(paths) < 2 {
return nil
}
deltaLogs, _, err := r.cm.ListWithPrefix(context.Background(), paths[1], true)
deltaLogs, _, err := storage.ListAllChunkWithPrefix(context.Background(), r.cm, paths[1], true)
if err != nil {
return err
}

View File

@ -24,6 +24,7 @@ import (
"math/rand"
"strconv"
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
@ -345,8 +346,24 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) {
originalInsertData := createInsertData(suite.T(), schema, suite.numRows)
insertLogs := lo.Flatten(lo.Values(insertBinlogs))
cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(insertLogs, nil, nil)
cm.EXPECT().ListWithPrefix(mock.Anything, deltaPrefix, mock.Anything).Return(deltaLogs, nil, nil)
cm.EXPECT().WalkWithPrefix(mock.Anything, insertPrefix, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, s string, b bool, cowf storage.ChunkObjectWalkFunc) error {
for _, filePath := range insertLogs {
if !cowf(&storage.ChunkObjectInfo{FilePath: filePath, ModifyTime: time.Now()}) {
return nil
}
}
return nil
})
cm.EXPECT().WalkWithPrefix(mock.Anything, deltaPrefix, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, s string, b bool, cowf storage.ChunkObjectWalkFunc) error {
for _, filePath := range deltaLogs {
if !cowf(&storage.ChunkObjectInfo{FilePath: filePath, ModifyTime: time.Now()}) {
return nil
}
}
return nil
})
for fieldID, paths := range insertBinlogs {
field := typeutil.GetField(schema, fieldID)
suite.NotNil(field)

View File

@ -66,20 +66,25 @@ func newBinlogReader(ctx context.Context, cm storage.ChunkManager, path string)
}
func listInsertLogs(ctx context.Context, cm storage.ChunkManager, insertPrefix string) (map[int64][]string, error) {
insertLogPaths, _, err := cm.ListWithPrefix(ctx, insertPrefix, true)
if err != nil {
return nil, err
}
insertLogs := make(map[int64][]string)
for _, logPath := range insertLogPaths {
fieldPath := path.Dir(logPath)
var walkErr error
if err := cm.WalkWithPrefix(ctx, insertPrefix, true, func(insertLog *storage.ChunkObjectInfo) bool {
fieldPath := path.Dir(insertLog.FilePath)
fieldStrID := path.Base(fieldPath)
fieldID, err := strconv.ParseInt(fieldStrID, 10, 64)
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to parse field id from log, error: %v", err))
walkErr = merr.WrapErrImportFailed(fmt.Sprintf("failed to parse field id from log, error: %v", err))
return false
}
insertLogs[fieldID] = append(insertLogs[fieldID], logPath)
insertLogs[fieldID] = append(insertLogs[fieldID], insertLog.FilePath)
return true
}); err != nil {
return nil, err
}
if walkErr != nil {
return nil, walkErr
}
for _, v := range insertLogs {
sort.Strings(v)
}

View File

@ -192,13 +192,13 @@ var (
/* garbage collector related metrics */
// GarbageCollectorListLatency metrics for gc scan storage files.
GarbageCollectorListLatency = prometheus.NewHistogramVec(
// GarbageCollectorFileScanDuration metrics for gc scan storage files.
GarbageCollectorFileScanDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "gc_list_latency",
Help: "latency of list objects in storage while garbage collecting (in milliseconds)",
Name: "gc_file_scan_duration",
Help: "duration of scan file in storage while garbage collecting (in milliseconds)",
Buckets: longTaskBuckets,
}, []string{nodeIDLabelName, segmentFileTypeLabelName})
@ -305,6 +305,8 @@ func RegisterDataCoord(registry *prometheus.Registry) {
registry.MustRegister(IndexTaskNum)
registry.MustRegister(IndexNodeNum)
registry.MustRegister(ImportTasks)
registry.MustRegister(GarbageCollectorFileScanDuration)
registry.MustRegister(GarbageCollectorRunCount)
}
func CleanupDataCoordSegmentMetrics(collectionID int64, segmentID int64) {

View File

@ -22,7 +22,7 @@ const (
DataGetLabel = "get"
DataPutLabel = "put"
DataRemoveLabel = "remove"
DataListLabel = "list"
DataWalkLabel = "walk"
DataStatLabel = "stat"
persistentDataOpType = "persistent_data_op_type"

View File

@ -112,3 +112,15 @@ func AwaitAll[T future](futures ...T) error {
return nil
}
// BlockOnAll blocks until all futures complete.
// Return the first error in these futures.
func BlockOnAll[T future](futures ...T) error {
var err error
for i := range futures {
if e := futures[i].Err(); e != nil && err == nil {
err = e
}
}
return err
}

View File

@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
)
type FutureSuite struct {
@ -46,6 +47,54 @@ func (s *FutureSuite) TestFuture() {
s.Equal(10, resultFuture.Value())
}
func (s *FutureSuite) TestBlockOnAll() {
cnt := atomic.NewInt32(0)
futures := make([]*Future[struct{}], 10)
for i := 0; i < 10; i++ {
sleepTime := time.Duration(i) * 100 * time.Millisecond
futures[i] = Go(func() (struct{}, error) {
time.Sleep(sleepTime)
cnt.Add(1)
return struct{}{}, errors.New("errFuture")
})
}
err := BlockOnAll(futures...)
s.Error(err)
s.Equal(int32(10), cnt.Load())
cnt.Store(0)
for i := 0; i < 10; i++ {
sleepTime := time.Duration(i) * 100 * time.Millisecond
futures[i] = Go(func() (struct{}, error) {
time.Sleep(sleepTime)
cnt.Add(1)
return struct{}{}, nil
})
}
err = BlockOnAll(futures...)
s.NoError(err)
s.Equal(int32(10), cnt.Load())
}
func (s *FutureSuite) TestAwaitAll() {
cnt := atomic.NewInt32(0)
futures := make([]*Future[struct{}], 10)
for i := 0; i < 10; i++ {
sleepTime := time.Duration(i) * 100 * time.Millisecond
futures[i] = Go(func() (struct{}, error) {
time.Sleep(sleepTime)
cnt.Add(1)
return struct{}{}, errors.New("errFuture")
})
}
err := AwaitAll(futures...)
s.Error(err)
s.Equal(int32(1), cnt.Load())
}
func TestFuture(t *testing.T) {
suite.Run(t, new(FutureSuite))
}

View File

@ -2994,8 +2994,8 @@ During compaction, the size of segment # of rows is able to exceed segment max #
p.GCMissingTolerance = ParamItem{
Key: "dataCoord.gc.missingTolerance",
Version: "2.0.0",
DefaultValue: "3600",
Doc: "file meta missing tolerance duration in seconds, default to 1hr",
DefaultValue: "86400",
Doc: "file meta missing tolerance duration in seconds, default to 24hr(1d)",
Export: true,
}
p.GCMissingTolerance.Init(base.mgr)

View File

@ -1032,21 +1032,22 @@ func (r *NatsmqConfig) Init(base *BaseTable) {
// /////////////////////////////////////////////////////////////////////////////
// --- minio ---
type MinioConfig struct {
Address ParamItem `refreshable:"false"`
Port ParamItem `refreshable:"false"`
AccessKeyID ParamItem `refreshable:"false"`
SecretAccessKey ParamItem `refreshable:"false"`
UseSSL ParamItem `refreshable:"false"`
SslCACert ParamItem `refreshable:"false"`
BucketName ParamItem `refreshable:"false"`
RootPath ParamItem `refreshable:"false"`
UseIAM ParamItem `refreshable:"false"`
CloudProvider ParamItem `refreshable:"false"`
IAMEndpoint ParamItem `refreshable:"false"`
LogLevel ParamItem `refreshable:"false"`
Region ParamItem `refreshable:"false"`
UseVirtualHost ParamItem `refreshable:"false"`
RequestTimeoutMs ParamItem `refreshable:"false"`
Address ParamItem `refreshable:"false"`
Port ParamItem `refreshable:"false"`
AccessKeyID ParamItem `refreshable:"false"`
SecretAccessKey ParamItem `refreshable:"false"`
UseSSL ParamItem `refreshable:"false"`
SslCACert ParamItem `refreshable:"false"`
BucketName ParamItem `refreshable:"false"`
RootPath ParamItem `refreshable:"false"`
UseIAM ParamItem `refreshable:"false"`
CloudProvider ParamItem `refreshable:"false"`
IAMEndpoint ParamItem `refreshable:"false"`
LogLevel ParamItem `refreshable:"false"`
Region ParamItem `refreshable:"false"`
UseVirtualHost ParamItem `refreshable:"false"`
RequestTimeoutMs ParamItem `refreshable:"false"`
ListObjectsMaxKeys ParamItem `refreshable:"true"`
}
func (p *MinioConfig) Init(base *BaseTable) {
@ -1214,4 +1215,14 @@ Leave it empty if you want to use AWS default endpoint`,
Export: true,
}
p.RequestTimeoutMs.Init(base.mgr)
p.ListObjectsMaxKeys = ParamItem{
Key: "minio.listObjectsMaxKeys",
Version: "2.4.1",
DefaultValue: "0",
Doc: `The maximum number of objects requested per batch in minio ListObjects rpc,
0 means using oss client by default, decrease these configration if ListObjects timeout`,
Export: true,
}
p.ListObjectsMaxKeys.Init(base.mgr)
}