diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index da89aa261d..82a408d34d 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -243,17 +243,11 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro log.Warn("compact wrong, task context done or timeout") return nil, ctx.Err() } - ctxTimeout, cancelAll := context.WithTimeout(ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second) - defer cancelAll() defer t.cleanUp(ctx) - // 1, download delta logs to build deltaMap - deltaBlobs, _, err := composePaths(t.plan.GetSegmentBinlogs()) - if err != nil { - return nil, err - } - deltaPk2Ts, err := mergeDeltalogs(ctxTimeout, t.binlogIO, deltaBlobs) - if err != nil { + // 1, decompose binlogs as preparation for later mapping + if err := binlog.DecompressCompactionBinlogs(t.plan.SegmentBinlogs); err != nil { + log.Warn("compact wrong, fail to decompress compaction binlogs", zap.Error(err)) return nil, err } @@ -272,7 +266,7 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro // 3, mapping log.Info("Clustering compaction start mapping", zap.Int("bufferNum", len(t.clusterBuffers))) - uploadSegments, partitionStats, err := t.mapping(ctx, deltaPk2Ts) + uploadSegments, partitionStats, err := t.mapping(ctx) if err != nil { log.Error("failed in mapping", zap.Error(err)) return nil, err @@ -418,7 +412,6 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e // mapping read and split input segments into buffers func (t *clusteringCompactionTask) mapping(ctx context.Context, - deltaPk2Ts map[interface{}]typeutil.Timestamp, ) ([]*datapb.CompactionSegment, *storage.PartitionStatsSnapshot, error) { ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("mapping-%d", t.GetPlanID())) defer span.End() @@ -436,7 +429,7 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, FieldBinlogs: segment.FieldBinlogs, } future := t.mappingPool.Submit(func() (any, error) { - err := t.mappingSegment(ctx, segmentClone, deltaPk2Ts) + err := t.mappingSegment(ctx, segmentClone) return struct{}{}, err }) futures = append(futures, future) @@ -511,7 +504,6 @@ func (t *clusteringCompactionTask) getBufferTotalUsedMemorySize() int64 { func (t *clusteringCompactionTask) mappingSegment( ctx context.Context, segment *datapb.CompactionSegmentBinlogs, - delta map[interface{}]typeutil.Timestamp, ) error { ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("mappingSegment-%d-%d", t.GetPlanID(), segment.GetSegmentID())) defer span.End() @@ -528,6 +520,17 @@ func (t *clusteringCompactionTask) mappingSegment( remained int64 = 0 ) + deltaPaths := make([]string, 0) + for _, d := range segment.GetDeltalogs() { + for _, l := range d.GetBinlogs() { + deltaPaths = append(deltaPaths, l.GetLogPath()) + } + } + delta, err := mergeDeltalogs(ctx, t.binlogIO, deltaPaths) + if err != nil { + return err + } + isDeletedValue := func(v *storage.Value) bool { ts, ok := delta[v.PK.GetValue()] // insert task and delete task has the same ts when upsert diff --git a/internal/datanode/compaction/compactor_common.go b/internal/datanode/compaction/compactor_common.go index 23e28357c7..3839865f82 100644 --- a/internal/datanode/compaction/compactor_common.go +++ b/internal/datanode/compaction/compactor_common.go @@ -47,31 +47,25 @@ func isExpiredEntity(ttl int64, now, ts typeutil.Timestamp) bool { return expireTime.Before(pnow) } -func mergeDeltalogs(ctx context.Context, io io.BinlogIO, dpaths map[typeutil.UniqueID][]string) (map[interface{}]typeutil.Timestamp, error) { +func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[interface{}]typeutil.Timestamp, error) { pk2ts := make(map[interface{}]typeutil.Timestamp) - if len(dpaths) == 0 { - log.Info("compact with no deltalogs, skip merge deltalogs") + if len(paths) == 0 { + log.Debug("compact with no deltalogs, skip merge deltalogs") return pk2ts, nil } blobs := make([]*storage.Blob, 0) - for segID, paths := range dpaths { - if len(paths) == 0 { - continue - } - binaries, err := io.Download(ctx, paths) - if err != nil { - log.Warn("compact wrong, fail to download deltalogs", - zap.Int64("segment", segID), - zap.Strings("path", paths), - zap.Error(err)) - return nil, err - } + binaries, err := io.Download(ctx, paths) + if err != nil { + log.Warn("compact wrong, fail to download deltalogs", + zap.Strings("path", paths), + zap.Error(err)) + return nil, err + } - for i := range binaries { - blobs = append(blobs, &storage.Blob{Value: binaries[i]}) - } + for i := range binaries { + blobs = append(blobs, &storage.Blob{Value: binaries[i]}) } reader, err := storage.CreateDeltalogReader(blobs) if err != nil { @@ -104,15 +98,18 @@ func mergeDeltalogs(ctx context.Context, io io.BinlogIO, dpaths map[typeutil.Uni return pk2ts, nil } -func composePaths(segments []*datapb.CompactionSegmentBinlogs) (map[typeutil.UniqueID][]string, [][]string, error) { +func composePaths(segments []*datapb.CompactionSegmentBinlogs) ( + deltaPaths map[typeutil.UniqueID][]string, insertPaths map[typeutil.UniqueID][]string, err error, +) { if err := binlog.DecompressCompactionBinlogs(segments); err != nil { log.Warn("compact wrong, fail to decompress compaction binlogs", zap.Error(err)) return nil, nil, err } - deltaPaths := make(map[typeutil.UniqueID][]string) // segmentID to deltalog paths - allPath := make([][]string, 0) // group by binlog batch + deltaPaths = make(map[typeutil.UniqueID][]string) // segmentID to deltalog paths + insertPaths = make(map[typeutil.UniqueID][]string, 0) // segmentID to binlog paths for _, s := range segments { + segId := s.GetSegmentID() // Get the batch count of field binlog files from non-empty segment // each segment might contain different batches var binlogBatchCount int @@ -132,17 +129,17 @@ func composePaths(segments []*datapb.CompactionSegmentBinlogs) (map[typeutil.Uni for _, f := range s.GetFieldBinlogs() { batchPaths = append(batchPaths, f.GetBinlogs()[idx].GetLogPath()) } - allPath = append(allPath, batchPaths) + insertPaths[segId] = append(insertPaths[segId], batchPaths...) } deltaPaths[s.GetSegmentID()] = []string{} for _, d := range s.GetDeltalogs() { for _, l := range d.GetBinlogs() { - deltaPaths[s.GetSegmentID()] = append(deltaPaths[s.GetSegmentID()], l.GetLogPath()) + deltaPaths[segId] = append(deltaPaths[s.GetSegmentID()], l.GetLogPath()) } } } - return deltaPaths, allPath, nil + return deltaPaths, insertPaths, nil } func serializeWrite(ctx context.Context, allocator allocator.Interface, writer *SegmentWriter) (kvs map[string][]byte, fieldBinlogs map[int64]*datapb.FieldBinlog, err error) { diff --git a/internal/datanode/compaction/merge_sort.go b/internal/datanode/compaction/merge_sort.go index c0c3da817e..bcf8dd8bb7 100644 --- a/internal/datanode/compaction/merge_sort.go +++ b/internal/datanode/compaction/merge_sort.go @@ -24,7 +24,6 @@ func mergeSortMultipleSegments(ctx context.Context, collectionID, partitionID, maxRows int64, binlogIO io.BinlogIO, binlogs []*datapb.CompactionSegmentBinlogs, - delta map[interface{}]typeutil.Timestamp, tr *timerecord.TimeRecorder, currentTs typeutil.Timestamp, collectionTtl int64, @@ -47,17 +46,6 @@ func mergeSortMultipleSegments(ctx context.Context, deletedRowCount int64 ) - isValueDeleted := func(v *storage.Value) bool { - ts, ok := delta[v.PK.GetValue()] - // insert task and delete task has the same ts when upsert - // here should be < instead of <= - // to avoid the upsert data to be deleted after compact - if ok && uint64(v.Timestamp) < ts { - return true - } - return false - } - pkField, err := typeutil.GetPrimaryFieldSchema(plan.GetSchema()) if err != nil { log.Warn("failed to get pk field from schema") @@ -66,6 +54,7 @@ func mergeSortMultipleSegments(ctx context.Context, // SegmentDeserializeReaderTest(binlogPaths, t.binlogIO, writer.GetPkID()) segmentReaders := make([]*SegmentDeserializeReader, len(binlogs)) + segmentDelta := make([]map[interface{}]storage.Timestamp, len(binlogs)) for i, s := range binlogs { var binlogBatchCount int for _, b := range s.GetFieldBinlogs() { @@ -89,13 +78,42 @@ func mergeSortMultipleSegments(ctx context.Context, binlogPaths[idx] = batchPaths } segmentReaders[i] = NewSegmentDeserializeReader(ctx, binlogPaths, binlogIO, pkField.GetFieldID(), bm25FieldIds) + deltalogPaths := make([]string, 0) + for _, d := range s.GetDeltalogs() { + for _, l := range d.GetBinlogs() { + deltalogPaths = append(deltalogPaths, l.GetLogPath()) + } + } + segmentDelta[i], err = mergeDeltalogs(ctx, binlogIO, deltalogPaths) + if err != nil { + return nil, err + } + } + + advanceRow := func(i int) (*storage.Value, error) { + for { + v, err := segmentReaders[i].Next() + if err != nil { + return nil, err + } + + ts, ok := segmentDelta[i][v.PK.GetValue()] + // insert task and delete task has the same ts when upsert + // here should be < instead of <= + // to avoid the upsert data to be deleted after compact + if ok && uint64(v.Timestamp) < ts { + deletedRowCount++ + continue + } + return v, nil + } } pq := make(PriorityQueue, 0) heap.Init(&pq) - for i, r := range segmentReaders { - if v, err := r.Next(); err == nil { + for i := range segmentReaders { + if v, err := advanceRow(i); err == nil { heap.Push(&pq, &PQItem{ Value: v, Index: i, @@ -107,11 +125,6 @@ func mergeSortMultipleSegments(ctx context.Context, smallest := heap.Pop(&pq).(*PQItem) v := smallest.Value - if isValueDeleted(v) { - deletedRowCount++ - continue - } - // Filtering expired entity if isExpiredEntity(collectionTtl, currentTs, typeutil.Timestamp(v.Timestamp)) { expiredRowCount++ @@ -124,7 +137,7 @@ func mergeSortMultipleSegments(ctx context.Context, return nil, err } - iv, err := segmentReaders[smallest.Index].Next() + iv, err := advanceRow(smallest.Index) if err != nil && err != sio.EOF { return nil, err } diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index 08e50da608..a12ef12ede 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -130,8 +130,8 @@ func (t *mixCompactionTask) preCompact() error { func (t *mixCompactionTask) mergeSplit( ctx context.Context, - binlogPaths [][]string, - delta map[interface{}]typeutil.Timestamp, + insertPaths map[int64][]string, + deltaPaths map[int64][]string, ) ([]*datapb.CompactionSegment, error) { _ = t.tr.RecordSpan() @@ -153,8 +153,9 @@ func (t *mixCompactionTask) mergeSplit( log.Warn("failed to get pk field from schema") return nil, err } - for _, paths := range binlogPaths { - del, exp, err := t.writePaths(ctx, delta, mWriter, pkField, paths) + for segId, binlogPaths := range insertPaths { + deltaPaths := deltaPaths[segId] + del, exp, err := t.writeSegment(ctx, binlogPaths, deltaPaths, mWriter, pkField) if err != nil { return nil, err } @@ -177,31 +178,38 @@ func (t *mixCompactionTask) mergeSplit( return res, nil } -func isValueDeleted(v *storage.Value, delta map[interface{}]typeutil.Timestamp) bool { - ts, ok := delta[v.PK.GetValue()] - // insert task and delete task has the same ts when upsert - // here should be < instead of <= - // to avoid the upsert data to be deleted after compact - if ok && uint64(v.Timestamp) < ts { - return true - } - return false -} - -func (t *mixCompactionTask) writePaths(ctx context.Context, delta map[interface{}]typeutil.Timestamp, - mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema, paths []string, +func (t *mixCompactionTask) writeSegment(ctx context.Context, + binlogPaths []string, + deltaPaths []string, + mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema, ) (deletedRowCount, expiredRowCount int64, err error) { - log := log.With(zap.Strings("paths", paths)) - allValues, err := t.binlogIO.Download(ctx, paths) + log := log.With(zap.Strings("paths", binlogPaths)) + allValues, err := t.binlogIO.Download(ctx, binlogPaths) if err != nil { log.Warn("compact wrong, fail to download insertLogs", zap.Error(err)) return } blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob { - return &storage.Blob{Key: paths[i], Value: v} + return &storage.Blob{Key: binlogPaths[i], Value: v} }) + delta, err := mergeDeltalogs(ctx, t.binlogIO, deltaPaths) + if err != nil { + log.Warn("compact wrong, fail to merge deltalogs", zap.Error(err)) + return + } + isValueDeleted := func(v *storage.Value) bool { + ts, ok := delta[v.PK.GetValue()] + // insert task and delete task has the same ts when upsert + // here should be < instead of <= + // to avoid the upsert data to be deleted after compact + if ok && uint64(v.Timestamp) < ts { + return true + } + return false + } + iter, err := storage.NewBinlogDeserializeReader(blobs, pkField.GetFieldID()) if err != nil { log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err)) @@ -221,7 +229,8 @@ func (t *mixCompactionTask) writePaths(ctx context.Context, delta map[interface{ } } v := iter.Value() - if isValueDeleted(v, delta) { + + if isValueDeleted(v) { deletedRowCount++ continue } @@ -261,23 +270,24 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) { defer cancelAll() log.Info("compact start") - deltaPaths, allBatchPaths, err := composePaths(t.plan.GetSegmentBinlogs()) + deltaPaths, insertPaths, err := composePaths(t.plan.GetSegmentBinlogs()) if err != nil { log.Warn("compact wrong, failed to composePaths", zap.Error(err)) return nil, err } // Unable to deal with all empty segments cases, so return error - if len(allBatchPaths) == 0 { + isEmpty := true + for _, paths := range insertPaths { + if len(paths) > 0 { + isEmpty = false + break + } + } + if isEmpty { log.Warn("compact wrong, all segments' binlogs are empty") return nil, errors.New("illegal compaction plan") } - deltaPk2Ts, err := mergeDeltalogs(ctxTimeout, t.binlogIO, deltaPaths) - if err != nil { - log.Warn("compact wrong, fail to merge deltalogs", zap.Error(err)) - return nil, err - } - allSorted := true for _, segment := range t.plan.GetSegmentBinlogs() { if !segment.GetIsSorted() { @@ -290,13 +300,13 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) { if allSorted && len(t.plan.GetSegmentBinlogs()) > 1 { log.Info("all segments are sorted, use merge sort") res, err = mergeSortMultipleSegments(ctxTimeout, t.plan, t.collectionID, t.partitionID, t.maxRows, t.binlogIO, - t.plan.GetSegmentBinlogs(), deltaPk2Ts, t.tr, t.currentTs, t.plan.GetCollectionTtl(), t.bm25FieldIDs) + t.plan.GetSegmentBinlogs(), t.tr, t.currentTs, t.plan.GetCollectionTtl(), t.bm25FieldIDs) if err != nil { log.Warn("compact wrong, fail to merge sort segments", zap.Error(err)) return nil, err } } else { - res, err = t.mergeSplit(ctxTimeout, allBatchPaths, deltaPk2Ts) + res, err = t.mergeSplit(ctxTimeout, insertPaths, deltaPaths) if err != nil { log.Warn("compact wrong, failed to mergeSplit", zap.Error(err)) return nil, err diff --git a/internal/datanode/compaction/mix_compactor_test.go b/internal/datanode/compaction/mix_compactor_test.go index 5e98d2be0a..51cfee8fa4 100644 --- a/internal/datanode/compaction/mix_compactor_test.go +++ b/internal/datanode/compaction/mix_compactor_test.go @@ -18,6 +18,7 @@ package compaction import ( "context" + "fmt" "math" "testing" "time" @@ -286,6 +287,7 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegment() { alloc := allocator.NewLocalAllocator(100, math.MaxInt64) s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil) s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0) + deleteTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(10*time.Second), 0) for _, segID := range segments { s.initMultiRowsSegBuffer(segID, 100, 3) kvs, fBinlogs, err := serializeWrite(context.TODO(), alloc, s.segWriter) @@ -295,11 +297,25 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegment() { return len(left) == 0 && len(right) == 0 })).Return(lo.Values(kvs), nil).Once() + blob, err := getInt64DeltaBlobs( + segID, + []int64{segID, segID + 3, segID + 6}, + []uint64{deleteTs, deleteTs, deleteTs}, + ) + s.Require().NoError(err) + deltaPath := fmt.Sprintf("deltalog/%d", segID) + s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{deltaPath}). + Return([][]byte{blob.GetValue()}, nil).Once() + s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{ SegmentID: segID, FieldBinlogs: lo.Values(fBinlogs), IsSorted: true, + Deltalogs: []*datapb.FieldBinlog{ + {Binlogs: []*datapb.Binlog{{LogPath: deltaPath}}}, + }, }) + } result, err := s.task.Compact() @@ -312,7 +328,7 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegment() { segment := result.GetSegments()[0] s.EqualValues(19531, segment.GetSegmentID()) - s.EqualValues(300, segment.GetNumOfRows()) + s.EqualValues(291, segment.GetNumOfRows()) s.NotEmpty(segment.InsertLogs) s.NotEmpty(segment.Field2StatslogPaths) @@ -340,7 +356,7 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() { s.task.partitionID = PartitionID s.task.maxRows = 1000 - compactionSegments, err := s.task.mergeSplit(s.task.ctx, [][]string{lo.Keys(kvs)}, nil) + compactionSegments, err := s.task.mergeSplit(s.task.ctx, map[int64][]string{s.segWriter.segmentID: lo.Keys(kvs)}, nil) s.NoError(err) s.Equal(1, len(compactionSegments)) s.EqualValues(0, compactionSegments[0].GetNumOfRows()) @@ -355,31 +371,45 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() { deleteTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(10*time.Second), 0) tests := []struct { description string - deletions map[interface{}]uint64 + deletions map[int64]uint64 expectedRes int leftNumRows int }{ {"no deletion", nil, 1, 1}, - {"mismatch deletion", map[interface{}]uint64{int64(1): deleteTs}, 1, 1}, - {"deleted pk=4", map[interface{}]uint64{int64(4): deleteTs}, 1, 0}, + {"mismatch deletion", map[int64]uint64{int64(1): deleteTs}, 1, 1}, + {"deleted pk=4", map[int64]uint64{int64(4): deleteTs}, 1, 0}, } alloc := allocator.NewLocalAllocator(888888, math.MaxInt64) kvs, _, err := serializeWrite(context.TODO(), alloc, s.segWriter) + insertPaths := lo.Keys(kvs) s.Require().NoError(err) for _, test := range tests { s.Run(test.description, func() { - s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn( + s.mockBinlogIO.EXPECT().Download(mock.Anything, insertPaths).RunAndReturn( func(ctx context.Context, paths []string) ([][]byte, error) { s.Require().Equal(len(paths), len(kvs)) return lo.Values(kvs), nil }) + deletePaths := make(map[int64][]string, 0) + if len(test.deletions) > 0 { + blob, err := getInt64DeltaBlobs( + s.segWriter.segmentID, + lo.Keys(test.deletions), + lo.Values(test.deletions), + ) + s.Require().NoError(err) + s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{"foo"}). + Return([][]byte{blob.GetValue()}, nil).Once() + deletePaths[s.segWriter.segmentID] = []string{"foo"} + } + s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Maybe() s.task.collectionID = CollectionID s.task.partitionID = PartitionID s.task.maxRows = 1000 - res, err := s.task.mergeSplit(s.task.ctx, [][]string{lo.Keys(kvs)}, test.deletions) + res, err := s.task.mergeSplit(s.task.ctx, map[int64][]string{s.segWriter.segmentID: insertPaths}, deletePaths) s.NoError(err) s.EqualValues(test.expectedRes, len(res)) s.EqualValues(test.leftNumRows, res[0].GetNumOfRows()) @@ -478,7 +508,7 @@ func (s *MixCompactionTaskSuite) TestMergeDeltalogsMultiSegment() { s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything). Return(dValues, nil) - got, err := mergeDeltalogs(s.task.ctx, s.task.binlogIO, map[int64][]string{100: {"random"}}) + got, err := mergeDeltalogs(s.task.ctx, s.task.binlogIO, []string{"random"}) s.NoError(err) s.Equal(len(test.expectedpk2ts), len(got)) @@ -508,12 +538,12 @@ func (s *MixCompactionTaskSuite) TestMergeDeltalogsOneSegment() { s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{"mock_error"}). Return(nil, errors.New("mock_error")).Once() - invalidPaths := map[int64][]string{2000: {"mock_error"}} + invalidPaths := []string{"mock_error"} got, err := mergeDeltalogs(s.task.ctx, s.task.binlogIO, invalidPaths) s.Error(err) s.Nil(got) - dpaths := map[int64][]string{1000: {"a"}} + dpaths := []string{"a"} got, err = mergeDeltalogs(s.task.ctx, s.task.binlogIO, dpaths) s.NoError(err) s.NotNil(got)