diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index 3212bb7d8a..02a22b8751 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -425,6 +425,8 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() { zap.Int64("buildID", buildID), zap.String("prefix", key), zap.Error(err)) continue } + log.Info("garbageCollector recycleUnusedIndexFiles remove index files success", + zap.Int64("buildID", buildID), zap.String("prefix", key)) continue } filesMap := make(map[string]struct{}) diff --git a/internal/storage/minio_chunk_manager.go b/internal/storage/minio_chunk_manager.go index 5de06dafc5..81dd24b7de 100644 --- a/internal/storage/minio_chunk_manager.go +++ b/internal/storage/minio_chunk_manager.go @@ -26,16 +26,16 @@ import ( "time" "github.com/cockroachdb/errors" - minio "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" - "go.uber.org/zap" - "golang.org/x/exp/mmap" - "github.com/milvus-io/milvus/internal/storage/aliyun" "github.com/milvus-io/milvus/internal/storage/gcp" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/retry" + minio "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "go.uber.org/zap" + "golang.org/x/exp/mmap" + "golang.org/x/sync/errgroup" ) var ( @@ -368,10 +368,31 @@ func (mcm *MinioChunkManager) MultiRemove(ctx context.Context, keys []string) er // RemoveWithPrefix removes all objects with the same prefix @prefix from minio. func (mcm *MinioChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error { objects := mcm.Client.ListObjects(ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) - for rErr := range mcm.Client.RemoveObjects(ctx, mcm.bucketName, objects, minio.RemoveObjectsOptions{GovernanceBypass: false}) { - if rErr.Err != nil { - log.Warn("failed to remove objects", zap.String("bucket", mcm.bucketName), zap.String("prefix", prefix), zap.Error(rErr.Err)) - return rErr.Err + i := 0 + maxGoroutine := 10 + removeKeys := make([]string, 0, len(objects)) + for object := range objects { + if object.Err != nil { + return object.Err + } + removeKeys = append(removeKeys, object.Key) + } + for i < len(removeKeys) { + runningGroup, groupCtx := errgroup.WithContext(ctx) + for j := 0; j < maxGoroutine && i < len(removeKeys); j++ { + key := removeKeys[i] + runningGroup.Go(func() error { + err := mcm.Client.RemoveObject(groupCtx, mcm.bucketName, key, minio.RemoveObjectOptions{}) + if err != nil { + log.Warn("failed to remove object", zap.String("path", key), zap.Error(err)) + return err + } + return nil + }) + i++ + } + if err := runningGroup.Wait(); err != nil { + return err } } return nil