milvus/internal/datacoord/compaction_task_clustering.go
cai.zhang 4dc684126e
enhance: Handoff growing segment after sorted (#37385)
issue: #33744 

1. Segments generated from inserts will be loaded as growing segments until
they are sorted by primary key.
2. This PR may increase memory pressure on the delegator, so the performance
of the stats step needs testing. In local testing, the speed of stats was
greater than the insert speed.

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
2024-11-07 16:08:24 +08:00


// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord

import (
"context"
"fmt"
"path"
"strconv"
"time"

"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var _ CompactionTask = (*clusteringCompactionTask)(nil)

type clusteringCompactionTask struct {
taskProto atomic.Value // *datapb.CompactionTask
plan *datapb.CompactionPlan
result *datapb.CompactionPlanResult
span trace.Span
allocator allocator.Allocator
meta CompactionMeta
sessions session.DataNodeManager
handler Handler
analyzeScheduler *taskScheduler
maxRetryTimes int32
slotUsage int64
}
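
// GetTaskProto returns the latest task snapshot stored in the atomic value,
// or nil if the task has not been initialized yet.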
func (t *clusteringCompactionTask) GetTaskProto() *datapb.CompactionTask {
task := t.taskProto.Load()
if task == nil {
return nil
}
return task.(*datapb.CompactionTask)
}
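
// newClusteringCompactionTask wraps a *datapb.CompactionTask proto together
// with its collaborators (allocator, meta, datanode session manager, handler
// and analyze scheduler) and a default retry budget of 3.
//
// An illustrative polling loop (the caller shape and names here are
// hypothetical, not the actual datacoord driver):
//
//	task := newClusteringCompactionTask(taskProto, alloc, meta, sessions, handler, scheduler)
//	for !task.Process() {
//		time.Sleep(pollInterval) // Process returns true only in a terminal state
//	}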
func newClusteringCompactionTask(t *datapb.CompactionTask, allocator allocator.Allocator, meta CompactionMeta, session session.DataNodeManager, handler Handler, analyzeScheduler *taskScheduler) *clusteringCompactionTask {
task := &clusteringCompactionTask{
allocator: allocator,
meta: meta,
sessions: session,
handler: handler,
analyzeScheduler: analyzeScheduler,
maxRetryTimes: 3,
slotUsage: paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(),
}
task.taskProto.Store(t)
return task
}
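
// Process advances the task state machine by one step and reports whether the
// task has reached a terminal state (completed or cleaned). Based on the
// transitions implemented below, the happy path is roughly:
//
//	pipelining -> analyzing (vector clustering key only) -> executing
//	-> meta_saved -> statistic -> indexing -> completed
//
// failed and timeout tasks are funneled through processFailedOrTimeout into
// cleaned. A retryable error keeps the state and bumps RetryTimes; a
// non-retryable error or an exhausted retry budget moves the task to failed.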
func (t *clusteringCompactionTask) Process() bool {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
lastState := t.GetTaskProto().GetState().String()
err := t.retryableProcess()
if err != nil {
log.Warn("fail in process task", zap.Error(err))
if merr.IsRetryableErr(err) && t.GetTaskProto().RetryTimes < t.maxRetryTimes {
// retry in next Process
err = t.updateAndSaveTaskMeta(setRetryTimes(t.GetTaskProto().RetryTimes + 1))
} else {
log.Error("task fail with unretryable reason or meet max retry times", zap.Error(err))
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
}
if err != nil {
log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err))
}
}
// on a state change, reset the retry counter and record how long the previous state lasted
currentState := t.GetTaskProto().State.String()
if currentState != lastState {
ts := time.Now().Unix()
lastStateDuration := ts - t.GetTaskProto().GetLastStateStartTime()
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
metrics.DataCoordCompactionLatency.
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), fmt.Sprint(t.GetTaskProto().CollectionID), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
Observe(float64(lastStateDuration * 1000))
updateOps := []compactionTaskOpt{setRetryTimes(0), setLastStateStartTime(ts)}
if t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned {
updateOps = append(updateOps, setEndTime(ts))
elapse := ts - t.GetTaskProto().StartTime
log.Info("clustering compaction task total elapse", zap.Int64("elapse seconds", elapse))
metrics.DataCoordCompactionLatency.
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), fmt.Sprint(t.GetTaskProto().CollectionID), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), "total").
Observe(float64(elapse * 1000))
}
err = t.updateAndSaveTaskMeta(updateOps...)
if err != nil {
log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err))
}
}
log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
return t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned
}
// retryableProcess handles the task's state transitions and returns an error when something does not work as expected.
// The outer Process sets the state and retry count according to the error type (retryable or not).
func (t *clusteringCompactionTask) retryableProcess() error {
if t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned {
return nil
}
coll, err := t.handler.GetCollection(context.Background(), t.GetTaskProto().GetCollectionID())
if err != nil {
// retryable
log.Warn("fail to get collection", zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()), zap.Error(err))
return merr.WrapErrClusteringCompactionGetCollectionFail(t.GetTaskProto().GetCollectionID(), err)
}
if coll == nil {
// not retryable: fail fast if the collection has been dropped
log.Warn("collection not found, it may be dropped, stop clustering compaction task", zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
return merr.WrapErrCollectionNotFound(t.GetTaskProto().GetCollectionID())
}
switch t.GetTaskProto().State {
case datapb.CompactionTaskState_pipelining:
return t.processPipelining()
case datapb.CompactionTaskState_executing:
return t.processExecuting()
case datapb.CompactionTaskState_analyzing:
return t.processAnalyzing()
case datapb.CompactionTaskState_meta_saved:
return t.processMetaSaved()
case datapb.CompactionTaskState_indexing:
return t.processIndexing()
case datapb.CompactionTaskState_statistic:
return t.processStats()
case datapb.CompactionTaskState_timeout:
return t.processFailedOrTimeout()
case datapb.CompactionTaskState_failed:
return t.processFailedOrTimeout()
}
return nil
}
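
// BuildCompactionRequest assembles the datapb.CompactionPlan sent to the
// datanode: it allocates a begin log ID, copies the task-level settings
// (channel, schema, clustering key, segment row limits, analyze result path,
// pre-allocated segment IDs) and attaches the binlog, statslog and deltalog
// descriptors of every healthy input segment.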
func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {
return nil, err
}
taskProto := t.taskProto.Load().(*datapb.CompactionTask)
plan := &datapb.CompactionPlan{
PlanID: taskProto.GetPlanID(),
StartTime: taskProto.GetStartTime(),
TimeoutInSeconds: taskProto.GetTimeoutInSeconds(),
Type: taskProto.GetType(),
Channel: taskProto.GetChannel(),
CollectionTtl: taskProto.GetCollectionTtl(),
TotalRows: taskProto.GetTotalRows(),
Schema: taskProto.GetSchema(),
ClusteringKeyField: taskProto.GetClusteringKeyField().GetFieldID(),
MaxSegmentRows: taskProto.GetMaxSegmentRows(),
PreferSegmentRows: taskProto.GetPreferSegmentRows(),
AnalyzeResultPath: path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(taskProto.AnalyzeTaskID, taskProto.AnalyzeVersion)),
AnalyzeSegmentIds: taskProto.GetInputSegments(),
BeginLogID: beginLogID,
PreAllocatedSegmentIDs: taskProto.GetPreAllocatedSegmentIDs(),
SlotUsage: t.GetSlotUsage(),
}
log := log.With(zap.Int64("taskID", taskProto.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
for _, segID := range taskProto.GetInputSegments() {
segInfo := t.meta.GetHealthySegment(segID)
if segInfo == nil {
return nil, merr.WrapErrSegmentNotFound(segID)
}
plan.SegmentBinlogs = append(plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: segID,
CollectionID: segInfo.GetCollectionID(),
PartitionID: segInfo.GetPartitionID(),
Level: segInfo.GetLevel(),
InsertChannel: segInfo.GetInsertChannel(),
FieldBinlogs: segInfo.GetBinlogs(),
Field2StatslogPaths: segInfo.GetStatslogs(),
Deltalogs: segInfo.GetDeltalogs(),
IsSorted: segInfo.GetIsSorted(),
})
}
log.Info("Compaction handler build clustering compaction plan")
return plan, nil
}
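
// processPipelining is the entry state. Once a node is assigned, tasks whose
// clustering key is a vector field go through an analyze step first; scalar
// clustering keys go straight to compaction.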
func (t *clusteringCompactionTask) processPipelining() error {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().TriggerID), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()))
if t.NeedReAssignNodeID() {
log.Debug("wait for the node to be assigned before proceeding with the subsequent steps")
return nil
}
// since v2.5.0, segments are no longer marked as L2 before clustering compaction
if typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType) {
err := t.doAnalyze()
if err != nil {
log.Warn("fail to submit analyze task", zap.Error(err))
return merr.WrapErrClusteringCompactionSubmitTaskFail("analyze", err)
}
} else {
err := t.doCompact()
if err != nil {
log.Warn("fail to submit compaction task", zap.Error(err))
return merr.WrapErrClusteringCompactionSubmitTaskFail("compact", err)
}
}
return nil
}
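
// processExecuting polls the datanode for the plan result. A completed result
// commits the compaction mutation and advances to meta_saved; a still-running
// plan is checked against the timeout; a lost node sends the task back to
// pipelining for node reassignment.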
func (t *clusteringCompactionTask) processExecuting() error {
log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()))
result, err := t.sessions.GetCompactionPlanResult(t.GetTaskProto().GetNodeID(), t.GetTaskProto().GetPlanID())
if err != nil || result == nil {
if errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("GetCompactionPlanResult fail", zap.Error(err))
// setNodeID(NullNodeID) to trigger reassign node ID
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
}
return err
}
log.Info("compaction result", zap.Any("result", result.String()))
switch result.GetState() {
case datapb.CompactionTaskState_completed:
t.result = result
if len(result.GetSegments()) == 0 {
log.Warn("illegal compaction results, this should not happen")
return merr.WrapErrCompactionResult("compaction result is empty")
}
resultSegmentIDs := lo.Map(result.Segments, func(segment *datapb.CompactionSegment, _ int) int64 {
return segment.GetSegmentID()
})
_, metricMutation, err := t.meta.CompleteCompactionMutation(t.GetTaskProto(), t.result)
if err != nil {
return err
}
metricMutation.commit()
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setTmpSegments(resultSegmentIDs))
if err != nil {
return err
}
return t.processMetaSaved()
case datapb.CompactionTaskState_executing:
if t.checkTimeout() {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
if err == nil {
return t.processFailedOrTimeout()
} else {
return err
}
}
return nil
case datapb.CompactionTaskState_failed:
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
default:
log.Error("not support compaction task state", zap.String("state", result.GetState().String()))
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
}
}
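
// processMetaSaved drops the plan on the datanode and advances the task to
// statistic, or directly to indexing for tasks upgraded from v2.4 that carry
// no tmp segments.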
func (t *clusteringCompactionTask) processMetaSaved() error {
if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetTaskProto().GetPlanID(),
}); err != nil {
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
}
// for compatibility: a task upgraded from v2.4 that is already in the meta-saved state
// has no TmpSegments, so skip the stats step and go straight to building indexes.
if len(t.GetTaskProto().GetTmpSegments()) == 0 {
log.Info("tmp segments is nil, skip stats task", zap.Int64("planID", t.GetTaskProto().GetPlanID()))
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing))
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_statistic))
}
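
// processStats waits for every tmp segment produced by the compaction to be
// handed off to its sorted (stats) counterpart. Tmp segments without a
// compact-to target are (re)queued for stats and keep the task in this state;
// once all targets exist, the partition stats are rewritten to reference the
// result segment IDs. With the stats task disabled, the tmp segments are
// promoted to result segments directly.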
func (t *clusteringCompactionTask) processStats() error {
// this step only mutates in-memory state; if the process crashes here, the state recovered from meta is still CompactionTaskState_statistic
resultSegments := make([]int64, 0, len(t.GetTaskProto().GetTmpSegments()))
if Params.DataCoordCfg.EnableStatsTask.GetAsBool() {
existNonStats := false
tmpToResultSegments := make(map[int64][]int64, len(t.GetTaskProto().GetTmpSegments()))
for _, segmentID := range t.GetTaskProto().GetTmpSegments() {
to, ok := t.meta.(*meta).GetCompactionTo(segmentID)
if !ok || to == nil {
select {
case getStatsTaskChSingleton() <- segmentID:
default:
}
existNonStats = true
continue
}
tmpToResultSegments[segmentID] = lo.Map(to, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })
resultSegments = append(resultSegments, lo.Map(to, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...)
}
if existNonStats {
return nil
}
if err := t.regeneratePartitionStats(tmpToResultSegments); err != nil {
log.Warn("regenerate partition stats failed, wait for retry", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("regeneratePartitionStats", err)
}
} else {
log.Info("stats task is not enable, set tmp segments to result segments", zap.Int64("planID", t.GetTaskProto().GetPlanID()))
resultSegments = t.GetTaskProto().GetTmpSegments()
}
log.Info("clustering compaction stats task finished", zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.Int64s("tmp segments", t.GetTaskProto().GetTmpSegments()),
zap.Int64s("result segments", resultSegments))
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing), setResultSegments(resultSegments))
}
// this is just a temporary solution. A longer-term solution would be for the indexnode
// to regenerate the clustering information for each segment and merge it at the vshard level.
func (t *clusteringCompactionTask) regeneratePartitionStats(tmpToResultSegments map[int64][]int64) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(Params)
cli, err := chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
if err != nil {
log.Error("chunk manager init failed", zap.Error(err))
return err
}
partitionStatsFile := path.Join(cli.RootPath(), common.PartitionStatsPath,
metautil.JoinIDPath(t.GetTaskProto().GetCollectionID(), t.GetTaskProto().GetPartitionID()), t.plan.GetChannel(),
strconv.FormatInt(t.GetTaskProto().GetPlanID(), 10))
value, err := cli.Read(ctx, partitionStatsFile)
if err != nil {
log.Warn("read partition stats file failed", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return err
}
partitionStats, err := storage.DeserializePartitionsStatsSnapshot(value)
if err != nil {
log.Warn("deserialize partition stats failed", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return err
}
for from, to := range tmpToResultSegments {
stats := partitionStats.SegmentStats[from]
// a stats task maps each tmp segment to exactly one result segment
for _, toID := range to {
partitionStats.SegmentStats[toID] = stats
}
delete(partitionStats.SegmentStats, from)
}
partitionStatsBytes, err := storage.SerializePartitionStatsSnapshot(partitionStats)
if err != nil {
log.Warn("serialize partition stats failed", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return err
}
err = cli.Write(ctx, partitionStatsFile, partitionStatsBytes)
if err != nil {
log.Warn("save partition stats file failed", zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.String("path", partitionStatsFile), zap.Error(err))
return err
}
return nil
}
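
// processIndexing completes the task once every result segment has a finished
// index for each of the collection's indexes; a collection without indexes
// completes immediately.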
func (t *clusteringCompactionTask) processIndexing() error {
// wait for segment indexed
collectionIndexes := t.meta.GetIndexMeta().GetIndexesForCollection(t.GetTaskProto().GetCollectionID(), "")
if len(collectionIndexes) == 0 {
log.Debug("the collection has no index, no need to do indexing")
return t.completeTask()
}
indexed := func() bool {
for _, collectionIndex := range collectionIndexes {
for _, segmentID := range t.GetTaskProto().GetResultSegments() {
segmentIndexState := t.meta.GetIndexMeta().GetSegmentIndexState(t.GetTaskProto().GetCollectionID(), segmentID, collectionIndex.IndexID)
log.Debug("segment index state", zap.String("segment", segmentIndexState.String()))
if segmentIndexState.GetState() != commonpb.IndexState_Finished {
return false
}
}
}
return true
}()
log.Debug("check compaction result segments index states", zap.Bool("indexed", indexed), zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Int64s("segments", t.GetTaskProto().ResultSegments))
if indexed {
return t.completeTask()
}
return nil
}
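
// markResultSegmentsVisible flips the result segments to visible and stamps
// them with this plan's partition stats version in a single meta update.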
func (t *clusteringCompactionTask) markResultSegmentsVisible() error {
var operators []UpdateOperator
for _, segID := range t.GetTaskProto().GetResultSegments() {
operators = append(operators, SetSegmentIsInvisible(segID, false))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetTaskProto().GetPlanID()))
}
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("markResultSegmentVisible UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("markResultSegmentVisible UpdateSegmentsInfo", err)
}
return nil
}
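
// markInputSegmentsDropped marks every input segment as Dropped in a single
// meta update, removing them from the queryable view.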
func (t *clusteringCompactionTask) markInputSegmentsDropped() error {
var operators []UpdateOperator
// mark every input segment as dropped
for _, segID := range t.GetTaskProto().GetInputSegments() {
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("markInputSegmentsDropped UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("markInputSegmentsDropped UpdateSegmentsInfo", err)
}
return nil
}
// indexed is the final working state of a clustering compaction task;
// one task should only run this once
func (t *clusteringCompactionTask) completeTask() error {
var err error
// first mark result segments visible
if err = t.markResultSegmentsVisible(); err != nil {
return err
}
// update current partition stats version
// at this point, the segment view includes both the input segments and the result segments.
if err = t.meta.GetPartitionStatsMeta().SavePartitionStatsInfo(&datapb.PartitionStatsInfo{
CollectionID: t.GetTaskProto().GetCollectionID(),
PartitionID: t.GetTaskProto().GetPartitionID(),
VChannel: t.GetTaskProto().GetChannel(),
Version: t.GetTaskProto().GetPlanID(),
SegmentIDs: t.GetTaskProto().GetResultSegments(),
CommitTime: time.Now().Unix(),
}); err != nil {
return merr.WrapErrClusteringCompactionMetaError("SavePartitionStatsInfo", err)
}
// mark input segments as dropped
// now, the segment view only includes the result segments.
if err = t.markInputSegmentsDropped(); err != nil {
return err
}
err = t.meta.GetPartitionStatsMeta().SaveCurrentPartitionStatsVersion(t.GetTaskProto().GetCollectionID(),
t.GetTaskProto().GetPartitionID(), t.GetTaskProto().GetChannel(), t.GetTaskProto().GetPlanID())
if err != nil {
return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err)
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
}
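
// processAnalyzing tracks the analyze job for the clustering key. A finished
// job feeds its version into the compaction; a finished job without a
// centroids file fails the task permanently, and a failed job surfaces its
// fail reason.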
func (t *clusteringCompactionTask) processAnalyzing() error {
analyzeTask := t.meta.GetAnalyzeMeta().GetTask(t.GetTaskProto().GetAnalyzeTaskID())
if analyzeTask == nil {
log.Warn("analyzeTask not found", zap.Int64("id", t.GetTaskProto().GetAnalyzeTaskID()))
return merr.WrapErrAnalyzeTaskNotFound(t.GetTaskProto().GetAnalyzeTaskID()) // retryable
}
log.Info("check analyze task state", zap.Int64("id", t.GetTaskProto().GetAnalyzeTaskID()), zap.Int64("version", analyzeTask.GetVersion()), zap.String("state", analyzeTask.State.String()))
switch analyzeTask.State {
case indexpb.JobState_JobStateFinished:
if analyzeTask.GetCentroidsFile() == "" {
// not retryable: the analyze task finished without producing a centroids file;
// vector clustering is not supported in open source
return merr.WrapErrClusteringCompactionNotSupportVector()
} else {
t.GetTaskProto().AnalyzeVersion = analyzeTask.GetVersion()
return t.doCompact()
}
case indexpb.JobState_JobStateFailed:
log.Warn("analyze task fail", zap.Int64("analyzeID", t.GetTaskProto().GetAnalyzeTaskID()))
return errors.New(analyzeTask.FailReason)
default:
}
return nil
}
func (t *clusteringCompactionTask) resetSegmentCompacting() {
t.meta.SetSegmentsCompacting(t.GetTaskProto().GetInputSegments(), false)
}
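
// processFailedOrTimeout cleans up a failed or timed-out task: it drops the
// plan on the datanode, reverts segment levels (for tasks carried over from
// v2.4, detected by dropped inputs) or marks the produced segments as dropped
// (v2.5+), releases the compacting flag on the inputs, removes any uploaded
// partition stats, and parks the task in the cleaned state.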
func (t *clusteringCompactionTask) processFailedOrTimeout() error {
log.Info("clean task", zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("state", t.GetTaskProto().GetState().String()))
if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetTaskProto().GetPlanID(),
}); err != nil {
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
}
isInputDropped := false
for _, segID := range t.GetTaskProto().GetInputSegments() {
if t.meta.GetHealthySegment(segID) == nil {
isInputDropped = true
break
}
}
if isInputDropped {
log.Info("input segments dropped, doing for compatibility",
zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()))
// this task must be generated by v2.4, just for compatibility
// revert segments meta
var operators []UpdateOperator
// revert level of input segments
// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
for _, segID := range t.GetTaskProto().GetInputSegments() {
operators = append(operators, RevertSegmentLevelOperator(segID))
}
// if result segments were generated but the task failed in a later step, mark them as L1 segments without partition stats
for _, segID := range t.GetTaskProto().GetResultSegments() {
operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
}
for _, segID := range t.GetTaskProto().GetTmpSegments() {
// possibly unnecessary: a task generated by v2.4 has no TmpSegments
operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
}
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
}
} else {
// after v2.5.0, mark the result segments as dropped
var operators []UpdateOperator
for _, segID := range t.GetTaskProto().GetResultSegments() {
// Don't worry about them being loaded; they are all invisible.
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}
for _, segID := range t.GetTaskProto().GetTmpSegments() {
// tmp segments are always invisible, so there is no risk of them being loaded
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
}
}
t.resetSegmentCompacting()
// drop partition stats if uploaded
partitionStatsInfo := &datapb.PartitionStatsInfo{
CollectionID: t.GetTaskProto().GetCollectionID(),
PartitionID: t.GetTaskProto().GetPartitionID(),
VChannel: t.GetTaskProto().GetChannel(),
Version: t.GetTaskProto().GetPlanID(),
SegmentIDs: t.GetTaskProto().GetResultSegments(),
}
err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
if err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
}
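
// doAnalyze persists an analyze task for the clustering key field, enqueues
// it on the analyze scheduler and moves the compaction task to analyzing.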
func (t *clusteringCompactionTask) doAnalyze() error {
analyzeTask := &indexpb.AnalyzeTask{
CollectionID: t.GetTaskProto().GetCollectionID(),
PartitionID: t.GetTaskProto().GetPartitionID(),
FieldID: t.GetTaskProto().GetClusteringKeyField().FieldID,
FieldName: t.GetTaskProto().GetClusteringKeyField().Name,
FieldType: t.GetTaskProto().GetClusteringKeyField().DataType,
SegmentIDs: t.GetTaskProto().GetInputSegments(),
TaskID: t.GetTaskProto().GetAnalyzeTaskID(),
State: indexpb.JobState_JobStateInit,
}
err := t.meta.GetAnalyzeMeta().AddAnalyzeTask(analyzeTask)
if err != nil {
log.Warn("failed to create analyze task", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return err
}
t.analyzeScheduler.enqueue(newAnalyzeTask(t.GetTaskProto().GetAnalyzeTaskID()))
log.Info("submit analyze task", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()), zap.Int64("id", t.GetTaskProto().GetAnalyzeTaskID()))
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing))
}
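
// doCompact builds the compaction plan and submits it to the assigned
// datanode. Exhausted datanode slots release the node for reassignment
// without a state change; any other submit error pushes the task back to
// pipelining.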
func (t *clusteringCompactionTask) doCompact() error {
log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()))
if t.NeedReAssignNodeID() {
log.RatedWarn(10, "nodeID not assigned yet")
return nil
}
log = log.With(zap.Int64("nodeID", t.GetTaskProto().GetNodeID()))
// TODO: refine this logic. GetCompactionPlanResult returns a failed result when there is no compaction on the datanode, which is weird.
// Check whether the compaction plan has already been submitted, considering that
// datacoord may crash between calling sessions.Compaction and updating the task state to executing.
// result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
// if err != nil {
// if errors.Is(err, merr.ErrNodeNotFound) {
// log.Warn("GetCompactionPlanResult fail", zap.Error(err))
// // setNodeID(NullNodeID) to trigger reassign node ID
// t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
// return nil
// }
// return merr.WrapErrGetCompactionPlanResultFail(err)
// }
// if result != nil {
// log.Info("compaction already submitted")
// t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
// return nil
// }
var err error
t.plan, err = t.BuildCompactionRequest()
if err != nil {
log.Warn("Failed to BuildCompactionRequest", zap.Error(err))
return err
}
err = t.sessions.Compaction(context.Background(), t.GetTaskProto().GetNodeID(), t.GetPlan())
if err != nil {
if errors.Is(err, merr.ErrDataNodeSlotExhausted) {
log.Warn("fail to notify compaction tasks to DataNode because the node slots exhausted")
return t.updateAndSaveTaskMeta(setNodeID(NullNodeID))
}
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
}
func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
taskClone := proto.Clone(t.GetTaskProto()).(*datapb.CompactionTask)
for _, opt := range opts {
opt(taskClone)
}
return taskClone
}
func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {
log.Warn("Failed to saveTaskMeta", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("updateAndSaveTaskMeta", err) // retryable
}
t.SetTask(task)
return nil
}
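
// checkTimeout reports whether the task has been running longer than its
// configured TimeoutInSeconds; a value of 0 disables the check.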
func (t *clusteringCompactionTask) checkTimeout() bool {
if t.GetTaskProto().GetTimeoutInSeconds() > 0 {
diff := time.Since(time.Unix(t.GetTaskProto().GetStartTime(), 0)).Seconds()
if diff > float64(t.GetTaskProto().GetTimeoutInSeconds()) {
log.Warn("compaction timeout",
zap.Int32("timeout in seconds", t.GetTaskProto().GetTimeoutInSeconds()),
zap.Int64("startTime", t.GetTaskProto().GetStartTime()),
)
return true
}
}
return false
}
func (t *clusteringCompactionTask) saveTaskMeta(task *datapb.CompactionTask) error {
return t.meta.SaveCompactionTask(task)
}
func (t *clusteringCompactionTask) SaveTaskMeta() error {
return t.saveTaskMeta(t.GetTaskProto())
}
func (t *clusteringCompactionTask) GetPlan() *datapb.CompactionPlan {
return t.plan
}
func (t *clusteringCompactionTask) GetResult() *datapb.CompactionPlanResult {
return t.result
}
func (t *clusteringCompactionTask) GetSpan() trace.Span {
return t.span
}
func (t *clusteringCompactionTask) EndSpan() {
if t.span != nil {
t.span.End()
}
}
func (t *clusteringCompactionTask) SetResult(result *datapb.CompactionPlanResult) {
t.result = result
}
func (t *clusteringCompactionTask) SetSpan(span trace.Span) {
t.span = span
}
func (t *clusteringCompactionTask) SetPlan(plan *datapb.CompactionPlan) {
t.plan = plan
}
func (t *clusteringCompactionTask) SetTask(task *datapb.CompactionTask) {
t.taskProto.Store(task)
}
func (t *clusteringCompactionTask) SetNodeID(id UniqueID) error {
return t.updateAndSaveTaskMeta(setNodeID(id))
}
func (t *clusteringCompactionTask) GetLabel() string {
return fmt.Sprintf("%d-%s", t.GetTaskProto().PartitionID, t.GetTaskProto().GetChannel())
}
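
// NeedReAssignNodeID reports whether the task is still pipelining without a
// valid datanode assigned.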
func (t *clusteringCompactionTask) NeedReAssignNodeID() bool {
return t.GetTaskProto().GetState() == datapb.CompactionTaskState_pipelining && (t.GetTaskProto().GetNodeID() == 0 || t.GetTaskProto().GetNodeID() == NullNodeID)
}
func (t *clusteringCompactionTask) GetSlotUsage() int64 {
return t.slotUsage
}