fix double messages bug in datanode recovery (#5733)

* fix double messages bug in datanode recovery

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

* remove debug log

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2021-06-11 09:24:52 +08:00 committed by zhenshan.cao
parent ac9dde7352
commit ab2fd34a2f
4 changed files with 26 additions and 8 deletions

View File

@ -161,6 +161,11 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
var alloc allocatorInterface = newAllocator(node.masterService)
log.Debug("Received Vchannel Info",
zap.Int("Unflushed Segment Number", len(vchan.GetUnflushedSegments())),
zap.Int("Flushed Segment Number", len(vchan.GetFlushedSegments())),
)
flushChan := make(chan *flushMsg, 100)
dataSyncService := newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataService)
node.vchan2SyncService[vchan.GetChannelName()] = dataSyncService

View File

@ -164,9 +164,20 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) {
for _, us := range vchanInfo.GetUnflushedSegments() {
if us.CollectionID != dsService.collectionID ||
us.GetInsertChannel() != vchanInfo.ChannelName {
log.Warn("Collection ID or ChannelName not compact",
zap.Int64("Wanted ID", dsService.collectionID),
zap.Int64("Actual ID", us.CollectionID),
zap.String("Wanted Channel Name", vchanInfo.ChannelName),
zap.String("Actual Channel Name", us.GetInsertChannel()),
)
continue
}
log.Info("Recover Segment NumOfRows form checkpoints",
zap.String("InsertChannel", us.GetInsertChannel()),
zap.Int64("SegmentID", us.GetID()),
zap.Int64("NumOfRows", us.GetNumOfRows()),
)
dsService.replica.addSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel())
dsService.replica.updateStatistics(us.GetID(), us.GetNumOfRows())
}

View File

@ -86,9 +86,8 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
zap.Uint64("Message endts", msg.EndTs()),
zap.Uint64("FilterThreshold", FilterThreshold),
)
resMsg := ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg))
if resMsg != nil {
iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
if ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg)) {
continue
}
}
@ -104,21 +103,21 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
return []Msg{res}
}
func (ddn *ddNode) filterFlushedSegmentInsertMessages(msg *msgstream.InsertMsg) *msgstream.InsertMsg {
func (ddn *ddNode) filterFlushedSegmentInsertMessages(msg *msgstream.InsertMsg) bool {
if ddn.isFlushed(msg.GetSegmentID()) {
return nil
return true
}
ddn.mu.Lock()
if si, ok := ddn.seg2SegInfo[msg.GetSegmentID()]; ok {
if msg.EndTs() > si.GetDmlPosition().GetTimestamp() {
delete(ddn.seg2SegInfo, msg.GetSegmentID())
return nil
return true
}
}
ddn.mu.Unlock()
return msg
return false
}
func (ddn *ddNode) isFlushed(segmentID UniqueID) bool {

View File

@ -152,7 +152,10 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
collID := msg.GetCollectionID()
partitionID := msg.GetPartitionID()
log.Debug("InsertBufferNode Operating Segment", zap.Int64("ID", currentSegID))
// log.Debug("InsertBufferNode Operating Segment",
// zap.Int64("ID", currentSegID),
// zap.Int("NumOfRows", len(msg.RowIDs)),
// )
if !ibNode.replica.hasSegment(currentSegID) {
err := ibNode.replica.addSegment(currentSegID, collID, partitionID, msg.GetChannelID())