From b20a238c389e25ab7b5eca76bcd5f9cb2733c91e Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 6 Dec 2021 16:21:35 +0800 Subject: [PATCH] Fix newDataSyncService panicking with normal segment has no dml pos (#12771) Signed-off-by: Congqi Xia --- internal/datanode/data_sync_service.go | 10 ++++++++-- internal/datanode/segment_replica.go | 11 ++++++----- internal/datanode/segment_replica_test.go | 12 ++++++++++++ 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 5d6c71c3b3..90f8f1e7e7 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -169,8 +169,14 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro zap.Int64("SegmentID", us.GetID()), zap.Int64("NumOfRows", us.GetNumOfRows()), ) - - if err := dsService.replica.addNormalSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel(), us.GetNumOfRows(), us.Statslogs, &segmentCheckPoint{us.GetNumOfRows(), *us.GetDmlPosition()}); err != nil { + var cp *segmentCheckPoint + if us.GetDmlPosition() != nil { + cp = &segmentCheckPoint{ + numRows: us.GetNumOfRows(), + pos: *us.GetDmlPosition(), + } + } + if err := dsService.replica.addNormalSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel(), us.GetNumOfRows(), us.Statslogs, cp); err != nil { return err } } diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go index 84524ca170..8dd7705aae 100644 --- a/internal/datanode/segment_replica.go +++ b/internal/datanode/segment_replica.go @@ -215,7 +215,7 @@ func (replica *SegmentReplica) getCollectionAndPartitionID(segID UniqueID) (coll return seg.collectionID, seg.partitionID, nil } - return 0, 0, fmt.Errorf("Cannot find segment, id = %v", segID) + return 0, 0, fmt.Errorf("cannot find segment, id = %v", segID) } // addNewSegment adds a *New* and *NotFlushed* new segment. Before add, please make sure there's no @@ -313,13 +313,14 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu channelName: channelName, numRows: numOfRows, - checkPoint: *cp, - endPos: &cp.pos, - pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive), minPK: math.MaxInt64, // use max value, represents no value maxPK: math.MinInt64, // use min value represents no value } + if cp != nil { + seg.checkPoint = *cp + seg.endPos = &cp.pos + } err := replica.initPKBloomFilter(seg, statsBinlogs) if err != nil { return err @@ -343,7 +344,7 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq log.Warn("Mismatch collection", zap.Int64("input ID", collID), zap.Int64("expected ID", replica.collectionID)) - return fmt.Errorf("Mismatch collection, ID=%d", collID) + return fmt.Errorf("mismatch collection, ID=%d", collID) } log.Debug("Add Flushed segment", diff --git a/internal/datanode/segment_replica_test.go b/internal/datanode/segment_replica_test.go index d5a4e2dbbb..9c7566be88 100644 --- a/internal/datanode/segment_replica_test.go +++ b/internal/datanode/segment_replica_test.go @@ -367,6 +367,18 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) { } }) + t.Run("Test_addNormalSegmentWithNilDml", func(t *testing.T) { + sr, err := newReplica(context.Background(), rc, 1) + require.NoError(t, err) + sr.minIOKV = &mockMinioKV{} + segID := int64(101) + require.False(t, sr.hasSegment(segID, true)) + assert.NotPanics(t, func() { + err = sr.addNormalSegment(segID, 1, 10, "empty_dml_chan", 0, []*datapb.FieldBinlog{}, nil) + assert.NoError(t, err) + }) + }) + t.Run("Test_listSegmentsCheckPoints", func(t *testing.T) { tests := []struct { newSegID UniqueID