diff --git a/internal/datacoord/compaction_policy_clustering.go b/internal/datacoord/compaction_policy_clustering.go index f160563740..ba8299c4f8 100644 --- a/internal/datacoord/compaction_policy_clustering.go +++ b/internal/datacoord/compaction_policy_clustering.go @@ -19,7 +19,6 @@ package datacoord import ( "context" "fmt" - "sort" "time" "github.com/samber/lo" @@ -31,7 +30,6 @@ import ( "github.com/milvus-io/milvus/internal/util/clustering" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/tsoutil" ) type clusteringCompactionPolicy struct { @@ -216,12 +214,8 @@ func calculateClusteringCompactionConfig(coll *collectionInfo, view CompactionVi func triggerClusteringCompactionPolicy(ctx context.Context, meta *meta, collectionID int64, partitionID int64, channel string, segments []*SegmentInfo) (bool, error) { log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID)) - partitionStatsInfos := meta.partitionStatsMeta.ListPartitionStatsInfos(collectionID, partitionID, channel) - sort.Slice(partitionStatsInfos, func(i, j int) bool { - return partitionStatsInfos[i].Version > partitionStatsInfos[j].Version - }) - - if len(partitionStatsInfos) == 0 { + currentVersion := meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel) + if currentVersion == 0 { var newDataSize int64 = 0 for _, seg := range segments { newDataSize += seg.getSegmentSize() @@ -234,9 +228,13 @@ func triggerClusteringCompactionPolicy(ctx context.Context, meta *meta, collecti return false, nil } - partitionStats := partitionStatsInfos[0] - version := partitionStats.Version - pTime, _ := tsoutil.ParseTS(uint64(version)) + partitionStats := meta.GetPartitionStatsMeta().GetPartitionStats(collectionID, partitionID, channel, currentVersion) + if partitionStats == nil { + log.Info("partition stats not found") + return false, nil + } + timestampSeconds := partitionStats.GetCommitTime() + pTime := time.Unix(timestampSeconds, 0) if time.Since(pTime) < Params.DataCoordCfg.ClusteringCompactionMinInterval.GetAsDuration(time.Second) { log.Info("Too short time before last clustering compaction, skip compaction") return false, nil diff --git a/internal/datacoord/compaction_policy_clustering_test.go b/internal/datacoord/compaction_policy_clustering_test.go index 9014b81623..87a226df1e 100644 --- a/internal/datacoord/compaction_policy_clustering_test.go +++ b/internal/datacoord/compaction_policy_clustering_test.go @@ -18,11 +18,14 @@ package datacoord import ( "context" "testing" + "time" "github.com/cockroachdb/errors" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "go.uber.org/atomic" + "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" ) @@ -33,12 +36,12 @@ func TestClusteringCompactionPolicySuite(t *testing.T) { type ClusteringCompactionPolicySuite struct { suite.Suite - mockAlloc *NMockAllocator - mockTriggerManager *MockTriggerManager - testLabel *CompactionGroupLabel - handler *NMockHandler - mockPlanContext *MockCompactionPlanContext - + mockAlloc *NMockAllocator + mockTriggerManager *MockTriggerManager + testLabel *CompactionGroupLabel + handler *NMockHandler + mockPlanContext *MockCompactionPlanContext + catalog *mocks.DataCoordCatalog clusteringCompactionPolicy *clusteringCompactionPolicy } @@ -49,6 +52,11 @@ func (s *ClusteringCompactionPolicySuite) SetupTest() { Channel: "ch-1", } + catalog := mocks.NewDataCoordCatalog(s.T()) + catalog.EXPECT().SavePartitionStatsInfo(mock.Anything, mock.Anything).Return(nil).Maybe() + catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil).Maybe() + s.catalog = catalog + segments := genSegmentsForMeta(s.testLabel) meta := &meta{segments: NewSegmentsInfo()} for id, segment := range segments { @@ -184,5 +192,115 @@ func (s *ClusteringCompactionPolicySuite) TestCollectionIsClusteringCompacting() } func (s *ClusteringCompactionPolicySuite) TestGetExpectedSegmentSize() { - +} + +func (s *ClusteringCompactionPolicySuite) TestTimeIntervalLogic() { + ctx := context.TODO() + collectionID := int64(100) + partitionID := int64(101) + channel := "ch1" + + tests := []struct { + description string + partitionStats []*datapb.PartitionStatsInfo + currentVersion int64 + segments []*SegmentInfo + succeed bool + }{ + {"no partition stats and not enough new data", []*datapb.PartitionStatsInfo{}, emptyPartitionStatsVersion, []*SegmentInfo{}, false}, + {"no partition stats and enough new data", []*datapb.PartitionStatsInfo{}, emptyPartitionStatsVersion, []*SegmentInfo{ + { + size: *atomic.NewInt64(1024 * 1024 * 1024 * 10), + }, + }, true}, + {"very recent partition stats and enough new data", + []*datapb.PartitionStatsInfo{ + { + CollectionID: collectionID, + PartitionID: partitionID, + VChannel: channel, + CommitTime: time.Now().Unix(), + Version: 100, + }, + }, + 100, + []*SegmentInfo{ + { + size: *atomic.NewInt64(1024 * 1024 * 1024 * 10), + }, + }, false}, + {"very old partition stats and not enough new data", + []*datapb.PartitionStatsInfo{ + { + CollectionID: collectionID, + PartitionID: partitionID, + VChannel: channel, + CommitTime: time.Unix(1704038400, 0).Unix(), + Version: 100, + }, + }, + 100, + []*SegmentInfo{ + { + size: *atomic.NewInt64(1024), + }, + }, true}, + {"partition stats and enough new data", + []*datapb.PartitionStatsInfo{ + { + CollectionID: collectionID, + PartitionID: partitionID, + VChannel: channel, + CommitTime: time.Now().Add(-3 * time.Hour).Unix(), + SegmentIDs: []int64{100000}, + Version: 100, + }, + }, + 100, + []*SegmentInfo{ + { + SegmentInfo: &datapb.SegmentInfo{ID: 9999}, + size: *atomic.NewInt64(1024 * 1024 * 1024 * 10), + }, + }, true}, + {"partition stats and not enough new data", + []*datapb.PartitionStatsInfo{ + { + CollectionID: collectionID, + PartitionID: partitionID, + VChannel: channel, + CommitTime: time.Now().Add(-3 * time.Hour).Unix(), + SegmentIDs: []int64{100000}, + Version: 100, + }, + }, + 100, + []*SegmentInfo{ + { + SegmentInfo: &datapb.SegmentInfo{ID: 9999}, + size: *atomic.NewInt64(1024), + }, + }, false}, + } + + for _, test := range tests { + s.Run(test.description, func() { + partitionStatsMeta, err := newPartitionStatsMeta(ctx, s.catalog) + s.NoError(err) + for _, partitionStats := range test.partitionStats { + partitionStatsMeta.SavePartitionStatsInfo(partitionStats) + } + if test.currentVersion != 0 { + partitionStatsMeta.partitionStatsInfos[channel][partitionID].currentVersion = test.currentVersion + } + + meta := &meta{ + partitionStatsMeta: partitionStatsMeta, + } + + succeed, err := triggerClusteringCompactionPolicy(ctx, meta, collectionID, partitionID, channel, test.segments) + s.NoError(err) + s.Equal(test.succeed, succeed) + }) + } } diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 693689d2df..8b96aea12d 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -317,6 +317,7 @@ func (t *clusteringCompactionTask) completeTask() error { VChannel: t.GetChannel(), Version: t.GetPlanID(), SegmentIDs: t.GetResultSegments(), + CommitTime: time.Now().Unix(), }) if err != nil { return merr.WrapErrClusteringCompactionMetaError("SavePartitionStatsInfo", err) diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index 8923b551d9..749b074c0d 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -357,6 +357,14 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecutingState() { s.Equal(datapb.CompactionTaskState_failed, task.GetState()) } +// fix: https://github.com/milvus-io/milvus/issues/35110 +func (s *ClusteringCompactionTaskSuite) TestCompleteTask() { + task := s.generateBasicTask() + task.completeTask() + partitionStats := s.meta.GetPartitionStatsMeta().GetPartitionStats(task.GetCollectionID(), task.GetPartitionID(), task.GetChannel(), task.GetPlanID()) + s.True(partitionStats.GetCommitTime() > time.Now().Add(-2*time.Second).Unix()) +} + const ( Int64Field = "int64Field" FloatVecField = "floatVecField" diff --git a/internal/datacoord/partition_stats_meta.go b/internal/datacoord/partition_stats_meta.go index 33cd5aab2f..f379afe67f 100644 --- a/internal/datacoord/partition_stats_meta.go +++ b/internal/datacoord/partition_stats_meta.go @@ -14,6 +14,8 @@ import ( "github.com/milvus-io/milvus/pkg/util/timerecord" ) +const emptyPartitionStatsVersion = int64(0) + type partitionStatsMeta struct { sync.RWMutex ctx context.Context @@ -180,10 +182,23 @@ func (psm *partitionStatsMeta) GetCurrentPartitionStatsVersion(collectionID, par defer psm.RUnlock() if _, ok := psm.partitionStatsInfos[vChannel]; !ok { - return 0 + return emptyPartitionStatsVersion } if _, ok := psm.partitionStatsInfos[vChannel][partitionID]; !ok { - return 0 + return emptyPartitionStatsVersion } return psm.partitionStatsInfos[vChannel][partitionID].currentVersion } + +func (psm *partitionStatsMeta) GetPartitionStats(collectionID, partitionID int64, vChannel string, version int64) *datapb.PartitionStatsInfo { + psm.RLock() + defer psm.RUnlock() + + if _, ok := psm.partitionStatsInfos[vChannel]; !ok { + return nil + } + if _, ok := psm.partitionStatsInfos[vChannel][partitionID]; !ok { + return nil + } + return psm.partitionStatsInfos[vChannel][partitionID].infos[version] +} diff --git a/internal/datacoord/partition_stats_meta_test.go b/internal/datacoord/partition_stats_meta_test.go new file mode 100644 index 0000000000..904f6b3d2c --- /dev/null +++ b/internal/datacoord/partition_stats_meta_test.go @@ -0,0 +1,89 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datacoord + +import ( + "context" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus/internal/metastore/mocks" + "github.com/milvus-io/milvus/internal/proto/datapb" +) + +type PartitionStatsMetaSuite struct { + suite.Suite + + catalog *mocks.DataCoordCatalog + meta *partitionStatsMeta +} + +func TestPartitionStatsMetaSuite(t *testing.T) { + suite.Run(t, new(PartitionStatsMetaSuite)) +} + +func (s *PartitionStatsMetaSuite) SetupTest() { + catalog := mocks.NewDataCoordCatalog(s.T()) + catalog.EXPECT().SavePartitionStatsInfo(mock.Anything, mock.Anything).Return(nil).Maybe() + catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil).Maybe() + s.catalog = catalog +} + +func (s *PartitionStatsMetaSuite) TestGetPartitionStats() { + ctx := context.Background() + partitionStatsMeta, err := newPartitionStatsMeta(ctx, s.catalog) + s.NoError(err) + partitionStats := []*datapb.PartitionStatsInfo{ + { + CollectionID: 1, + PartitionID: 2, + VChannel: "ch-1", + SegmentIDs: []int64{100000}, + Version: 100, + }, + } + for _, partitionStats := range partitionStats { + partitionStatsMeta.SavePartitionStatsInfo(partitionStats) + } + + ps1 := partitionStatsMeta.GetPartitionStats(1, 2, "ch-2", 100) + s.Nil(ps1) + + ps2 := partitionStatsMeta.GetPartitionStats(1, 3, "ch-1", 100) + s.Nil(ps2) + + ps3 := partitionStatsMeta.GetPartitionStats(1, 2, "ch-1", 101) + s.Nil(ps3) + + ps := partitionStatsMeta.GetPartitionStats(1, 2, "ch-1", 100) + s.NotNil(ps) + + currentVersion := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1") + s.Equal(emptyPartitionStatsVersion, currentVersion) + + currentVersion2 := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-2") + s.Equal(emptyPartitionStatsVersion, currentVersion2) + + currentVersion3 := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 3, "ch-1") + s.Equal(emptyPartitionStatsVersion, currentVersion3) + + partitionStatsMeta.partitionStatsInfos["ch-1"][2].currentVersion = 100 + currentVersion4 := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1") + s.Equal(int64(100), currentVersion4) +} diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index 5a5718ae6c..3cd3fa4cd7 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -943,6 +943,7 @@ message PartitionStatsInfo { int64 version = 4; repeated int64 segmentIDs = 5; int64 analyzeTaskID = 6; + int64 commitTime = 7; } message DropCompactionPlanRequest {