mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-01 11:29:48 +08:00
issue: #30633 pr: #34445 Signed-off-by: wayblink <anyang.wang@zilliz.com>
This commit is contained in:
parent
0d7ba810b3
commit
56a74e72f7
@ -75,12 +75,12 @@ func (t *clusteringCompactionTask) Process() bool {
|
||||
currentState := t.State.String()
|
||||
if currentState != lastState {
|
||||
ts := time.Now().UnixMilli()
|
||||
t.updateAndSaveTaskMeta(setRetryTimes(0), setLastStateStartTime(ts))
|
||||
lastStateDuration := ts - t.GetLastStateStartTime()
|
||||
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse", lastStateDuration))
|
||||
metrics.DataCoordCompactionLatency.
|
||||
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), datapb.CompactionType_ClusteringCompaction.String(), lastState).
|
||||
Observe(float64(lastStateDuration))
|
||||
t.updateAndSaveTaskMeta(setRetryTimes(0), setLastStateStartTime(ts))
|
||||
|
||||
if t.State == datapb.CompactionTaskState_completed {
|
||||
t.updateAndSaveTaskMeta(setEndTime(ts))
|
||||
@ -448,6 +448,7 @@ func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datap
|
||||
PreferSegmentRows: t.GetPreferSegmentRows(),
|
||||
AnalyzeTaskID: t.GetAnalyzeTaskID(),
|
||||
AnalyzeVersion: t.GetAnalyzeVersion(),
|
||||
LastStateStartTime: t.GetLastStateStartTime(),
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(taskClone)
|
||||
|
@ -172,11 +172,6 @@ func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collection
|
||||
|
||||
func (m *CompactionTriggerManager) notify(ctx context.Context, eventType CompactionTriggerType, views []CompactionView) {
|
||||
for _, view := range views {
|
||||
if m.compactionHandler.isFull() {
|
||||
log.RatedInfo(10, "Skip trigger compaction for scheduler is full")
|
||||
return
|
||||
}
|
||||
|
||||
switch eventType {
|
||||
case TriggerTypeLevelZeroViewChange:
|
||||
log.Debug("Start to trigger a level zero compaction by TriggerTypeLevelZeroViewChange")
|
||||
|
@ -48,32 +48,6 @@ func (s *CompactionTriggerManagerSuite) SetupTest() {
|
||||
s.triggerManager = NewCompactionTriggerManager(s.mockAlloc, s.handler, s.mockPlanContext, s.meta)
|
||||
}
|
||||
|
||||
func (s *CompactionTriggerManagerSuite) TestNotifyToFullScheduler() {
|
||||
s.mockPlanContext.EXPECT().isFull().Return(true)
|
||||
collSegs := s.meta.GetCompactableSegmentGroupByCollection()
|
||||
segments, found := collSegs[1]
|
||||
s.Require().True(found)
|
||||
|
||||
levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
|
||||
return info.GetLevel() == datapb.SegmentLevel_L0
|
||||
})
|
||||
|
||||
latestL0Segments := GetViewsByInfo(levelZeroSegments...)
|
||||
s.Require().NotEmpty(latestL0Segments)
|
||||
needRefresh, levelZeroView := s.triggerManager.l0Policy.getChangedLevelZeroViews(1, latestL0Segments)
|
||||
s.Require().True(needRefresh)
|
||||
s.Require().Equal(1, len(levelZeroView))
|
||||
cView, ok := levelZeroView[0].(*LevelZeroSegmentsView)
|
||||
s.True(ok)
|
||||
s.NotNil(cView)
|
||||
log.Info("view", zap.Any("cView", cView))
|
||||
|
||||
// s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil)
|
||||
s.mockPlanContext.EXPECT().isFull().Return(false)
|
||||
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(19530, nil).Maybe()
|
||||
s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroView)
|
||||
}
|
||||
|
||||
func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() {
|
||||
handler := NewNMockHandler(s.T())
|
||||
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil)
|
||||
@ -104,7 +78,6 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() {
|
||||
log.Info("view", zap.Any("cView", cView))
|
||||
|
||||
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil)
|
||||
s.mockPlanContext.EXPECT().isFull().Return(false)
|
||||
s.mockPlanContext.EXPECT().enqueueCompaction(mock.Anything).
|
||||
RunAndReturn(func(task *datapb.CompactionTask) error {
|
||||
s.EqualValues(19530, task.GetTriggerID())
|
||||
@ -149,7 +122,6 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() {
|
||||
log.Info("view", zap.Any("cView", cView))
|
||||
|
||||
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil)
|
||||
s.mockPlanContext.EXPECT().isFull().Return(false)
|
||||
s.mockPlanContext.EXPECT().enqueueCompaction(mock.Anything).
|
||||
RunAndReturn(func(task *datapb.CompactionTask) error {
|
||||
s.EqualValues(19530, task.GetTriggerID())
|
||||
|
Loading…
Reference in New Issue
Block a user