enhance: Add concurrency for datacoord segment GC (#29561)

issue: https://github.com/milvus-io/milvus/issues/29553
/kind improvement

Signed-off-by: SimFG <bang.fu@zilliz.com>
SimFG 2024-01-03 13:16:57 +08:00 committed by GitHub
parent b12c90af72
commit d23f87a393
3 changed files with 180 additions and 52 deletions

internal/datacoord/garbage_collector.go

@@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -49,6 +50,8 @@ type GcOption struct {
checkInterval time.Duration // each interval
missingTolerance time.Duration // key missing in meta tolerance time
dropTolerance time.Duration // dropped segment related key tolerance time
removeLogPool *conc.Pool[struct{}]
}
// garbageCollector handles garbage files in object storage
@@ -75,6 +78,7 @@ type gcCmd struct {
func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageCollector {
log.Info("GC with option", zap.Bool("enabled", opt.enabled), zap.Duration("interval", opt.checkInterval),
zap.Duration("missingTolerance", opt.missingTolerance), zap.Duration("dropTolerance", opt.dropTolerance))
opt.removeLogPool = conc.NewPool[struct{}](Params.DataCoordCfg.GCRemoveConcurrent.GetAsInt(), conc.WithExpiryDuration(time.Minute))
return &garbageCollector{
meta: meta,
handler: handler,
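
The new removeLogPool field is a typed worker pool from Milvus's pkg/util/conc package. A minimal sketch of the pool's submit/await pattern, assuming the NewPool and WithExpiryDuration signatures used in this patch and that Submit returns an awaitable *conc.Future, as it does elsewhere in the codebase:

```go
package main

import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/conc"
)

func main() {
	// Pool of at most 4 workers; idle workers are reclaimed after a
	// minute, matching the expiry chosen for removeLogPool above.
	pool := conc.NewPool[struct{}](4, conc.WithExpiryDuration(time.Minute))

	futures := make([]*conc.Future[struct{}], 0, 8)
	for i := 0; i < 8; i++ {
		i := i // capture the loop variable for the closure
		futures = append(futures, pool.Submit(func() (struct{}, error) {
			fmt.Println("removing object", i) // stand-in for cli.Remove
			return struct{}{}, nil
		}))
	}
	for _, f := range futures {
		_, _ = f.Await() // block until each submitted task completes
	}
}
```

The patch itself synchronizes with a sync.WaitGroup instead of collecting futures, which avoids holding one future per binlog for segments with many files.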
@@ -353,6 +357,7 @@ func (gc *garbageCollector) clearEtcd() {
return dropIDs[i] < dropIDs[j]
})
log.Info("start to GC segments", zap.Int("drop_num", len(dropIDs)))
for _, segmentID := range dropIDs {
segment, ok := drops[segmentID]
if !ok {
@@ -366,7 +371,10 @@ func (gc *garbageCollector) clearEtcd() {
}
logs := getLogs(segment)
log.Info("GC segment", zap.Int64("segmentID", segment.GetID()))
log.Info("GC segment", zap.Int64("segmentID", segment.GetID()),
zap.Int("insert_logs", len(segment.GetBinlogs())),
zap.Int("delta_logs", len(segment.GetDeltalogs())),
zap.Int("stats_logs", len(segment.GetStatslogs())))
if gc.removeLogs(logs) {
err := gc.meta.DropSegment(segment.GetID())
if err != nil {
@@ -409,22 +417,39 @@ func getLogs(sinfo *SegmentInfo) []*datapb.Binlog {
func (gc *garbageCollector) removeLogs(logs []*datapb.Binlog) bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
-	delFlag := true
+	var w sync.WaitGroup
+	w.Add(len(logs))
	for _, l := range logs {
-		err := gc.option.cli.Remove(ctx, l.GetLogPath())
-		if err != nil {
-			switch err.(type) {
-			case minio.ErrorResponse:
-				errResp := minio.ToErrorResponse(err)
-				if errResp.Code != "" && errResp.Code != "NoSuchKey" {
-					delFlag = false
-				}
-			default:
-				delFlag = false
-			}
-		}
+		tmpLog := l
+		gc.option.removeLogPool.Submit(func() (struct{}, error) {
+			defer w.Done()
+			select {
+			case <-ctx.Done():
+				return struct{}{}, nil
+			default:
+				err := gc.option.cli.Remove(ctx, tmpLog.GetLogPath())
+				if err != nil {
+					switch err.(type) {
+					case minio.ErrorResponse:
+						errResp := minio.ToErrorResponse(err)
+						if errResp.Code != "" && errResp.Code != "NoSuchKey" {
+							cancel()
+						}
+					default:
+						cancel()
+					}
+				}
+				return struct{}{}, nil
+			}
+		})
	}
-	return delFlag
+	w.Wait()
+	select {
+	case <-ctx.Done():
+		return false
+	default:
+		return true
+	}
}
func (gc *garbageCollector) recycleUnusedIndexes() {
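
The rewritten removeLogs fans every deletion out to the pool, treats a missing key (NoSuchKey) as already removed, cancels the shared context on the first fatal error so queued tasks exit early, and finally inspects the context to decide whether the whole batch succeeded. The same pattern, distilled into a self-contained sketch: plain goroutines stand in for conc.Pool, and the remove callback and errNotFound sentinel are hypothetical stand-ins for the minio-backed ChunkManager and its NoSuchKey response.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errNotFound stands in for minio's "NoSuchKey" response; in the real code
// the type switch on minio.ErrorResponse plays this role.
var errNotFound = errors.New("no such key")

// removeAll mirrors the pattern in removeLogs: every key is handed to a
// worker, a missing key counts as already removed, and the first fatal
// error cancels the shared context so pending workers bail out early.
func removeAll(keys []string, remove func(ctx context.Context, key string) error) bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	wg.Add(len(keys))
	for _, k := range keys {
		k := k // capture the loop variable, as tmpLog does in the patch
		go func() {
			defer wg.Done()
			select {
			case <-ctx.Done():
				return // the batch already failed; skip remaining work
			default:
				if err := remove(ctx, k); err != nil && !errors.Is(err, errNotFound) {
					cancel() // fatal error: fail the whole batch
				}
			}
		}()
	}
	wg.Wait()

	// If nothing cancelled the context, every removal succeeded or the
	// object was already gone, so the caller may drop the segment meta.
	return ctx.Err() == nil
}

func main() {
	keys := []string{"log1", "log2", "log3"}
	ok := removeAll(keys, func(ctx context.Context, key string) error {
		fmt.Println("removing", key)
		return nil
	})
	fmt.Println("all removed:", ok)
}
```

One subtlety carried over from the patch: cancellation is used purely as a failure flag. A single fatal error stops the remaining removals, removeLogs returns false, DropSegment is not called, and clearEtcd retries the segment on the next GC pass.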

internal/datacoord/garbage_collector_test.go

@@ -404,9 +404,7 @@ func TestGarbageCollector_recycleUnusedIndexes(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(nil)
gc := &garbageCollector{
meta: createMetaForRecycleUnusedIndexes(catalog),
}
gc := newGarbageCollector(createMetaForRecycleUnusedIndexes(catalog), nil, GcOption{})
gc.recycleUnusedIndexes()
})
@@ -417,9 +415,7 @@ func TestGarbageCollector_recycleUnusedIndexes(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(errors.New("fail"))
gc := &garbageCollector{
meta: createMetaForRecycleUnusedIndexes(catalog),
}
gc := newGarbageCollector(createMetaForRecycleUnusedIndexes(catalog), nil, GcOption{})
gc.recycleUnusedIndexes()
})
}
@@ -545,9 +541,7 @@ func TestGarbageCollector_recycleUnusedSegIndexes(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(nil)
gc := &garbageCollector{
meta: createMetaForRecycleUnusedSegIndexes(catalog),
}
gc := newGarbageCollector(createMetaForRecycleUnusedSegIndexes(catalog), nil, GcOption{})
gc.recycleUnusedSegIndexes()
})
@@ -560,9 +554,7 @@ func TestGarbageCollector_recycleUnusedSegIndexes(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(errors.New("fail"))
gc := &garbageCollector{
meta: createMetaForRecycleUnusedSegIndexes(catalog),
}
gc := newGarbageCollector(createMetaForRecycleUnusedSegIndexes(catalog), nil, GcOption{})
gc.recycleUnusedSegIndexes()
})
}
@@ -707,12 +699,13 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return([]string{"a/b/c/", "a/b/600/", "a/b/601/", "a/b/602/"}, nil, nil)
cm.EXPECT().RemoveWithPrefix(mock.Anything, mock.Anything).Return(nil)
cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(nil)
gc := &garbageCollector{
meta: createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}),
option: GcOption{
gc := newGarbageCollector(
createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}),
nil,
GcOption{
cli: cm,
},
}
})
gc.recycleUnusedIndexFiles()
})
@@ -720,12 +713,12 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
cm := &mocks.ChunkManager{}
cm.EXPECT().RootPath().Return("root")
cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, errors.New("error"))
gc := &garbageCollector{
meta: createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}),
option: GcOption{
gc := newGarbageCollector(
createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}),
nil,
GcOption{
cli: cm,
},
}
})
gc.recycleUnusedIndexFiles()
})
@@ -735,12 +728,12 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(errors.New("error"))
cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return([]string{"a/b/c/", "a/b/600/", "a/b/601/", "a/b/602/"}, nil, nil)
cm.EXPECT().RemoveWithPrefix(mock.Anything, mock.Anything).Return(nil)
gc := &garbageCollector{
meta: createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}),
option: GcOption{
gc := newGarbageCollector(
createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}),
nil,
GcOption{
cli: cm,
},
}
})
gc.recycleUnusedIndexFiles()
})
@@ -750,12 +743,12 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(errors.New("error"))
cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return([]string{"a/b/c/", "a/b/600/", "a/b/601/", "a/b/602/"}, nil, nil)
cm.EXPECT().RemoveWithPrefix(mock.Anything, mock.Anything).Return(errors.New("error"))
gc := &garbageCollector{
meta: createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}),
option: GcOption{
gc := newGarbageCollector(
createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}),
nil,
GcOption{
cli: cm,
},
}
})
gc.recycleUnusedIndexFiles()
})
}
@@ -804,6 +797,57 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
Binlogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "log1",
LogSize: 1024,
},
},
},
{
FieldID: 2,
Binlogs: []*datapb.Binlog{
{
LogPath: "log2",
LogSize: 1024,
},
},
},
},
Deltalogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "del_log1",
LogSize: 1024,
},
},
},
{
FieldID: 2,
Binlogs: []*datapb.Binlog{
{
LogPath: "del_log2",
LogSize: 1024,
},
},
},
},
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "stats_log1",
LogSize: 1024,
},
},
},
},
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: {
@@ -1045,14 +1089,13 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
}
cm := &mocks.ChunkManager{}
cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(nil)
gc := &garbageCollector{
option: GcOption{
cli: &mocks.ChunkManager{},
gc := newGarbageCollector(
m,
newMockHandlerWithMeta(m),
GcOption{
cli: cm,
dropTolerance: 1,
},
meta: m,
handler: newMockHandlerWithMeta(m),
}
})
gc.clearEtcd()
/*
@@ -1136,6 +1179,56 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
assert.Nil(t, segB)
}
func TestGarbageCollector_removelogs(t *testing.T) {
paramtable.Init()
cm := &mocks.ChunkManager{}
gc := newGarbageCollector(
nil,
nil,
GcOption{
cli: cm,
dropTolerance: 1,
})
var logs []*datapb.Binlog
for i := 0; i < 50; i++ {
logs = append(logs, &datapb.Binlog{
LogPath: "log" + strconv.Itoa(i),
})
}
t.Run("success", func(t *testing.T) {
call := cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(nil)
defer call.Unset()
b := gc.removeLogs(logs)
assert.True(t, b)
})
t.Run("minio not found error", func(t *testing.T) {
call := cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(minio.ErrorResponse{
Code: "NoSuchKey",
})
defer call.Unset()
b := gc.removeLogs(logs)
assert.True(t, b)
})
t.Run("minio server error", func(t *testing.T) {
call := cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(minio.ErrorResponse{
Code: "Server Error",
})
defer call.Unset()
b := gc.removeLogs(logs)
assert.False(t, b)
})
t.Run("other type error", func(t *testing.T) {
call := cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(errors.New("other error"))
defer call.Unset()
b := gc.removeLogs(logs)
assert.False(t, b)
})
}
type GarbageCollectorSuite struct {
suite.Suite

pkg/util/paramtable/component_param.go

@@ -2238,6 +2238,7 @@ type dataCoordConfig struct {
GCInterval ParamItem `refreshable:"false"`
GCMissingTolerance ParamItem `refreshable:"false"`
GCDropTolerance ParamItem `refreshable:"false"`
GCRemoveConcurrent ParamItem `refreshable:"false"`
EnableActiveStandby ParamItem `refreshable:"false"`
BindIndexNodeMode ParamItem `refreshable:"false"`
@@ -2587,6 +2588,15 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.GCDropTolerance.Init(base.mgr)
p.GCRemoveConcurrent = ParamItem{
Key: "dataCoord.gc.removeConcurrent",
Version: "2.3.4",
DefaultValue: "32",
Doc: "number of concurrent goroutines to remove dropped s3 objects",
Export: true,
}
p.GCRemoveConcurrent.Init(base.mgr)
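
Following the usual mapping from ParamItem keys to milvus.yaml, the new knob should be settable under the existing dataCoord.gc section. A hedged sketch of the stanza (the key name comes from this patch; neighboring gc keys are elided):

```yaml
dataCoord:
  gc:
    removeConcurrent: 32  # goroutines used to delete dropped objects (default 32)
```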
p.EnableActiveStandby = ParamItem{
Key: "dataCoord.enableActiveStandby",
Version: "2.0.0",