mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-01 03:18:29 +08:00
Fix newDataSyncService panicking with normal segment has no dml pos (#12771)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
b16bbb1ed2
commit
b20a238c38
@ -169,8 +169,14 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
|
||||
zap.Int64("SegmentID", us.GetID()),
|
||||
zap.Int64("NumOfRows", us.GetNumOfRows()),
|
||||
)
|
||||
|
||||
if err := dsService.replica.addNormalSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel(), us.GetNumOfRows(), us.Statslogs, &segmentCheckPoint{us.GetNumOfRows(), *us.GetDmlPosition()}); err != nil {
|
||||
var cp *segmentCheckPoint
|
||||
if us.GetDmlPosition() != nil {
|
||||
cp = &segmentCheckPoint{
|
||||
numRows: us.GetNumOfRows(),
|
||||
pos: *us.GetDmlPosition(),
|
||||
}
|
||||
}
|
||||
if err := dsService.replica.addNormalSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel(), us.GetNumOfRows(), us.Statslogs, cp); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -215,7 +215,7 @@ func (replica *SegmentReplica) getCollectionAndPartitionID(segID UniqueID) (coll
|
||||
return seg.collectionID, seg.partitionID, nil
|
||||
}
|
||||
|
||||
return 0, 0, fmt.Errorf("Cannot find segment, id = %v", segID)
|
||||
return 0, 0, fmt.Errorf("cannot find segment, id = %v", segID)
|
||||
}
|
||||
|
||||
// addNewSegment adds a *New* and *NotFlushed* new segment. Before add, please make sure there's no
|
||||
@ -313,13 +313,14 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
|
||||
channelName: channelName,
|
||||
numRows: numOfRows,
|
||||
|
||||
checkPoint: *cp,
|
||||
endPos: &cp.pos,
|
||||
|
||||
pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
|
||||
minPK: math.MaxInt64, // use max value, represents no value
|
||||
maxPK: math.MinInt64, // use min value represents no value
|
||||
}
|
||||
if cp != nil {
|
||||
seg.checkPoint = *cp
|
||||
seg.endPos = &cp.pos
|
||||
}
|
||||
err := replica.initPKBloomFilter(seg, statsBinlogs)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -343,7 +344,7 @@ func (replica *SegmentReplica) addFlushedSegment(segID, collID, partitionID Uniq
|
||||
log.Warn("Mismatch collection",
|
||||
zap.Int64("input ID", collID),
|
||||
zap.Int64("expected ID", replica.collectionID))
|
||||
return fmt.Errorf("Mismatch collection, ID=%d", collID)
|
||||
return fmt.Errorf("mismatch collection, ID=%d", collID)
|
||||
}
|
||||
|
||||
log.Debug("Add Flushed segment",
|
||||
|
@ -367,6 +367,18 @@ func TestSegmentReplica_InterfaceMethod(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Test_addNormalSegmentWithNilDml", func(t *testing.T) {
|
||||
sr, err := newReplica(context.Background(), rc, 1)
|
||||
require.NoError(t, err)
|
||||
sr.minIOKV = &mockMinioKV{}
|
||||
segID := int64(101)
|
||||
require.False(t, sr.hasSegment(segID, true))
|
||||
assert.NotPanics(t, func() {
|
||||
err = sr.addNormalSegment(segID, 1, 10, "empty_dml_chan", 0, []*datapb.FieldBinlog{}, nil)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Test_listSegmentsCheckPoints", func(t *testing.T) {
|
||||
tests := []struct {
|
||||
newSegID UniqueID
|
||||
|
Loading…
Reference in New Issue
Block a user