enhance:[cherry-pick] improve ut cov of clustering compaction task (#35243)

issue: #34792
master pr: #35242

Signed-off-by: wayblink <anyang.wang@zilliz.com>
wayblink 2024-08-09 15:26:18 +08:00 committed by GitHub
parent 96c53bcf2b
commit f7a3fd8bbf
2 changed files with 429 additions and 159 deletions


@@ -279,10 +279,15 @@ func (t *clusteringCompactionTask) processMetaSaved() error {
func (t *clusteringCompactionTask) processIndexing() error {
// wait for segment indexed
collectionIndexes := t.meta.GetIndexMeta().GetIndexesForCollection(t.GetCollectionID(), "")
if len(collectionIndexes) == 0 {
log.Debug("the collection has no index, no need to do indexing")
return t.completeTask()
}
indexed := func() bool {
for _, collectionIndex := range collectionIndexes {
for _, segmentID := range t.ResultSegments {
segmentIndexState := t.meta.GetIndexMeta().GetSegmentIndexState(t.GetCollectionID(), segmentID, collectionIndex.IndexID)
log.Debug("segment index state", zap.String("segment", segmentIndexState.String()))
if segmentIndexState.GetState() != commonpb.IndexState_Finished {
return false
}
@@ -292,7 +297,7 @@ func (t *clusteringCompactionTask) processIndexing() error {
}()
log.Debug("check compaction result segments index states", zap.Bool("indexed", indexed), zap.Int64("planID", t.GetPlanID()), zap.Int64s("segments", t.ResultSegments))
if indexed {
t.completeTask()
return t.completeTask()
}
return nil
}
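
Two behavior changes land in these hunks: a collection with no index now completes the compaction task immediately instead of waiting on indexing that will never start, and the error from `completeTask()` is now returned instead of silently dropped. A minimal sketch of the second fix, using simplified stand-in functions rather than the actual datacoord types:

```go
package main

import (
	"errors"
	"fmt"
)

// stand-in for clusteringCompactionTask.completeTask, which can fail
// while committing the compaction result to meta
func completeTask() error { return errors.New("meta save failed") }

// before the fix: the error was discarded, so the caller saw success
func processIndexingOld() error {
	completeTask()
	return nil
}

// after the fix: the error propagates to Process(), whose retry logic
// (exercised in TestProcessRetryLogic below) can react to it
func processIndexingNew() error {
	return completeTask()
}

func main() {
	fmt.Println(processIndexingOld()) // <nil>, the failure is invisible
	fmt.Println(processIndexingNew()) // meta save failed
}
```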


@@ -29,7 +29,9 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
@@ -42,18 +44,20 @@ func TestClusteringCompactionTaskSuite(t *testing.T) {
type ClusteringCompactionTaskSuite struct {
suite.Suite
mockID atomic.Int64
mockAlloc *NMockAllocator
meta *meta
mockSessMgr *MockSessionManager
handler *NMockHandler
session *MockSessionManager
mockID atomic.Int64
mockAlloc *NMockAllocator
meta *meta
mockSessMgr *MockSessionManager
handler *NMockHandler
session *MockSessionManager
analyzeScheduler *taskScheduler
}
func (s *ClusteringCompactionTaskSuite) SetupTest() {
ctx := context.Background()
cm := storage.NewLocalChunkManager(storage.RootPath(""))
catalog := datacoord.NewCatalog(NewMetaMemoryKV(), "", "")
meta, err := newMeta(context.TODO(), catalog, cm)
meta, err := newMeta(ctx, catalog, cm)
s.NoError(err)
s.meta = meta
@@ -75,6 +79,9 @@ func (s *ClusteringCompactionTaskSuite) SetupTest() {
s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil).Maybe()
s.session = NewMockSessionManager(s.T())
scheduler := newTaskScheduler(ctx, s.meta, nil, cm, newIndexEngineVersionManager(), nil)
s.analyzeScheduler = scheduler
}
func (s *ClusteringCompactionTaskSuite) SetupSubTest() {
@@ -82,8 +89,6 @@
}
func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChange() {
channel := "Ch-1"
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
@@ -99,39 +104,9 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
PartitionStatsVersion: 10000,
},
})
session := NewSessionManagerImpl()
s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
schema := ConstructScalarClusteringSchema("TestClusteringCompactionTask", 32, true)
pk := &schemapb.FieldSchema{
FieldID: 100,
Name: Int64Field,
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
TypeParams: nil,
IndexParams: nil,
AutoID: true,
IsClusteringKey: true,
}
task := &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Channel: channel,
Type: datapb.CompactionType_ClusteringCompaction,
NodeID: 1,
State: datapb.CompactionTaskState_pipelining,
Schema: schema,
ClusteringKeyField: pk,
InputSegments: []int64{101, 102},
ResultSegments: []int64{1000, 1100},
},
meta: s.meta,
sessions: session,
}
task := s.generateBasicTask(false)
task.processPipelining()
@@ -178,40 +153,49 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
s.Equal(int64(0), seg42.PartitionStatsVersion)
}
func (s *ClusteringCompactionTaskSuite) generateBasicTask() *clusteringCompactionTask {
schema := ConstructScalarClusteringSchema("TestClusteringCompactionTask", 32, true)
pk := &schemapb.FieldSchema{
FieldID: 100,
Name: Int64Field,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
AutoID: true,
IsClusteringKey: true,
func (s *ClusteringCompactionTaskSuite) generateBasicTask(vectorClusteringKey bool) *clusteringCompactionTask {
schema := ConstructClusteringSchema("TestClusteringCompactionTask", 32, true, vectorClusteringKey)
var pk *schemapb.FieldSchema
if vectorClusteringKey {
pk = &schemapb.FieldSchema{
FieldID: 101,
Name: FloatVecField,
IsPrimaryKey: false,
DataType: schemapb.DataType_FloatVector,
IsClusteringKey: true,
}
} else {
pk = &schemapb.FieldSchema{
FieldID: 100,
Name: Int64Field,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
AutoID: true,
IsClusteringKey: true,
}
}
task := &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Type: datapb.CompactionType_ClusteringCompaction,
NodeID: 1,
State: datapb.CompactionTaskState_pipelining,
Schema: schema,
ClusteringKeyField: pk,
InputSegments: []int64{101, 102},
ResultSegments: []int64{1000, 1100},
},
meta: s.meta,
handler: s.handler,
sessions: s.session,
compactionTask := &datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Type: datapb.CompactionType_ClusteringCompaction,
NodeID: 1,
State: datapb.CompactionTaskState_pipelining,
Schema: schema,
ClusteringKeyField: pk,
InputSegments: []int64{101, 102},
ResultSegments: []int64{1000, 1100},
}
task := newClusteringCompactionTask(compactionTask, s.meta, s.session, s.handler, s.analyzeScheduler)
task.maxRetryTimes = 0
return task
}
func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
task := s.generateBasicTask()
task := s.generateBasicTask(false)
task.maxRetryTimes = 3
// process pipelining fail
s.Equal(false, task.Process())
@@ -226,96 +210,242 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
}
func (s *ClusteringCompactionTaskSuite) TestProcessStateChange() {
task := s.generateBasicTask()
// process pipelining fail
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
// process pipelining succeed
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
},
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 102,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
PartitionStatsVersion: 10000,
},
func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() {
s.Run("process pipelining fail, segment not found", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_pipelining
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})
s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
task.State = datapb.CompactionTaskState_pipelining
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_executing, task.GetState())
// process executing
s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, merr.WrapErrNodeNotFound(1)).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
// repipelining
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
task.NodeID = 1
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_executing, task.GetState())
// process executing
s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, nil).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_executing, task.GetState())
s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_executing,
}, nil).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_executing, task.GetState())
s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_completed,
Segments: []*datapb.CompactionSegment{
{
SegmentID: 1000,
s.Run("pipelining fail, no datanode slot", func() {
task := s.generateBasicTask(false)
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
},
{
SegmentID: 1001,
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 102,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
PartitionStatsVersion: 10000,
},
},
}, nil).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
})
s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(merr.WrapErrDataNodeSlotExhausted())
task.State = datapb.CompactionTaskState_pipelining
s.False(task.Process())
s.Equal(int64(NullNodeID), task.GetNodeID())
})
s.Run("process succeed, scalar clustering key", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_pipelining
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
},
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 102,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
PartitionStatsVersion: 10000,
},
})
s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
task.State = datapb.CompactionTaskState_pipelining
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_executing, task.GetState())
})
s.Run("process succeed, vector clustering key", func() {
task := s.generateBasicTask(true)
task.State = datapb.CompactionTaskState_pipelining
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
},
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 102,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
PartitionStatsVersion: 10000,
},
})
task.State = datapb.CompactionTaskState_pipelining
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_analyzing, task.GetState())
})
}
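Every subtest above re-seeds meta with the same two input segments, and the pattern repeats throughout TestProcessExecuting below. A hypothetical suite helper (not part of this PR) could reduce each of those blocks to one call:

```go
// hypothetical helper, not in this PR: seeds the two input segments
// (101 at level L1, 102 at level L2 with partition stats) used by
// most subtests in this suite
func (s *ClusteringCompactionTaskSuite) addInputSegments() {
	s.meta.AddSegment(context.TODO(), &SegmentInfo{
		SegmentInfo: &datapb.SegmentInfo{
			ID:    101,
			State: commonpb.SegmentState_Flushed,
			Level: datapb.SegmentLevel_L1,
		},
	})
	s.meta.AddSegment(context.TODO(), &SegmentInfo{
		SegmentInfo: &datapb.SegmentInfo{
			ID:                    102,
			State:                 commonpb.SegmentState_Flushed,
			Level:                 datapb.SegmentLevel_L2,
			PartitionStatsVersion: 10000,
		},
	})
}
```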
func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
},
s.Run("process executing, get compaction result fail", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_executing
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
},
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 102,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
PartitionStatsVersion: 10000,
},
})
s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, merr.WrapErrNodeNotFound(1)).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 102,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
PartitionStatsVersion: 10000,
},
s.Run("process executing, compaction result not ready", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_executing
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
},
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 102,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
PartitionStatsVersion: 10000,
},
})
s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, nil).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_executing, task.GetState())
s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_executing,
}, nil).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_executing, task.GetState())
})
s.Run("process executing, scalar clustering key, compaction result ready", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_executing
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
},
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 102,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
PartitionStatsVersion: 10000,
},
})
s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_completed,
Segments: []*datapb.CompactionSegment{
{
SegmentID: 1000,
},
{
SegmentID: 1001,
},
},
}, nil).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
})
s.Run("process executing, compaction result ready", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_executing
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
},
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 102,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
PartitionStatsVersion: 10000,
},
})
s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_completed,
Segments: []*datapb.CompactionSegment{
{
SegmentID: 1000,
},
{
SegmentID: 1001,
},
},
}, nil).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
})
s.Run("process executing, compaction result timeout", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_executing
task.StartTime = time.Now().Unix()
task.TimeoutInSeconds = 1
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
},
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 102,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
PartitionStatsVersion: 10000,
},
})
s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_executing,
Segments: []*datapb.CompactionSegment{
{
SegmentID: 1000,
},
{
SegmentID: 1001,
},
},
}, nil).Once()
time.Sleep(time.Second * 1)
s.Equal(true, task.Process())
s.Equal(datapb.CompactionTaskState_cleaned, task.GetState())
})
task := s.generateBasicTask()
s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(merr.WrapErrDataNodeSlotExhausted())
task.State = datapb.CompactionTaskState_pipelining
s.NoError(task.doCompact())
s.Equal(int64(NullNodeID), task.GetNodeID())
}
func (s *ClusteringCompactionTaskSuite) TestProcessExecutingState() {
task := s.generateBasicTask()
task := s.generateBasicTask(false)
s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_failed,
}, nil).Once()
@@ -323,7 +453,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecutingState() {
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_indexing,
State: datapb.CompactionTaskState_failed,
}, nil).Once()
s.NoError(task.processExecuting())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
@@ -355,9 +485,138 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecutingState() {
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
}
func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
s.Run("collection has no index", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_indexing
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_completed, task.GetState())
})
s.Run("collection has index, segment is not indexed", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_indexing
index := &model.Index{
CollectionID: 1,
IndexID: 3,
}
err := s.meta.indexMeta.CreateIndex(index)
s.NoError(err)
s.False(task.Process())
s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
})
s.Run("collection has index, segment indexed", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_indexing
index := &model.Index{
CollectionID: 1,
IndexID: 3,
}
err := s.meta.indexMeta.CreateIndex(index)
s.NoError(err)
s.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{
IndexID: 3,
SegmentID: 1000,
CollectionID: 1,
IndexState: commonpb.IndexState_Finished,
})
s.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{
IndexID: 3,
SegmentID: 1100,
CollectionID: 1,
IndexState: commonpb.IndexState_Finished,
})
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_completed, task.GetState())
})
}
func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
s.Run("analyze task not found", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_analyzing
s.False(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})
s.Run("analyze task failed", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_analyzing
task.AnalyzeTaskID = 7
t := &indexpb.AnalyzeTask{
CollectionID: task.CollectionID,
PartitionID: task.PartitionID,
FieldID: task.ClusteringKeyField.FieldID,
SegmentIDs: task.InputSegments,
TaskID: 7,
State: indexpb.JobState_JobStateFailed,
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.False(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})
s.Run("analyze task fake finish, vector not support", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_analyzing
task.AnalyzeTaskID = 7
t := &indexpb.AnalyzeTask{
CollectionID: task.CollectionID,
PartitionID: task.PartitionID,
FieldID: task.ClusteringKeyField.FieldID,
SegmentIDs: task.InputSegments,
TaskID: 7,
State: indexpb.JobState_JobStateFinished,
CentroidsFile: "",
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.False(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})
s.Run("analyze task finished", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_analyzing
task.AnalyzeTaskID = 7
t := &indexpb.AnalyzeTask{
CollectionID: task.CollectionID,
PartitionID: task.PartitionID,
FieldID: task.ClusteringKeyField.FieldID,
SegmentIDs: task.InputSegments,
TaskID: 7,
State: indexpb.JobState_JobStateFinished,
CentroidsFile: "somewhere",
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
},
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 102,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
PartitionStatsVersion: 10000,
},
})
s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.False(task.Process())
s.Equal(datapb.CompactionTaskState_executing, task.GetState())
})
}
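The four subtests above differ only in the analyze task's state and centroids file. A hypothetical builder (again, not in this PR) that mirrors the repeated literals:

```go
// hypothetical builder, not in this PR: constructs the analyze task the
// subtests above register, varying only the job state and centroids file
func analyzeTaskFor(task *clusteringCompactionTask, state indexpb.JobState, centroidsFile string) *indexpb.AnalyzeTask {
	return &indexpb.AnalyzeTask{
		CollectionID:  task.CollectionID,
		PartitionID:   task.PartitionID,
		FieldID:       task.ClusteringKeyField.FieldID,
		SegmentIDs:    task.InputSegments,
		TaskID:        7,
		State:         state,
		CentroidsFile: centroidsFile,
	}
}
```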
// fix: https://github.com/milvus-io/milvus/issues/35110
func (s *ClusteringCompactionTaskSuite) TestCompleteTask() {
task := s.generateBasicTask()
task := s.generateBasicTask(false)
task.completeTask()
partitionStats := s.meta.GetPartitionStatsMeta().GetPartitionStats(task.GetCollectionID(), task.GetPartitionID(), task.GetChannel(), task.GetPlanID())
s.True(partitionStats.GetCommitTime() > time.Now().Add(-2*time.Second).Unix())
@@ -368,7 +627,7 @@ const (
FloatVecField = "floatVecField"
)
func ConstructScalarClusteringSchema(collection string, dim int, autoID bool, fields ...*schemapb.FieldSchema) *schemapb.CollectionSchema {
func ConstructClusteringSchema(collection string, dim int, autoID bool, vectorClusteringKey bool, fields ...*schemapb.FieldSchema) *schemapb.CollectionSchema {
// if fields are specified, construct it
if len(fields) > 0 {
return &schemapb.CollectionSchema{
@@ -380,15 +639,14 @@ func ConstructScalarClusteringSchema(collection string, dim int, autoID bool, fi
// if no field is specified, use default
pk := &schemapb.FieldSchema{
FieldID: 100,
Name: Int64Field,
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
TypeParams: nil,
IndexParams: nil,
AutoID: autoID,
IsClusteringKey: true,
FieldID: 100,
Name: Int64Field,
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
TypeParams: nil,
IndexParams: nil,
AutoID: autoID,
}
fVec := &schemapb.FieldSchema{
FieldID: 101,
@@ -404,6 +662,13 @@
},
IndexParams: nil,
}
if vectorClusteringKey {
fVec.IsClusteringKey = true
} else {
pk.IsClusteringKey = true
}
return &schemapb.CollectionSchema{
Name: collection,
AutoID: autoID,
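
As a quick sanity check of the refactored constructor (hypothetical usage, assuming the standard generated getters from milvus-proto), the clustering key flag should land on the vector field only when vectorClusteringKey is set:

```go
// hypothetical check: which field carries the clustering key in each mode
scalarSchema := ConstructClusteringSchema("demo", 32, true, false)
vectorSchema := ConstructClusteringSchema("demo", 32, true, true)
for _, f := range scalarSchema.GetFields() {
	if f.GetIsClusteringKey() {
		fmt.Println("scalar mode:", f.GetName()) // the pk field (Int64Field)
	}
}
for _, f := range vectorSchema.GetFields() {
	if f.GetIsClusteringKey() {
		fmt.Println("vector mode:", f.GetName()) // the vector field (FloatVecField)
	}
}
```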