Mirror of https://gitee.com/milvus-io/milvus.git, synced 2024-11-30 02:48:45 +08:00
Make the sync segment request idempotent in the data node (#20707)
Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
parent 6d9d24b4ca
commit 44d45452fa
@@ -23,6 +23,8 @@ import (
	"sync"
	"time"

	"github.com/samber/lo"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus-proto/go-api/schemapb"
@@ -539,18 +541,14 @@ func (c *ChannelMeta) mergeFlushedSegments(seg *Segment, planID UniqueID, compac
		return fmt.Errorf("mismatch collection, ID=%d", seg.collectionID)
	}

	var inValidSegments []UniqueID
	for _, ID := range compactedFrom {
		// no such segments in channel or the segments are unflushed.
		if !c.hasSegment(ID, true) || c.hasSegment(ID, false) {
			inValidSegments = append(inValidSegments, ID)
	compactedFrom = lo.Filter[int64](compactedFrom, func(segID int64, _ int) bool {
		// which means the segment is the `flushed` state
		has := c.hasSegment(segID, true) && !c.hasSegment(segID, false)
		if !has {
			log.Warn("invalid segment", zap.Int64("segment_id", segID))
		}
	}

	if len(inValidSegments) > 0 {
		log.Warn("no match flushed segments to merge from", zap.Int64s("invalid segmentIDs", inValidSegments))
		return fmt.Errorf("invalid compactedFrom segments: %v", inValidSegments)
	}
		return has
	})

	log.Info("merge flushed segments")
	c.segMu.Lock()
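Note on the hunk above: the old code collected every compactedFrom ID that was missing or unflushed into inValidSegments and failed the whole merge, while the new code filters the list with lo.Filter and simply drops such IDs, so a merge that was already (partially) applied can be replayed. A minimal, self-contained sketch of that filtering idea follows; hasFlushedSegment is a hypothetical stand-in for the channel's hasSegment bookkeeping and is not part of the Milvus code base.

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// hasFlushedSegment is a hypothetical stand-in for the channel metadata
// lookup (hasSegment(id, true) && !hasSegment(id, false)).
func hasFlushedSegment(id int64) bool {
	flushed := map[int64]bool{1: true, 2: true}
	return flushed[id]
}

func main() {
	compactedFrom := []int64{1, 2, 6}

	// Keep only segments that are still present and flushed; anything else
	// is logged and skipped instead of failing the whole merge.
	compactedFrom = lo.Filter(compactedFrom, func(segID int64, _ int) bool {
		has := hasFlushedSegment(segID)
		if !has {
			fmt.Println("invalid segment, skipping:", segID)
		}
		return has
	})

	fmt.Println(compactedFrom) // [1 2]
}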
@@ -609,27 +609,29 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
		stored bool

		inCompactedFrom []UniqueID
		expectedFrom []UniqueID
		inSeg *Segment
	}{
		{"mismatch collection", false, false, []UniqueID{1, 2}, &Segment{
		{"mismatch collection", false, false, []UniqueID{1, 2}, []UniqueID{1, 2}, &Segment{
			segmentID: 3,
			collectionID: -1,
		}},
		{"no match flushed segment", false, false, []UniqueID{1, 6}, &Segment{
			segmentID: 3,
			collectionID: 1,
		}},
		{"numRows==0", true, false, []UniqueID{1, 2}, &Segment{
			segmentID: 3,
			collectionID: 1,
			numRows: 0,
		}},
		{"numRows>0", true, true, []UniqueID{1, 2}, &Segment{
		{"no match flushed segment", true, true, []UniqueID{1, 6}, []UniqueID{1}, &Segment{
			segmentID: 3,
			collectionID: 1,
			numRows: 15,
		}},
		{"segment exists but not flushed", false, false, []UniqueID{1, 4}, &Segment{
		{"numRows==0", true, false, []UniqueID{1, 2}, []UniqueID{1, 2}, &Segment{
			segmentID: 3,
			collectionID: 1,
			numRows: 0,
		}},
		{"numRows>0", true, true, []UniqueID{1, 2}, []UniqueID{1, 2}, &Segment{
			segmentID: 3,
			collectionID: 1,
			numRows: 15,
		}},
		{"segment exists but not flushed", true, true, []UniqueID{1, 4}, []UniqueID{1}, &Segment{
			segmentID: 3,
			collectionID: 1,
			numRows: 15,
@@ -682,7 +684,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {

				from, ok := to2from[3]
				assert.True(t, ok)
				assert.ElementsMatch(t, []UniqueID{1, 2}, from)
				assert.ElementsMatch(t, test.expectedFrom, from)
			} else {
				assert.False(t, channel.hasSegment(3, true))
			}
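Note on the two test hunks above: the test table gains an expectedFrom column, and cases such as "no match flushed segment" and "segment exists but not flushed" now expect the merge to succeed with only the surviving flushed segments (e.g. []UniqueID{1}) recorded as the merge source, instead of failing outright; the assertion switches accordingly from a hard-coded []UniqueID{1, 2} to test.expectedFrom.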
@@ -913,28 +913,27 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
		return status, nil
	}

	oneSegment := req.GetCompactedFrom()[0]
	channel, err := node.flowgraphManager.getChannel(oneSegment)
	if err != nil {
		status.Reason = fmt.Sprintf("invalid request, err=%s", err.Error())
	getChannel := func() (int64, Channel) {
		for _, segmentFrom := range req.GetCompactedFrom() {
			channel, err := node.flowgraphManager.getChannel(segmentFrom)
			if err != nil {
				log.Warn("invalid segmentID", zap.Int64("segment_from", segmentFrom), zap.Error(err))
				continue
			}
			return segmentFrom, channel
		}
		return 0, nil
	}
	oneSegment, channel := getChannel()
	if channel == nil {
		log.Warn("no available channel")
		status.ErrorCode = commonpb.ErrorCode_Success
		return status, nil
	}

	ds, ok := node.flowgraphManager.getFlowgraphService(channel.getChannelName(oneSegment))
	if !ok {
		status.Reason = fmt.Sprintf("failed to find flow graph service, err=%s", err.Error())
		return status, nil
	}

	// check if all compactedFrom segments are valid
	var invalidSegIDs []UniqueID
	for _, segID := range req.GetCompactedFrom() {
		if !channel.hasSegment(segID, true) {
			invalidSegIDs = append(invalidSegIDs, segID)
		}
	}
	if len(invalidSegIDs) > 0 {
		status.Reason = fmt.Sprintf("invalid request, some segments are not in the same channel: %v", invalidSegIDs)
		status.Reason = fmt.Sprintf("failed to find flow graph service, segmentID: %d", oneSegment)
		return status, nil
	}
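Note on the hunk above: this is the heart of the idempotency change. Instead of resolving the channel from the first compactedFrom segment and erroring out when it is gone, SyncSegments now scans all compactedFrom segments for one that still maps to a channel and, if none does, returns Success on the assumption that the compaction result was already applied. A minimal sketch of that lookup pattern follows; getChannelFor is a hypothetical stand-in for flowgraphManager.getChannel and is not the actual Milvus API.

package main

import (
	"errors"
	"fmt"
)

// firstResolvableSegment returns the first segment ID that still resolves to
// a channel, mirroring the getChannel closure in the diff above.
func firstResolvableSegment(ids []int64, getChannelFor func(int64) (string, error)) (int64, string, bool) {
	for _, id := range ids {
		if ch, err := getChannelFor(id); err == nil {
			return id, ch, true
		}
	}
	// No segment resolves: the segments were presumably already compacted
	// away, so a retried sync request can be answered with Success.
	return 0, "", false
}

func main() {
	lookup := func(id int64) (string, error) {
		if id == 200 {
			return "by-dev-rootcoord-dml_0", nil
		}
		return "", errors.New("segment not found")
	}

	if id, ch, ok := firstResolvableSegment([]int64{100, 200}, lookup); ok {
		fmt.Printf("use segment %d on channel %s\n", id, ch)
	} else {
		fmt.Println("nothing to do, treat the request as already applied")
	}
}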
@@ -947,7 +946,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
		numRows: req.GetNumOfRows(),
	}

	err = channel.InitPKstats(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime())
	err := channel.InitPKstats(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime())
	if err != nil {
		status.Reason = fmt.Sprintf("init pk stats fail, err=%s", err.Error())
		return status, nil
@@ -668,7 +668,7 @@ func TestDataNode(t *testing.T) {
	err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{
		ChannelName: chanName,
		UnflushedSegmentIds: []int64{},
		FlushedSegmentIds: []int64{},
		FlushedSegmentIds: []int64{100, 200, 300},
	}, nil)
	require.NoError(t, err)
	fg, ok := node.flowgraphManager.getFlowgraphService(chanName)
@@ -687,24 +687,26 @@ func TestDataNode(t *testing.T) {
	}

	t.Run("invalid compacted from", func(t *testing.T) {
		invalidCompactedFroms := [][]UniqueID{
			{},
			{101, 201},
		req := &datapb.SyncSegmentsRequest{
			CompactedTo: 400,
			NumOfRows: 100,
		}
		req := &datapb.SyncSegmentsRequest{}

		for _, invalid := range invalidCompactedFroms {
			req.CompactedFrom = invalid
			status, err := node.SyncSegments(ctx, req)
			assert.NoError(t, err)
			assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
		}
		req.CompactedFrom = []UniqueID{}
		status, err := node.SyncSegments(ctx, req)
		assert.NoError(t, err)
		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())

		req.CompactedFrom = []UniqueID{101, 201}
		status, err = node.SyncSegments(ctx, req)
		assert.NoError(t, err)
		assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode())
	})

	t.Run("valid request numRows>0", func(t *testing.T) {
		req := &datapb.SyncSegmentsRequest{
			CompactedFrom: []int64{100, 200},
			CompactedTo: 101,
			CompactedFrom: []UniqueID{100, 200, 101, 201},
			CompactedTo: 102,
			NumOfRows: 100,
		}
		status, err := node.SyncSegments(ctx, req)
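Note on the hunk above: the "invalid compacted from" case now runs against a channel seeded with flushed segments 100, 200 and 300 (see the addAndStart hunk), still expects ErrorCode_UnexpectedError for an empty CompactedFrom, but expects ErrorCode_Success when the CompactedFrom IDs (101, 201) are unknown to the channel, which is exactly the retry/idempotency behavior introduced in SyncSegments. The "valid request numRows>0" case likewise now compacts {100, 200, 101, 201} into segment 102.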