mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 19:08:30 +08:00
fix: [cherry-pick] minor fixes for major compaction (#34402)
This PR cherry-picks the following commits:
- fix: Avoid datarace in clustering compaction #34288
- fix: remove isFull check in compaction.enqueue #34338

issue: #30633
pr: #34288 #34338

---------

Signed-off-by: wayblink <anyang.wang@zilliz.com>
This commit is contained in:
parent
a1a0a56f86
commit
b3aec4c8e1
@ -535,9 +535,6 @@ func (c *compactionPlanHandler) getCompactionTask(planID int64) CompactionTask {
|
|||||||
|
|
||||||
func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) error {
|
func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) error {
|
||||||
log := log.With(zap.Int64("planID", task.GetPlanID()), zap.Int64("triggerID", task.GetTriggerID()), zap.Int64("collectionID", task.GetCollectionID()), zap.String("type", task.GetType().String()))
|
log := log.With(zap.Int64("planID", task.GetPlanID()), zap.Int64("triggerID", task.GetTriggerID()), zap.Int64("collectionID", task.GetCollectionID()), zap.String("type", task.GetType().String()))
|
||||||
if c.isFull() {
|
|
||||||
return errCompactionBusy
|
|
||||||
}
|
|
||||||
t, err := c.createCompactTask(task)
|
t, err := c.createCompactTask(task)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -571,12 +568,11 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com
|
|||||||
}
|
}
|
||||||
case datapb.CompactionType_ClusteringCompaction:
|
case datapb.CompactionType_ClusteringCompaction:
|
||||||
task = &clusteringCompactionTask{
|
task = &clusteringCompactionTask{
|
||||||
CompactionTask: t,
|
CompactionTask: t,
|
||||||
meta: c.meta,
|
meta: c.meta,
|
||||||
sessions: c.sessions,
|
sessions: c.sessions,
|
||||||
handler: c.handler,
|
handler: c.handler,
|
||||||
analyzeScheduler: c.analyzeScheduler,
|
analyzeScheduler: c.analyzeScheduler,
|
||||||
lastUpdateStateTime: time.Now().UnixMilli(),
|
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type")
|
return nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type")
|
||||||
|
@ -100,3 +100,15 @@ func setStartTime(startTime int64) compactionTaskOpt {
|
|||||||
task.StartTime = startTime
|
task.StartTime = startTime
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setRetryTimes(retryTimes int32) compactionTaskOpt {
|
||||||
|
return func(task *datapb.CompactionTask) {
|
||||||
|
task.RetryTimes = retryTimes
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func setLastStateStartTime(lastStateStartTime int64) compactionTaskOpt {
|
||||||
|
return func(task *datapb.CompactionTask) {
|
||||||
|
task.LastStateStartTime = lastStateStartTime
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -47,10 +47,9 @@ const (
|
|||||||
|
|
||||||
type clusteringCompactionTask struct {
|
type clusteringCompactionTask struct {
|
||||||
*datapb.CompactionTask
|
*datapb.CompactionTask
|
||||||
plan *datapb.CompactionPlan
|
plan *datapb.CompactionPlan
|
||||||
result *datapb.CompactionPlanResult
|
result *datapb.CompactionPlanResult
|
||||||
span trace.Span
|
span trace.Span
|
||||||
lastUpdateStateTime int64
|
|
||||||
|
|
||||||
meta CompactionMeta
|
meta CompactionMeta
|
||||||
sessions SessionManager
|
sessions SessionManager
|
||||||
@ -66,24 +65,22 @@ func (t *clusteringCompactionTask) Process() bool {
|
|||||||
log.Warn("fail in process task", zap.Error(err))
|
log.Warn("fail in process task", zap.Error(err))
|
||||||
if merr.IsRetryableErr(err) && t.RetryTimes < taskMaxRetryTimes {
|
if merr.IsRetryableErr(err) && t.RetryTimes < taskMaxRetryTimes {
|
||||||
// retry in next Process
|
// retry in next Process
|
||||||
t.RetryTimes = t.RetryTimes + 1
|
t.updateAndSaveTaskMeta(setRetryTimes(t.RetryTimes + 1))
|
||||||
} else {
|
} else {
|
||||||
log.Error("task fail with unretryable reason or meet max retry times", zap.Error(err))
|
log.Error("task fail with unretryable reason or meet max retry times", zap.Error(err))
|
||||||
t.State = datapb.CompactionTaskState_failed
|
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
|
||||||
t.FailReason = err.Error()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// task state update, refresh retry times count
|
// task state update, refresh retry times count
|
||||||
currentState := t.State.String()
|
currentState := t.State.String()
|
||||||
if currentState != lastState {
|
if currentState != lastState {
|
||||||
t.RetryTimes = 0
|
|
||||||
ts := time.Now().UnixMilli()
|
ts := time.Now().UnixMilli()
|
||||||
lastStateDuration := ts - t.lastUpdateStateTime
|
t.updateAndSaveTaskMeta(setRetryTimes(0), setLastStateStartTime(ts))
|
||||||
|
lastStateDuration := ts - t.GetLastStateStartTime()
|
||||||
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse", lastStateDuration))
|
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse", lastStateDuration))
|
||||||
metrics.DataCoordCompactionLatency.
|
metrics.DataCoordCompactionLatency.
|
||||||
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), datapb.CompactionType_ClusteringCompaction.String(), lastState).
|
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), datapb.CompactionType_ClusteringCompaction.String(), lastState).
|
||||||
Observe(float64(lastStateDuration))
|
Observe(float64(lastStateDuration))
|
||||||
t.lastUpdateStateTime = ts
|
|
||||||
|
|
||||||
if t.State == datapb.CompactionTaskState_completed {
|
if t.State == datapb.CompactionTaskState_completed {
|
||||||
t.updateAndSaveTaskMeta(setEndTime(ts))
|
t.updateAndSaveTaskMeta(setEndTime(ts))
|
||||||
|
@ -528,7 +528,7 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
|
|||||||
|
|
||||||
func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
|
func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
|
||||||
s.SetupTest()
|
s.SetupTest()
|
||||||
s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Once()
|
s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Maybe()
|
||||||
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
|
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
|
||||||
handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil)
|
handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil)
|
||||||
|
|
||||||
@ -545,7 +545,7 @@ func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
|
|||||||
s.handler.taskNumber.Add(1000)
|
s.handler.taskNumber.Add(1000)
|
||||||
task.PlanID = 2
|
task.PlanID = 2
|
||||||
err = s.handler.enqueueCompaction(task)
|
err = s.handler.enqueueCompaction(task)
|
||||||
s.Error(err)
|
s.NoError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
|
func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
|
||||||
|
@ -289,6 +289,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
|
|||||||
PreferSegmentRows: preferSegmentRows,
|
PreferSegmentRows: preferSegmentRows,
|
||||||
TotalRows: totalRows,
|
TotalRows: totalRows,
|
||||||
AnalyzeTaskID: taskID + 1,
|
AnalyzeTaskID: taskID + 1,
|
||||||
|
LastStateStartTime: time.Now().UnixMilli(),
|
||||||
}
|
}
|
||||||
err = m.compactionHandler.enqueueCompaction(task)
|
err = m.compactionHandler.enqueueCompaction(task)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -913,6 +913,7 @@ message CompactionTask{
|
|||||||
int64 prefer_segment_rows = 22;
|
int64 prefer_segment_rows = 22;
|
||||||
int64 analyzeTaskID = 23;
|
int64 analyzeTaskID = 23;
|
||||||
int64 analyzeVersion = 24;
|
int64 analyzeVersion = 24;
|
||||||
|
int64 lastStateStartTime = 25;
|
||||||
}
|
}
|
||||||
|
|
||||||
message PartitionStatsInfo {
|
message PartitionStatsInfo {
|
||||||
|
Loading…
Reference in New Issue
Block a user