enhance: Check channel cp lag before generating compaction task (#30997)

See also #30996

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2024-03-05 13:39:01 +08:00 committed by GitHub
parent 0dacf0172e
commit 1936aa4caa
3 changed files with 216 additions and 20 deletions

internal/datacoord/compaction_trigger.go

@@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -203,6 +204,20 @@ func (t *compactionTrigger) isCollectionAutoCompactionEnabled(coll *collectionIn
return enabled
}
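// isChannelCheckpointHealthy reports whether the latest checkpoint of vchanName
// lags behind now by less than ChannelCheckpointMaxLag; a non-positive
// ChannelCheckpointMaxLag disables the check.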
func (t *compactionTrigger) isChannelCheckpointHealthy(vchanName string) bool {
if paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.GetAsInt64() <= 0 {
return true
}
checkpoint := t.meta.GetChannelCheckpoint(vchanName)
if checkpoint == nil {
log.Warn("channel checkpoint not found", zap.String("channel", vchanName))
return false
}
cpTime := tsoutil.PhysicalTime(checkpoint.GetTimestamp())
return time.Since(cpTime) < paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.GetAsDuration(time.Second)
}
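
The check above leans on the fact that a Milvus TSO timestamp embeds a physical wall-clock component that tsoutil.PhysicalTime can recover. A minimal standalone sketch of the same comparison, using the tsoutil helpers already imported in this file (the 16-minute lag and 900-second budget are illustrative values, not taken from this commit):

    package main

    import (
        "fmt"
        "time"

        "github.com/milvus-io/milvus/pkg/util/tsoutil"
    )

    func main() {
        maxLag := 900 * time.Second // stands in for DataCoordCfg.ChannelCheckpointMaxLag
        // Compose a TSO timestamp for a checkpoint written 16 minutes ago.
        cpTs := tsoutil.ComposeTSByTime(time.Now().Add(-16*time.Minute), 0)
        cpTime := tsoutil.PhysicalTime(cpTs)     // recover the wall-clock component
        fmt.Println(time.Since(cpTime) < maxLag) // false: 16 min exceeds the 15 min budget
    }
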
func (t *compactionTrigger) getCompactTime(ts Timestamp, coll *collectionInfo) (*compactTime, error) {
collectionTTL, err := getCollectionTTL(coll.Properties)
if err != nil {
@@ -410,6 +425,15 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
return err
}
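// Cache per-channel checkpoint health so that a single global signal does not
// re-fetch (and re-warn about) the same vchannel for every segment group below.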
channelCheckpointOK := make(map[string]bool)
isChannelCPOK := func(channelName string) bool {
cached, ok := channelCheckpointOK[channelName]
if ok {
return cached
}
healthy := t.isChannelCheckpointHealthy(channelName)
channelCheckpointOK[channelName] = healthy // remember the result for later groups on the same channel
return healthy
}
for _, group := range m {
log := log.With(zap.Int64("collectionID", group.collectionID),
zap.Int64("partitionID", group.partitionID),
@@ -418,6 +442,11 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
log.Warn("compaction plan skipped due to handler full")
break
}
if !signal.isForce && !isChannelCPOK(group.channelName) {
log.Warn("compaction plan skipped due to channel checkpoint lag", zap.String("channel", group.channelName))
continue
}
if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() {
group.segments = FilterInIndexedSegments(t.handler, t.meta, group.segments...)
}
@@ -501,6 +530,11 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
return
}
if !t.isChannelCheckpointHealthy(signal.channel) {
log.Warn("compaction plan skipped due to channel checkpoint lag", zap.String("channel", signal.channel))
return
}
segment := t.meta.GetHealthySegment(signal.segmentID)
if segment == nil {
log.Warn("segment in compaction signal not found in meta", zap.Int64("segmentID", signal.segmentID))

internal/datacoord/compaction_trigger_test.go

@@ -30,13 +30,17 @@ import (
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type spyCompactionHandler struct {
@@ -89,6 +93,7 @@ func newMockVersionManager() IndexEngineVersionManager {
var _ compactionPlanContext = (*spyCompactionHandler)(nil)
func Test_compactionTrigger_force(t *testing.T) {
paramtable.Init()
type fields struct {
meta *meta
allocator allocator
@@ -113,7 +118,9 @@ func Test_compactionTrigger_force(t *testing.T) {
"test force compaction",
fields{
&meta{
catalog: catalog,
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
@@ -728,7 +735,9 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
"test many segments",
fields{
&meta{
segments: segmentInfos,
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
collections: map[int64]*collectionInfo{
2: {
ID: 2,
@@ -870,6 +879,9 @@ func Test_compactionTrigger_noplan(t *testing.T) {
&meta{
indexMeta: newSegmentIndexMeta(nil),
// 4 segments
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
@@ -1051,6 +1063,9 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
fields{
&meta{
// 8 small segments
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
@@ -1142,6 +1157,10 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.fields.meta.channelCPs.Insert("ch1", &msgpb.MsgPosition{
Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0),
MsgID: []byte{1, 2, 3, 4},
})
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
@@ -1231,6 +1250,9 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) {
fields{
&meta{
// 4 small segments
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
@@ -1325,6 +1347,10 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.fields.meta.channelCPs.Insert("ch1", &msgpb.MsgPosition{
Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0),
MsgID: []byte{1, 2, 3, 4},
})
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
@@ -1417,6 +1443,9 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) {
"test small segment",
fields{
&meta{
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
// 4 small segments
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
@@ -1507,6 +1536,10 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.fields.meta.channelCPs.Insert("ch1", &msgpb.MsgPosition{
Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0),
MsgID: []byte{1, 2, 3, 4},
})
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
@@ -1639,6 +1672,9 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
"test rand size segment",
fields{
&meta{
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
segments: segmentInfos,
collections: map[int64]*collectionInfo{
2: {
@@ -1676,6 +1712,10 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.fields.meta.channelCPs.Insert("ch1", &msgpb.MsgPosition{
Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0),
MsgID: []byte{1, 2, 3, 4},
})
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
@@ -1728,7 +1768,11 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
// Test shouldDoSingleCompaction
func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
indexMeta := newSegmentIndexMeta(nil)
trigger := newCompactionTrigger(&meta{
indexMeta: indexMeta,
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler(), newIndexEngineVersionManager())
// Test too many deltalogs.
var binlogs []*datapb.FieldBinlog
@@ -1983,16 +2027,6 @@ func Test_compactionTrigger_new(t *testing.T) {
}
}
func Test_compactionTrigger_handleSignal(t *testing.T) {
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: NewCompactionScheduler()}, newMockAllocator(), newMockHandler(), newMockVersionManager())
signal := &compactionSignal{
segmentID: 1,
}
assert.NotPanics(t, func() {
got.handleSignal(signal)
})
}
func Test_compactionTrigger_allocTs(t *testing.T) {
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: NewCompactionScheduler()}, newMockAllocator(), newMockHandler(), newMockVersionManager())
ts, err := got.allocTs()
@@ -2052,7 +2086,11 @@ func Test_triggerSingleCompaction(t *testing.T) {
defer func() {
Params.Save(Params.DataCoordCfg.EnableAutoCompaction.Key, originValue)
}()
m := &meta{
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
segments: NewSegmentsInfo(), collections: make(map[UniqueID]*collectionInfo),
}
got := newCompactionTrigger(m, &compactionPlanHandler{}, newMockAllocator(),
&ServerHandler{
&Server{
@@ -2137,6 +2175,7 @@ type CompactionTriggerSuite struct {
}
func (s *CompactionTriggerSuite) SetupSuite() {
paramtable.Init()
}
func (s *CompactionTriggerSuite) genSeg(segID, numRows int64) *datapb.SegmentInfo {
@@ -2181,7 +2220,13 @@ func (s *CompactionTriggerSuite) SetupTest() {
s.indexID = 300
s.vecFieldID = 400
s.channel = "dml_0_100v0"
catalog := mocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, s.channel, mock.Anything).Return(nil)
s.meta = &meta{
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
catalog: catalog,
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
@@ -2256,6 +2301,11 @@ func (s *CompactionTriggerSuite) SetupTest() {
},
},
}
s.meta.UpdateChannelCheckpoint(s.channel, &msgpb.MsgPosition{
ChannelName: s.channel,
Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0),
MsgID: []byte{1, 2, 3, 4},
})
s.allocator = NewNMockAllocator(s.T())
s.compactionHandler = NewMockCompactionPlanContext(s.T())
s.handler = NewNMockHandler(s.T())
@@ -2375,6 +2425,28 @@ func (s *CompactionTriggerSuite) TestHandleSignal() {
isForce: true,
})
})
s.Run("channel_cp_lag_too_large", func() {
defer s.SetupTest()
ptKey := paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.Key
paramtable.Get().Save(ptKey, "900")
defer paramtable.Get().Reset(ptKey)
s.compactionHandler.EXPECT().isFull().Return(false)
s.meta.channelCPs.Insert(s.channel, &msgpb.MsgPosition{
ChannelName: s.channel,
Timestamp: tsoutil.ComposeTSByTime(time.Now().Add(time.Second*-901), 0),
MsgID: []byte{1, 2, 3, 4},
})
s.tr.handleSignal(&compactionSignal{
segmentID: 1,
collectionID: s.collectionID,
partitionID: s.partitionID,
channel: s.channel,
isForce: false,
})
})
}
func (s *CompactionTriggerSuite) TestHandleGlobalSignal() {
@@ -2484,6 +2556,81 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() {
isForce: true,
})
})
s.Run("channel_cp_lag_too_large", func() {
defer s.SetupTest()
ptKey := paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.Key
paramtable.Get().Save(ptKey, "900")
defer paramtable.Get().Reset(ptKey)
s.compactionHandler.EXPECT().isFull().Return(false)
s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil)
s.allocator.EXPECT().allocID(mock.Anything).Return(20000, nil)
s.meta.channelCPs.Insert(s.channel, &msgpb.MsgPosition{
ChannelName: s.channel,
Timestamp: tsoutil.ComposeTSByTime(time.Now().Add(time.Second*-901), 0),
MsgID: []byte{1, 2, 3, 4},
})
tr := s.tr
tr.handleGlobalSignal(&compactionSignal{
segmentID: 1,
collectionID: s.collectionID,
partitionID: s.partitionID,
channel: s.channel,
isForce: false,
})
})
}
func (s *CompactionTriggerSuite) TestIsChannelCheckpointHealthy() {
ptKey := paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.Key
s.Run("ok", func() {
paramtable.Get().Save(ptKey, "900")
defer paramtable.Get().Reset(ptKey)
s.meta.channelCPs.Insert(s.channel, &msgpb.MsgPosition{
ChannelName: s.channel,
Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0),
MsgID: []byte{1, 2, 3, 4},
})
result := s.tr.isChannelCheckpointHealthy(s.channel)
s.True(result, "ok case, check shall return true")
})
s.Run("cp_healthzcheck_disabled", func() {
paramtable.Get().Save(ptKey, "0")
defer paramtable.Get().Reset(ptKey)
result := s.tr.isChannelCheckpointHealthy(s.channel)
s.True(result, "channel cp always healthy when config disable this check")
})
s.Run("checkpoint_not_exist", func() {
paramtable.Get().Save(ptKey, "900")
defer paramtable.Get().Reset(ptKey)
s.meta.channelCPs.Remove(s.channel)
result := s.tr.isChannelCheckpointHealthy(s.channel)
s.False(result, "check shall fail when checkpoint not exist in meta")
})
s.Run("checkpoint_lag", func() {
paramtable.Get().Save(ptKey, "900")
defer paramtable.Get().Reset(ptKey)
s.meta.channelCPs.Insert(s.channel, &msgpb.MsgPosition{
ChannelName: s.channel,
Timestamp: tsoutil.ComposeTSByTime(time.Now().Add(time.Second*-901), 0),
MsgID: []byte{1, 2, 3, 4},
})
result := s.tr.isChannelCheckpointHealthy(s.channel)
s.False(result, "check shall fail when checkpoint lag larger than config")
})
}
// test updateSegmentMaxSize
@@ -2559,8 +2706,10 @@ func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) {
"all mem index",
fields{
&meta{
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
catalog: catalog,
segments: segmentsInfo,
collections: map[int64]*collectionInfo{
collectionID: info,
},
@@ -2622,8 +2771,10 @@ func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) {
"all disk index",
fields{
&meta{
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
catalog: catalog,
segments: segmentsInfo,
collections: map[int64]*collectionInfo{
collectionID: info,
},
@@ -2685,8 +2836,10 @@ func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) {
"some mme index",
fields{
&meta{
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
catalog: catalog,
segments: segmentsInfo,
collections: map[int64]*collectionInfo{
collectionID: info,
},

pkg/util/paramtable/component_param.go

@@ -2512,6 +2512,7 @@ type dataCoordConfig struct {
SingleCompactionExpiredLogMaxSize ParamItem `refreshable:"true"`
SingleCompactionDeltalogMaxNum ParamItem `refreshable:"true"`
GlobalCompactionInterval ParamItem `refreshable:"false"`
ChannelCheckpointMaxLag ParamItem `refreshable:"true"`
// LevelZero Segment
EnableLevelZeroSegment ParamItem `refreshable:"false"`
@@ -2823,6 +2824,14 @@ During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 2 * segment max # of rows.
}
p.GlobalCompactionInterval.Init(base.mgr)
p.ChannelCheckpointMaxLag = ParamItem{
Key: "dataCoord.compaction.channelMaxCPLag",
Version: "2.4.0",
Doc: "max tolerable channel checkpoint lag(in seconds) to execute compaction",
DefaultValue: "900", // 15 * 60 seconds
}
p.ChannelCheckpointMaxLag.Init(base.mgr)
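
Because the item is declared refreshable:"true", the lag budget can also be adjusted at runtime through paramtable, the same way the tests above override it. A small sketch under that assumption (the helper name and the "1800" value are illustrative, not part of this commit):

    // adjustChannelCPLagBudget is a hypothetical helper showing runtime tuning.
    func adjustChannelCPLagBudget() {
        key := paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.Key // "dataCoord.compaction.channelMaxCPLag"
        paramtable.Get().Save(key, "1800") // widen the budget to 30 minutes
        paramtable.Get().Save(key, "0")    // non-positive disables the check entirely
        paramtable.Get().Reset(key)        // restore the 900-second default
    }
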
// LevelZeroCompaction
p.EnableLevelZeroSegment = ParamItem{
Key: "dataCoord.segment.enableLevelZero",