Handle errors in DataNode and QueryNode flow graph (#17096)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2022-05-24 21:11:59 +08:00 committed by GitHub
parent ffaead6ad9
commit a4ea2fb18a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 947 additions and 450 deletions

View File

@ -18,6 +18,7 @@ package datanode
import (
"context"
"fmt"
"math"
"testing"
"time"
@ -862,15 +863,22 @@ func TestCompactorInterfaceMethods(t *testing.T) {
// mockFlushManager is a test double for flushManager. When returnError is
// set, both flush methods fail with a mock error so that callers' error /
// retry / panic paths can be exercised in unit tests.
type mockFlushManager struct {
// sleepSeconds is an artificial delay knob; not used by the methods
// visible here — presumably consumed elsewhere in the test file (TODO confirm).
sleepSeconds int32
// returnError forces every flush call to fail.
returnError bool
}
// Compile-time assertion that mockFlushManager satisfies the flushManager interface.
var _ flushManager = (*mockFlushManager)(nil)
// flushBufferData pretends to flush insert-buffer data; it fails iff returnError is set.
func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *internalpb.MsgPosition) error {
if mfm.returnError {
return fmt.Errorf("mock error")
}
return nil
}
// flushDelData pretends to flush delete-buffer data; it fails iff returnError is set.
func (mfm *mockFlushManager) flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error {
if mfm.returnError {
return fmt.Errorf("mock error")
}
return nil
}

View File

@ -19,9 +19,13 @@ package datanode
import (
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream"
@ -30,9 +34,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
)
// make sure ddNode implements flowgraph.Node
@ -55,6 +58,7 @@ var _ flowgraph.Node = (*ddNode)(nil)
type ddNode struct {
BaseNode
ctx context.Context
collectionID UniqueID
segID2SegInfo sync.Map // segment ID to *SegmentInfo
@ -81,7 +85,11 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
msMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
log.Warn("Type assertion failed for MsgStreamMsg")
if in[0] == nil {
log.Debug("type assertion failed for MsgStreamMsg because it's nil")
} else {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
}
return []Msg{}
}
@ -163,12 +171,13 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
fgMsg.deleteMessages = append(fgMsg.deleteMessages, dmsg)
}
}
err := ddn.forwardDeleteMsg(forwardMsgs, msMsg.TimestampMin(), msMsg.TimestampMax())
err := retry.Do(ddn.ctx, func() error {
return ddn.forwardDeleteMsg(forwardMsgs, msMsg.TimestampMin(), msMsg.TimestampMax())
}, flowGraphRetryOpt)
if err != nil {
// TODO: proper deal with error
log.Warn("DDNode forward delete msg failed",
zap.String("vChannelName", ddn.vchannelName),
zap.Error(err))
err = fmt.Errorf("DDNode forward delete msg failed, vChannel = %s, err = %s", ddn.vchannelName, err)
log.Error(err.Error())
panic(err)
}
fgMsg.startPositions = append(fgMsg.startPositions, msMsg.StartPositions()...)
@ -301,6 +310,7 @@ func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelI
deltaMsgStream.Start()
dd := &ddNode{
ctx: ctx,
BaseNode: baseNode,
collectionID: collID,
flushedSegments: fs,

View File

@ -21,6 +21,8 @@ import (
"fmt"
"testing"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -148,7 +150,10 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) {
factory := dependency.NewDefaultFactory(true)
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc)
deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"})
ddn := ddNode{
ctx: context.Background(),
collectionID: test.ddnCollID,
deltaMsgStream: deltaStream,
vchannelName: "ddn_drop_msg",
@ -208,8 +213,11 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) {
factory := dependency.NewDefaultFactory(true)
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc)
deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"})
// Prepare ddNode states
ddn := ddNode{
ctx: context.Background(),
flushedSegments: []*datapb.SegmentInfo{fs},
collectionID: test.ddnCollID,
deltaMsgStream: deltaStream,
@ -254,15 +262,21 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) {
factory := dependency.NewDefaultFactory(true)
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc)
deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"})
// Prepare ddNode states
ddn := ddNode{
ctx: context.Background(),
collectionID: test.ddnCollID,
deltaMsgStream: deltaStream,
}
// Prepare delete messages
var dMsg msgstream.TsMsg = &msgstream.DeleteMsg{
BaseMsg: msgstream.BaseMsg{EndTimestamp: test.MsgEndTs},
BaseMsg: msgstream.BaseMsg{
EndTimestamp: test.MsgEndTs,
HashValues: []uint32{0},
},
DeleteRequest: internalpb.DeleteRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_Delete},
CollectionID: test.inMsgCollID,
@ -277,6 +291,39 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) {
})
}
})
to.Run("Test forwardDeleteMsg failed", func(te *testing.T) {
factory := dependency.NewDefaultFactory(true)
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(to, err)
deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc)
// Prepare ddNode states
ddn := ddNode{
ctx: context.Background(),
collectionID: 1,
deltaMsgStream: deltaStream,
}
// Prepare delete messages
var dMsg msgstream.TsMsg = &msgstream.DeleteMsg{
BaseMsg: msgstream.BaseMsg{
EndTimestamp: 2000,
HashValues: []uint32{0},
},
DeleteRequest: internalpb.DeleteRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_Delete},
CollectionID: 1,
},
}
tsMessages := []msgstream.TsMsg{dMsg}
var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil)
// Test
flowGraphRetryOpt = retry.Attempts(1)
assert.Panics(te, func() {
ddn.Operate([]Msg{msgStreamMsg})
})
})
}
func TestFlowGraph_DDNode_filterMessages(te *testing.T) {

View File

@ -20,8 +20,10 @@ import (
"context"
"fmt"
"math"
"reflect"
"sync"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
@ -31,8 +33,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
)
type (
@ -43,6 +45,8 @@ type (
// DeleteNode is to process delete msg, flush delete info into storage.
type deleteNode struct {
BaseNode
ctx context.Context
channelName string
delBuf sync.Map // map[segmentID]*DelDataBuf
replica Replica
@ -129,9 +133,7 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er
rows := len(pks)
tss, ok := segIDToTss[segID]
if !ok || rows != len(tss) {
// TODO: what's the expected behavior after this Error?
log.Error("primary keys and timestamp's element num mis-match")
continue
return fmt.Errorf("primary keys and timestamp's element num mis-match, segmentID = %d", segID)
}
var delDataBuf *DelDataBuf
@ -202,8 +204,12 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
fgMsg, ok := in[0].(*flowGraphMsg)
if !ok {
log.Warn("type assertion failed for flowGraphMsg")
return nil
if in[0] == nil {
log.Debug("type assertion failed for flowGraphMsg because it's nil")
} else {
log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
}
return []Msg{}
}
var spans []opentracing.Span
@ -217,8 +223,12 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
traceID, _, _ := trace.InfoFromSpan(spans[i])
log.Info("Buffer delete request in DataNode", zap.String("traceID", traceID))
if err := dn.bufferDeleteMsg(msg, fgMsg.timeRange); err != nil {
log.Error("buffer delete msg failed", zap.Error(err))
err := dn.bufferDeleteMsg(msg, fgMsg.timeRange)
if err != nil {
// error occurs only when deleteMsg is misaligned, should not happen
err = fmt.Errorf("buffer delete msg failed, err = %s", err)
log.Error(err.Error())
panic(err)
}
}
@ -238,13 +248,16 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
// no related delta data to flush, send empty buf to complete flush life-cycle
dn.flushManager.flushDelData(nil, segmentToFlush, fgMsg.endPositions[0])
} else {
err := dn.flushManager.flushDelData(buf.(*DelDataBuf), segmentToFlush, fgMsg.endPositions[0])
err := retry.Do(dn.ctx, func() error {
return dn.flushManager.flushDelData(buf.(*DelDataBuf), segmentToFlush, fgMsg.endPositions[0])
}, flowGraphRetryOpt)
if err != nil {
log.Warn("Failed to flush delete data", zap.Error(err))
} else {
// remove delete buf
dn.delBuf.Delete(segmentToFlush)
err = fmt.Errorf("failed to flush delete data, err = %s", err)
log.Error(err.Error())
panic(err)
}
// remove delete buf
dn.delBuf.Delete(segmentToFlush)
}
}
}
@ -301,6 +314,7 @@ func newDeleteNode(ctx context.Context, fm flushManager, sig chan<- string, conf
baseNode.SetMaxParallelism(config.maxParallelism)
return &deleteNode{
ctx: ctx,
BaseNode: baseNode,
delBuf: sync.Map{},

View File

@ -22,6 +22,8 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/bits-and-blooms/bloom/v3"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/mq/msgstream"
@ -207,6 +209,8 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
"Invalid input length == 3"},
{[]Msg{&flowGraphMsg{}},
"Invalid input length == 1 but input message is not msgStreamMsg"},
{[]Msg{&flowgraph.MsgStreamMsg{}},
"Invalid input length == 1 but input message is not flowGraphMsg"},
}
for _, test := range invalidInTests {
@ -388,4 +392,37 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
case <-sig:
}
})
t.Run("Test deleteNode Operate flushDelData failed", func(te *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
chanName := "datanode-test-FlowGraphDeletenode-operate"
testPath := "/test/datanode/root/meta"
assert.NoError(t, clearEtcd(testPath))
Params.EtcdCfg.MetaRootPath = testPath
Params.DataNodeCfg.DeleteBinlogRootPath = testPath
c := &nodeConfig{
replica: &mockReplica{},
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c)
assert.Nil(te, err)
msg := genFlowGraphDeleteMsg(int64Pks, chanName)
msg.segmentsToFlush = []UniqueID{-1}
delNode.delBuf.Store(UniqueID(-1), &DelDataBuf{})
delNode.flushManager = &mockFlushManager{
returnError: true,
}
var fgMsg flowgraph.Msg = &msg
flowGraphRetryOpt = retry.Attempts(1)
assert.Panics(te, func() {
delNode.Operate([]flowgraph.Msg{fgMsg})
})
})
}

View File

@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"github.com/golang/protobuf/proto"
@ -36,6 +37,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/tsoutil"
)
@ -50,6 +52,8 @@ type (
type insertBufferNode struct {
BaseNode
ctx context.Context
channelName string
insertBuffer sync.Map // SegmentID to BufferData
replica Replica
@ -161,8 +165,11 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
fgMsg, ok := in[0].(*flowGraphMsg)
if !ok {
log.Warn("type assertion failed for flowGraphMsg")
ibNode.Close()
if in[0] == nil {
log.Debug("type assertion failed for flowGraphMsg because it's nil")
} else {
log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
}
return []Msg{}
}
@ -192,12 +199,11 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
}
if startPositions[0].Timestamp < ibNode.lastTimestamp {
log.Error("insert buffer node consumed old messages",
zap.String("channel", ibNode.channelName),
zap.Any("timestamp", startPositions[0].Timestamp),
zap.Any("lastTimestamp", ibNode.lastTimestamp),
)
return []Msg{}
// message stream should guarantee that this should not happen
err := fmt.Errorf("insert buffer node consumed old messages, channel = %s, timestamp = %d, lastTimestamp = %d",
ibNode.channelName, startPositions[0].Timestamp, ibNode.lastTimestamp)
log.Error(err.Error())
panic(err)
}
ibNode.lastTimestamp = endPositions[0].Timestamp
@ -205,15 +211,20 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
// Updating segment statistics in replica
seg2Upload, err := ibNode.updateSegStatesInReplica(fgMsg.insertMessages, startPositions[0], endPositions[0])
if err != nil {
log.Warn("update segment states in Replica wrong", zap.Error(err))
return []Msg{}
// Occurs only if the collectionID is mismatch, should not happen
err = fmt.Errorf("update segment states in Replica wrong, err = %s", err)
log.Error(err.Error())
panic(err)
}
// insert messages -> buffer
for _, msg := range fgMsg.insertMessages {
err := ibNode.bufferInsertMsg(msg, endPositions[0])
if err != nil {
log.Warn("msg to buffer failed", zap.Error(err))
// error occurs when missing schema info or data is misaligned, should not happen
err = fmt.Errorf("insertBufferNode msg to buffer failed, err = %s", err)
log.Error(err.Error())
panic(err)
}
}
@ -341,30 +352,35 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
}
for _, task := range flushTaskList {
err := ibNode.flushManager.flushBufferData(task.buffer, task.segmentID, task.flushed, task.dropped, endPositions[0])
err = retry.Do(ibNode.ctx, func() error {
return ibNode.flushManager.flushBufferData(task.buffer,
task.segmentID,
task.flushed,
task.dropped,
endPositions[0])
}, flowGraphRetryOpt)
if err != nil {
log.Warn("failed to invoke flushBufferData", zap.Error(err))
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.FailLabel).Inc()
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.TotalLabel).Inc()
if task.auto {
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.FailLabel).Inc()
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.TotalLabel).Inc()
}
} else {
segmentsToFlush = append(segmentsToFlush, task.segmentID)
ibNode.insertBuffer.Delete(task.segmentID)
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.SuccessLabel).Inc()
if task.auto {
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.FailLabel).Inc()
}
err = fmt.Errorf("insertBufferNode flushBufferData failed, err = %s", err)
log.Error(err.Error())
panic(err)
}
segmentsToFlush = append(segmentsToFlush, task.segmentID)
ibNode.insertBuffer.Delete(task.segmentID)
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.SuccessLabel).Inc()
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.TotalLabel).Inc()
if task.auto {
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.TotalLabel).Inc()
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.FailLabel).Inc()
}
}
if err := ibNode.writeHardTimeTick(fgMsg.timeRange.timestampMax, seg2Upload); err != nil {
log.Error("send hard time tick into pulsar channel failed", zap.Error(err))
}
ibNode.writeHardTimeTick(fgMsg.timeRange.timestampMax, seg2Upload)
res := flowGraphMsg{
deleteMessages: fgMsg.deleteMessages,
@ -495,11 +511,11 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
}
// writeHardTimeTick writes timetick once insertBufferNode operates.
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp, segmentIDs []int64) error {
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp, segmentIDs []int64) {
ibNode.ttLogger.LogTs(ts)
ibNode.ttMerger.bufferTs(ts, segmentIDs)
return nil
}
func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID UniqueID) (collID, partitionID UniqueID, err error) {
return ibNode.replica.getCollectionAndPartitionID(segmentID)
}
@ -558,6 +574,7 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan fl
})
return &insertBufferNode{
ctx: ctx,
BaseNode: baseNode,
insertBuffer: sync.Map{},

View File

@ -23,6 +23,10 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -199,6 +203,34 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
inMsg = genFlowGraphInsertMsg(insertChannelName)
inMsg.dropCollection = true
assert.NotPanics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
// test consume old message
inMsg = genFlowGraphInsertMsg(insertChannelName)
timestampBak := iBNode.lastTimestamp
iBNode.lastTimestamp = typeutil.MaxTimestamp
assert.Panics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
iBNode.lastTimestamp = timestampBak
// test updateSegStatesInReplica failed
inMsg = genFlowGraphInsertMsg(insertChannelName)
inMsg.insertMessages[0].CollectionID = UniqueID(-1)
inMsg.insertMessages[0].SegmentID = UniqueID(-1)
assert.NoError(t, err)
assert.Panics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
// test bufferInsertMsg failed
inMsg = genFlowGraphInsertMsg(insertChannelName)
inMsg.insertMessages[0].Timestamps = []Timestamp{1, 2}
inMsg.insertMessages[0].RowIDs = []int64{1}
assert.Panics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
// test flushBufferData failed
flowGraphRetryOpt = retry.Attempts(1)
inMsg = genFlowGraphInsertMsg(insertChannelName)
iBNode.flushManager = &mockFlushManager{returnError: true}
iBNode.insertBuffer.Store(inMsg.insertMessages[0].SegmentID, &BufferData{})
assert.Panics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
iBNode.flushManager = fm
}
/*
@ -464,7 +496,8 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
for _, im := range fgm.segmentsToFlush {
// send del done signal
fm.flushDelData(nil, im, fgm.endPositions[0])
err = fm.flushDelData(nil, im, fgm.endPositions[0])
assert.NoError(t, err)
}
wg.Wait()
require.Equal(t, 0, len(colRep.newSegments))
@ -573,7 +606,8 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
wg.Add(len(fgm.segmentsToFlush))
for _, im := range fgm.segmentsToFlush {
// send del done signal
fm.flushDelData(nil, im, fgm.endPositions[0])
err = fm.flushDelData(nil, im, fgm.endPositions[0])
assert.NoError(t, err)
}
wg.Wait()
require.Equal(t, 0, len(colRep.newSegments))

View File

@ -16,7 +16,10 @@
package datanode
import "github.com/milvus-io/milvus/internal/util/flowgraph"
import (
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/retry"
)
type (
// Node is flowgraph.Node
@ -28,3 +31,5 @@ type (
// InputNode is flowgraph.InputNode
InputNode = flowgraph.InputNode
)
var flowGraphRetryOpt = retry.Attempts(5)

View File

@ -19,6 +19,10 @@ package datanode
import (
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
)
type sendTimeTick func(Timestamp, []int64) error
@ -118,7 +122,9 @@ func (mt *mergedTimeTickerSender) work() {
lastTs = mt.ts
mt.lastSent = time.Now()
mt.send(mt.ts, sids)
if err := mt.send(mt.ts, sids); err != nil {
log.Error("send hard time tick failed", zap.Error(err))
}
}
mt.mu.Unlock()
}

View File

@ -42,11 +42,42 @@ type dataSyncService struct {
msFactory msgstream.Factory
}
// checkReplica used to check replica info before init flow graph, it's a private method of dataSyncService
// checkReplica validates that the collection's state is consistent across the
// historical and streaming replicas before a flow graph is built for it:
// the collection must exist in both replicas, both must agree on loadType,
// and every vChannel / vDeltaChannel registered on the historical collection
// must already have a tSafe entry. Returns a descriptive error on the first
// violation found, nil if everything checks out.
func (dsService *dataSyncService) checkReplica(collectionID UniqueID) error {
// check if the collection exists
hisColl, err := dsService.historicalReplica.getCollectionByID(collectionID)
if err != nil {
return err
}
strColl, err := dsService.streamingReplica.getCollectionByID(collectionID)
if err != nil {
return err
}
// both replicas must have been loaded the same way (e.g. collection vs partition load)
if hisColl.getLoadType() != strColl.getLoadType() {
return fmt.Errorf("inconsistent loadType of collection, collectionID = %d", collectionID)
}
// every DML channel needs a tSafe before the flow graph can consume it
for _, channel := range hisColl.getVChannels() {
if _, err := dsService.tSafeReplica.getTSafe(channel); err != nil {
return fmt.Errorf("getTSafe failed, err = %s", err)
}
}
// same requirement for delta channels
for _, channel := range hisColl.getVDeltaChannels() {
if _, err := dsService.tSafeReplica.getTSafe(channel); err != nil {
return fmt.Errorf("getTSafe failed, err = %s", err)
}
}
return nil
}
// addFlowGraphsForDMLChannels add flowGraphs to dmlChannel2FlowGraph
func (dsService *dataSyncService) addFlowGraphsForDMLChannels(collectionID UniqueID, dmlChannels []string) (map[string]*queryNodeFlowGraph, error) {
dsService.mu.Lock()
defer dsService.mu.Unlock()
if err := dsService.checkReplica(collectionID); err != nil {
return nil, err
}
results := make(map[string]*queryNodeFlowGraph)
for _, channel := range dmlChannels {
if _, ok := dsService.dmlChannel2FlowGraph[channel]; ok {
@ -87,6 +118,10 @@ func (dsService *dataSyncService) addFlowGraphsForDeltaChannels(collectionID Uni
dsService.mu.Lock()
defer dsService.mu.Unlock()
if err := dsService.checkReplica(collectionID); err != nil {
return nil, err
}
results := make(map[string]*queryNodeFlowGraph)
for _, channel := range deltaChannels {
if _, ok := dsService.deltaChannel2FlowGraph[channel]; ok {

View File

@ -21,6 +21,8 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
func TestDataSyncService_DMLFlowGraphs(t *testing.T) {
@ -40,41 +42,53 @@ func TestDataSyncService_DMLFlowGraphs(t *testing.T) {
dataSyncService := newDataSyncService(ctx, streamingReplica, historicalReplica, tSafe, fac)
assert.NotNil(t, dataSyncService)
_, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 1)
t.Run("test DMLFlowGraphs", func(t *testing.T) {
_, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 1)
_, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 1)
_, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 1)
fg, err := dataSyncService.getFlowGraphByDMLChannel(defaultCollectionID, defaultDMLChannel)
assert.NotNil(t, fg)
assert.NoError(t, err)
fg, err := dataSyncService.getFlowGraphByDMLChannel(defaultCollectionID, defaultDMLChannel)
assert.NotNil(t, fg)
assert.NoError(t, err)
fg, err = dataSyncService.getFlowGraphByDMLChannel(defaultCollectionID, "invalid-vChannel")
assert.Nil(t, fg)
assert.Error(t, err)
err = dataSyncService.startFlowGraphByDMLChannel(defaultCollectionID, defaultDMLChannel)
assert.NoError(t, err)
err = dataSyncService.startFlowGraphByDMLChannel(defaultCollectionID, defaultDMLChannel)
assert.NoError(t, err)
dataSyncService.removeFlowGraphsByDMLChannels([]Channel{defaultDMLChannel})
assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 0)
err = dataSyncService.startFlowGraphByDMLChannel(defaultCollectionID, "invalid-vChannel")
assert.Error(t, err)
fg, err = dataSyncService.getFlowGraphByDMLChannel(defaultCollectionID, defaultDMLChannel)
assert.Nil(t, fg)
assert.Error(t, err)
dataSyncService.removeFlowGraphsByDMLChannels([]Channel{defaultDMLChannel})
assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 0)
_, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 1)
fg, err = dataSyncService.getFlowGraphByDMLChannel(defaultCollectionID, defaultDMLChannel)
assert.Nil(t, fg)
assert.Error(t, err)
dataSyncService.close()
assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 0)
})
_, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 1)
t.Run("test DMLFlowGraphs invalid channel", func(t *testing.T) {
fg, err := dataSyncService.getFlowGraphByDMLChannel(defaultCollectionID, "invalid-vChannel")
assert.Nil(t, fg)
assert.Error(t, err)
dataSyncService.close()
assert.Len(t, dataSyncService.dmlChannel2FlowGraph, 0)
err = dataSyncService.startFlowGraphByDMLChannel(defaultCollectionID, "invalid-vChannel")
assert.Error(t, err)
})
t.Run("test addFlowGraphsForDMLChannels checkReplica Failed", func(t *testing.T) {
err = dataSyncService.historicalReplica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
_, err = dataSyncService.addFlowGraphsForDMLChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.Error(t, err)
dataSyncService.historicalReplica.addCollection(defaultCollectionID, genTestCollectionSchema(schemapb.DataType_Int64))
})
}
func TestDataSyncService_DeltaFlowGraphs(t *testing.T) {
@ -94,39 +108,126 @@ func TestDataSyncService_DeltaFlowGraphs(t *testing.T) {
dataSyncService := newDataSyncService(ctx, streamingReplica, historicalReplica, tSafe, fac)
assert.NotNil(t, dataSyncService)
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1)
t.Run("test DeltaFlowGraphs", func(t *testing.T) {
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1)
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1)
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1)
fg, err := dataSyncService.getFlowGraphByDeltaChannel(defaultCollectionID, defaultDeltaChannel)
assert.NotNil(t, fg)
assert.NoError(t, err)
fg, err := dataSyncService.getFlowGraphByDeltaChannel(defaultCollectionID, defaultDeltaChannel)
assert.NotNil(t, fg)
assert.NoError(t, err)
fg, err = dataSyncService.getFlowGraphByDeltaChannel(defaultCollectionID, "invalid-vChannel")
assert.Nil(t, fg)
assert.Error(t, err)
err = dataSyncService.startFlowGraphForDeltaChannel(defaultCollectionID, defaultDeltaChannel)
assert.NoError(t, err)
err = dataSyncService.startFlowGraphForDeltaChannel(defaultCollectionID, defaultDeltaChannel)
assert.NoError(t, err)
dataSyncService.removeFlowGraphsByDeltaChannels([]Channel{defaultDeltaChannel})
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 0)
err = dataSyncService.startFlowGraphForDeltaChannel(defaultCollectionID, "invalid-vChannel")
assert.Error(t, err)
fg, err = dataSyncService.getFlowGraphByDeltaChannel(defaultCollectionID, defaultDeltaChannel)
assert.Nil(t, fg)
assert.Error(t, err)
dataSyncService.removeFlowGraphsByDeltaChannels([]Channel{defaultDeltaChannel})
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 0)
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1)
fg, err = dataSyncService.getFlowGraphByDeltaChannel(defaultCollectionID, defaultDeltaChannel)
assert.Nil(t, fg)
assert.Error(t, err)
dataSyncService.close()
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 0)
})
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1)
t.Run("test DeltaFlowGraphs invalid channel", func(t *testing.T) {
fg, err := dataSyncService.getFlowGraphByDeltaChannel(defaultCollectionID, "invalid-vChannel")
assert.Nil(t, fg)
assert.Error(t, err)
dataSyncService.close()
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 0)
err = dataSyncService.startFlowGraphForDeltaChannel(defaultCollectionID, "invalid-vChannel")
assert.Error(t, err)
})
t.Run("test addFlowGraphsForDeltaChannels checkReplica Failed", func(t *testing.T) {
err = dataSyncService.historicalReplica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel})
assert.Error(t, err)
dataSyncService.historicalReplica.addCollection(defaultCollectionID, genTestCollectionSchema(schemapb.DataType_Int64))
})
}
// TestDataSyncService_checkReplica covers dataSyncService.checkReplica:
// the happy path, a collection missing from either replica, mismatched
// loadType between replicas, and channels whose tSafe entry is absent.
// Subtests share one dataSyncService and restore the state they mutate.
func TestDataSyncService_checkReplica(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
streamingReplica, err := genSimpleReplica()
assert.NoError(t, err)
historicalReplica, err := genSimpleReplica()
assert.NoError(t, err)
fac := genFactory()
// NOTE(review): this re-asserts the err from genSimpleReplica above;
// genFactory() returns no error here — the assertion is a no-op. Verify intent.
assert.NoError(t, err)
tSafe := newTSafeReplica()
dataSyncService := newDataSyncService(ctx, streamingReplica, historicalReplica, tSafe, fac)
assert.NotNil(t, dataSyncService)
defer dataSyncService.close()
// baseline: a freshly built service passes the check
t.Run("test checkReplica", func(t *testing.T) {
err = dataSyncService.checkReplica(defaultCollectionID)
assert.NoError(t, err)
})
// removing the collection from either replica must fail the check;
// the collection is re-added at the end so later subtests see it again
t.Run("test collection doesn't exist", func(t *testing.T) {
err = dataSyncService.streamingReplica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
err = dataSyncService.checkReplica(defaultCollectionID)
assert.Error(t, err)
err = dataSyncService.historicalReplica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
err = dataSyncService.checkReplica(defaultCollectionID)
assert.Error(t, err)
coll := dataSyncService.historicalReplica.addCollection(defaultCollectionID, genTestCollectionSchema(schemapb.DataType_Int64))
assert.NotNil(t, coll)
coll = dataSyncService.streamingReplica.addCollection(defaultCollectionID, genTestCollectionSchema(schemapb.DataType_Int64))
assert.NotNil(t, coll)
})
// historical set to partition load while streaming still differs -> error;
// streaming is then aligned so both replicas agree again afterwards
t.Run("test different loadType", func(t *testing.T) {
coll, err := dataSyncService.historicalReplica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
coll.setLoadType(loadTypePartition)
err = dataSyncService.checkReplica(defaultCollectionID)
assert.Error(t, err)
coll, err = dataSyncService.streamingReplica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
coll.setLoadType(loadTypePartition)
})
// channels registered on the collection without a matching tSafe entry
// must fail the check; tSafes are re-added at the end to restore state
t.Run("test cannot find tSafe", func(t *testing.T) {
coll, err := dataSyncService.historicalReplica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
coll.addVDeltaChannels([]Channel{defaultDeltaChannel})
coll.addVChannels([]Channel{defaultDMLChannel})
dataSyncService.tSafeReplica.addTSafe(defaultDeltaChannel)
dataSyncService.tSafeReplica.addTSafe(defaultDMLChannel)
dataSyncService.tSafeReplica.removeTSafe(defaultDeltaChannel)
err = dataSyncService.checkReplica(defaultCollectionID)
assert.Error(t, err)
dataSyncService.tSafeReplica.removeTSafe(defaultDMLChannel)
err = dataSyncService.checkReplica(defaultCollectionID)
assert.Error(t, err)
dataSyncService.tSafeReplica.addTSafe(defaultDeltaChannel)
dataSyncService.tSafeReplica.addTSafe(defaultDMLChannel)
})
}

View File

@ -17,6 +17,7 @@
package querynode
import (
"fmt"
"reflect"
"sync"
@ -96,7 +97,13 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
)
if dNode.replica.getSegmentNum() != 0 {
processDeleteMessages(dNode.replica, delMsg, delData)
err := processDeleteMessages(dNode.replica, delMsg, delData)
if err != nil {
// error occurs when missing meta info or unexpected pk type, should not happen
err = fmt.Errorf("deleteNode processDeleteMessages failed, collectionID = %d, err = %s", delMsg.CollectionID, err)
log.Error(err.Error())
panic(err)
}
}
}
@ -104,8 +111,10 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
for segmentID, pks := range delData.deleteIDs {
segment, err := dNode.replica.getSegmentByID(segmentID)
if err != nil {
log.Debug("failed to get segment", zap.Int64("segmentId", segmentID), zap.Error(err))
continue
// should not happen, segment should be created before
err = fmt.Errorf("deleteNode getSegmentByID failed, err = %s", err)
log.Error(err.Error())
panic(err)
}
offset := segment.segmentPreDelete(len(pks))
delData.deleteOffset[segmentID] = offset
@ -114,8 +123,17 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// 3. do delete
wg := sync.WaitGroup{}
for segmentID := range delData.deleteOffset {
segmentID := segmentID
wg.Add(1)
go dNode.delete(delData, segmentID, &wg)
go func() {
err := dNode.delete(delData, segmentID, &wg)
if err != nil {
// error occurs when segment cannot be found, calling cgo function delete failed and etc...
err = fmt.Errorf("segment delete failed, segmentID = %d, err = %s", segmentID, err)
log.Error(err.Error())
panic(err)
}
}()
}
wg.Wait()
@ -130,16 +148,15 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
// delete will do delete operation at segment which id is segmentID
func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) {
func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) error {
defer wg.Done()
targetSegment, err := dNode.replica.getSegmentByID(segmentID)
if err != nil {
log.Error(err.Error())
return
return fmt.Errorf("getSegmentByID failed, err = %s", err)
}
if targetSegment.segmentType != segmentTypeSealed {
return
return fmt.Errorf("unexpected segmentType when delete, segmentID = %d, segmentType = %s", segmentID, targetSegment.segmentType.String())
}
ids := deleteData.deleteIDs[segmentID]
@ -148,11 +165,11 @@ func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg *
err = targetSegment.segmentDelete(offset, ids, timestamps)
if err != nil {
log.Warn("delete segment data failed", zap.Int64("segmentID", segmentID), zap.Error(err))
return
return fmt.Errorf("segmentDelete failed, segmentID = %d", segmentID)
}
log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID), zap.Any("SegmentType", targetSegment.segmentType))
return nil
}
// newDeleteNode returns a new deleteNode

View File

@ -46,7 +46,8 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
deleteNode.delete(deleteData, defaultSegmentID, wg)
err = deleteNode.delete(deleteData, defaultSegmentID, wg)
assert.NoError(t, err)
})
t.Run("test segment delete error", func(t *testing.T) {
@ -67,7 +68,8 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
deleteData.deleteTimestamps[defaultSegmentID] = deleteData.deleteTimestamps[defaultSegmentID][:len(deleteData.deleteTimestamps)/2]
deleteNode.delete(deleteData, defaultSegmentID, wg)
err = deleteNode.delete(deleteData, defaultSegmentID, wg)
assert.Error(t, err)
})
t.Run("test no target segment", func(t *testing.T) {
@ -76,7 +78,8 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
deleteNode := newDeleteNode(historical)
wg := &sync.WaitGroup{}
wg.Add(1)
deleteNode.delete(nil, defaultSegmentID, wg)
err = deleteNode.delete(nil, defaultSegmentID, wg)
assert.Error(t, err)
})
t.Run("test invalid segmentType", func(t *testing.T) {
@ -93,7 +96,8 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
deleteNode.delete(&deleteData{}, defaultSegmentID, wg)
err = deleteNode.delete(&deleteData{}, defaultSegmentID, wg)
assert.Error(t, err)
})
}
@ -178,7 +182,9 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
},
}
msg := []flowgraph.Msg{&dMsg}
deleteNode.Operate(msg)
assert.Panics(t, func() {
deleteNode.Operate(msg)
})
})
t.Run("test partition not exist", func(t *testing.T) {
@ -202,7 +208,9 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
},
}
msg := []flowgraph.Msg{&dMsg}
deleteNode.Operate(msg)
assert.Panics(t, func() {
deleteNode.Operate(msg)
})
})
t.Run("test invalid input length", func(t *testing.T) {

View File

@ -81,12 +81,18 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
for _, msg := range msgStreamMsg.TsMessages() {
switch msg.Type() {
case commonpb.MsgType_Delete:
resMsg := fddNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg))
resMsg, err := fddNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg))
if err != nil {
// error occurs when missing meta info or data is misaligned, should not happen
err = fmt.Errorf("filterInvalidDeleteMessage failed, err = %s", err)
log.Error(err.Error())
panic(err)
}
if resMsg != nil {
dMsg.deleteMessages = append(dMsg.deleteMessages, resMsg)
}
default:
log.Warn("Non supporting", zap.Int32("message type", int32(msg.Type())))
log.Warn("invalid message type in filterDeleteNode", zap.String("message type", msg.Type().String()))
}
}
var res Msg = &dMsg
@ -97,11 +103,9 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
// filterInvalidDeleteMessage would filter invalid delete messages
func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) *msgstream.DeleteMsg {
func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) (*msgstream.DeleteMsg, error) {
if err := msg.CheckAligned(); err != nil {
// TODO: what if the messages are misaligned? Here, we ignore those messages and print error
log.Warn("misaligned delete messages detected", zap.Error(err))
return nil
return nil, fmt.Errorf("CheckAligned failed, err = %s", err)
}
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
@ -109,16 +113,29 @@ func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.Delet
defer sp.Finish()
if msg.CollectionID != fddNode.collectionID {
return nil
return nil, nil
}
if len(msg.Timestamps) <= 0 {
log.Debug("filter invalid delete message, no message",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
return nil, nil
}
return msg
// check if collection exists
col, err := fddNode.replica.getCollectionByID(msg.CollectionID)
if err != nil {
// QueryNode should add collection before start flow graph
return nil, fmt.Errorf("filter invalid delete message, collection does not exist, collectionID = %d", msg.CollectionID)
}
if col.getLoadType() == loadTypePartition {
if !fddNode.replica.hasPartition(msg.PartitionID) {
// filter out msg which not belongs to the loaded partitions
return nil, nil
}
}
return msg, nil
}
// newFilteredDeleteNode returns a new filterDeleteNode

View File

@ -17,7 +17,6 @@
package querynode
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
@ -28,7 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
func getFilterDeleteNode(ctx context.Context) (*filterDeleteNode, error) {
func getFilterDeleteNode() (*filterDeleteNode, error) {
historical, err := genSimpleReplica()
if err != nil {
return nil, err
@ -39,61 +38,77 @@ func getFilterDeleteNode(ctx context.Context) (*filterDeleteNode, error) {
}
func TestFlowGraphFilterDeleteNode_filterDeleteNode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fg, err := getFilterDeleteNode(ctx)
fg, err := getFilterDeleteNode()
assert.NoError(t, err)
fg.Name()
}
func TestFlowGraphFilterDeleteNode_filterInvalidDeleteMessage(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("delete valid test", func(t *testing.T) {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
fg, err := getFilterDeleteNode(ctx)
fg, err := getFilterDeleteNode()
assert.NoError(t, err)
res, err := fg.filterInvalidDeleteMessage(msg)
assert.NoError(t, err)
res := fg.filterInvalidDeleteMessage(msg)
assert.NotNil(t, res)
})
t.Run("test delete no collection", func(t *testing.T) {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
msg.CollectionID = UniqueID(1003)
fg, err := getFilterDeleteNode(ctx)
fg, err := getFilterDeleteNode()
assert.NoError(t, err)
res := fg.filterInvalidDeleteMessage(msg)
fg.collectionID = UniqueID(1003)
res, err := fg.filterInvalidDeleteMessage(msg)
assert.Error(t, err)
assert.Nil(t, res)
fg.collectionID = defaultCollectionID
})
t.Run("test delete not target collection", func(t *testing.T) {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
fg, err := getFilterDeleteNode(ctx)
fg, err := getFilterDeleteNode()
assert.NoError(t, err)
fg.collectionID = UniqueID(1000)
res := fg.filterInvalidDeleteMessage(msg)
res, err := fg.filterInvalidDeleteMessage(msg)
assert.NoError(t, err)
assert.Nil(t, res)
})
t.Run("test delete no data", func(t *testing.T) {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
fg, err := getFilterDeleteNode(ctx)
fg, err := getFilterDeleteNode()
assert.NoError(t, err)
msg.Timestamps = make([]Timestamp, 0)
msg.Int64PrimaryKeys = make([]IntPrimaryKey, 0)
res := fg.filterInvalidDeleteMessage(msg)
msg.PrimaryKeys = &schemapb.IDs{}
msg.NumRows = 0
res, err := fg.filterInvalidDeleteMessage(msg)
assert.NoError(t, err)
assert.Nil(t, res)
msg.PrimaryKeys = storage.ParsePrimaryKeys2IDs([]primaryKey{})
res = fg.filterInvalidDeleteMessage(msg)
res, err = fg.filterInvalidDeleteMessage(msg)
assert.NoError(t, err)
assert.Nil(t, res)
})
t.Run("test not target partition", func(t *testing.T) {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
fg, err := getFilterDeleteNode()
assert.NoError(t, err)
col, err := fg.replica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
col.setLoadType(loadTypePartition)
err = fg.replica.removePartition(defaultPartitionID)
assert.NoError(t, err)
res, err := fg.filterInvalidDeleteMessage(msg)
assert.NoError(t, err)
assert.Nil(t, res)
})
}
func TestFlowGraphFilterDeleteNode_Operate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
genFilterDeleteMsg := func() []flowgraph.Msg {
dMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{dMsg}, 0, 1000, nil, nil)
@ -102,7 +117,7 @@ func TestFlowGraphFilterDeleteNode_Operate(t *testing.T) {
t.Run("valid test", func(t *testing.T) {
msg := genFilterDeleteMsg()
fg, err := getFilterDeleteNode(ctx)
fg, err := getFilterDeleteNode()
assert.NoError(t, err)
res := fg.Operate(msg)
assert.NotNil(t, res)
@ -110,11 +125,34 @@ func TestFlowGraphFilterDeleteNode_Operate(t *testing.T) {
t.Run("invalid input length", func(t *testing.T) {
msg := genFilterDeleteMsg()
fg, err := getFilterDeleteNode(ctx)
fg, err := getFilterDeleteNode()
assert.NoError(t, err)
var m flowgraph.Msg
msg = append(msg, m)
res := fg.Operate(msg)
assert.NotNil(t, res)
})
t.Run("filterInvalidDeleteMessage failed", func(t *testing.T) {
dMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
dMsg.NumRows = 0
msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{dMsg}, 0, 1000, nil, nil)
fg, err := getFilterDeleteNode()
assert.NoError(t, err)
m := []flowgraph.Msg{msg}
assert.Panics(t, func() {
fg.Operate(m)
})
})
t.Run("invalid msgType", func(t *testing.T) {
iMsg, err := genSimpleInsertMsg(genTestCollectionSchema(schemapb.DataType_Int64), defaultDelLength)
assert.NoError(t, err)
msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{iMsg}, 0, 1000, nil, nil)
fg, err := getFilterDeleteNode()
assert.NoError(t, err)
res := fg.Operate([]flowgraph.Msg{msg})
assert.NotNil(t, res)
})
}

View File

@ -84,17 +84,29 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
log.Debug("Filter invalid message in QueryNode", zap.String("traceID", traceID))
switch msg.Type() {
case commonpb.MsgType_Insert:
resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
resMsg, err := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
if err != nil {
// error occurs when missing meta info or data is misaligned, should not happen
err = fmt.Errorf("filterInvalidInsertMessage failed, err = %s", err)
log.Error(err.Error())
panic(err)
}
if resMsg != nil {
iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
}
case commonpb.MsgType_Delete:
resMsg := fdmNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg))
resMsg, err := fdmNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg))
if err != nil {
// error occurs when missing meta info or data is misaligned, should not happen
err = fmt.Errorf("filterInvalidDeleteMessage failed, err = %s", err)
log.Error(err.Error())
panic(err)
}
if resMsg != nil {
iMsg.deleteMessages = append(iMsg.deleteMessages, resMsg)
}
default:
log.Warn("Non supporting", zap.Int32("message type", int32(msg.Type())))
log.Warn("invalid message type in filterDmNode", zap.String("message type", msg.Type().String()))
}
}
@ -106,11 +118,16 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
// filterInvalidDeleteMessage would filter out invalid delete messages
func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) *msgstream.DeleteMsg {
func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) (*msgstream.DeleteMsg, error) {
if err := msg.CheckAligned(); err != nil {
// TODO: what if the messages are misaligned? Here, we ignore those messages and print error
log.Warn("misaligned delete messages detected", zap.Error(err))
return nil
return nil, fmt.Errorf("CheckAligned failed, err = %s", err)
}
if len(msg.Timestamps) <= 0 {
log.Debug("filter invalid delete message, no message",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil, nil
}
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
@ -118,41 +135,37 @@ func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg
defer sp.Finish()
if msg.CollectionID != fdmNode.collectionID {
return nil
// filter out msg which not belongs to the current collection
return nil, nil
}
// check if collection and partition exist
// check if collection exist
col, err := fdmNode.replica.getCollectionByID(msg.CollectionID)
if err != nil {
log.Debug("filter invalid delete message, collection does not exist",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
// QueryNode should add collection before start flow graph
return nil, fmt.Errorf("filter invalid delete message, collection does not exist, collectionID = %d", msg.CollectionID)
}
if col.getLoadType() == loadTypePartition {
if !fdmNode.replica.hasPartition(msg.PartitionID) {
log.Debug("filter invalid delete message, partition does not exist",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
// filter out msg which not belongs to the loaded partitions
return nil, nil
}
}
if len(msg.Timestamps) <= 0 {
log.Debug("filter invalid delete message, no message",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
}
return msg
return msg, nil
}
// filterInvalidInsertMessage would filter out invalid insert messages
func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) *msgstream.InsertMsg {
func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) (*msgstream.InsertMsg, error) {
if err := msg.CheckAligned(); err != nil {
// TODO: what if the messages are misaligned? Here, we ignore those messages and print error
log.Warn("Error, misaligned insert messages detected")
return nil
return nil, fmt.Errorf("CheckAligned failed, err = %s", err)
}
if len(msg.Timestamps) <= 0 {
log.Debug("filter invalid insert message, no message",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil, nil
}
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
@ -164,23 +177,19 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
//log.Debug("filter invalid insert message, collection is not the target collection",
// zap.Any("collectionID", msg.CollectionID),
// zap.Any("partitionID", msg.PartitionID))
return nil
return nil, nil
}
// check if collection and partition exist
// check if collection exists
col, err := fdmNode.replica.getCollectionByID(msg.CollectionID)
if err != nil {
log.Debug("filter invalid insert message, collection does not exist",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
// QueryNode should add collection before start flow graph
return nil, fmt.Errorf("filter invalid insert message, collection does not exist, collectionID = %d", msg.CollectionID)
}
if col.getLoadType() == loadTypePartition {
if !fdmNode.replica.hasPartition(msg.PartitionID) {
log.Debug("filter invalid insert message, partition does not exist",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
// filter out msg which not belongs to the loaded partitions
return nil, nil
}
}
@ -189,8 +198,8 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
// so we need to compare the endTimestamp of received messages and position's timestamp.
excludedSegments, err := fdmNode.replica.getExcludedSegments(fdmNode.collectionID)
if err != nil {
log.Warn(err.Error())
return nil
// QueryNode should addExcludedSegments for the current collection before start flow graph
return nil, err
}
for _, segmentInfo := range excludedSegments {
// unFlushed segment may not have checkPoint, so `segmentInfo.DmlPosition` may be nil
@ -198,24 +207,17 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
log.Warn("filter unFlushed segment without checkPoint",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
continue
}
if msg.SegmentID == segmentInfo.ID && msg.EndTs() < segmentInfo.DmlPosition.Timestamp {
log.Debug("filter invalid insert message, segments are excluded segments",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
return nil, nil
}
}
if len(msg.Timestamps) <= 0 {
log.Debug("filter invalid insert message, no message",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
}
return msg
return msg, nil
}
// newFilteredDmNode returns a new filterDmNode

View File

@ -17,7 +17,6 @@
package querynode
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
@ -31,7 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
func getFilterDMNode(ctx context.Context) (*filterDmNode, error) {
func getFilterDMNode() (*filterDmNode, error) {
streaming, err := genSimpleReplica()
if err != nil {
return nil, err
@ -42,25 +41,21 @@ func getFilterDMNode(ctx context.Context) (*filterDmNode, error) {
}
func TestFlowGraphFilterDmNode_filterDmNode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
fg.Name()
}
func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pkType := schemapb.DataType_Int64
schema := genTestCollectionSchema(pkType)
t.Run("valid test", func(t *testing.T) {
msg, err := genSimpleInsertMsg(schema, defaultMsgLength)
assert.NoError(t, err)
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
res, err := fg.filterInvalidInsertMessage(msg)
assert.NoError(t, err)
res := fg.filterInvalidInsertMessage(msg)
assert.NotNil(t, res)
})
@ -68,51 +63,57 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) {
msg, err := genSimpleInsertMsg(schema, defaultMsgLength)
assert.NoError(t, err)
msg.CollectionID = UniqueID(1000)
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
res := fg.filterInvalidInsertMessage(msg)
fg.collectionID = UniqueID(1000)
res, err := fg.filterInvalidInsertMessage(msg)
assert.Error(t, err)
assert.Nil(t, res)
fg.collectionID = defaultCollectionID
})
t.Run("test no partition", func(t *testing.T) {
msg, err := genSimpleInsertMsg(schema, defaultMsgLength)
assert.NoError(t, err)
msg.PartitionID = UniqueID(1000)
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
col, err := fg.replica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
col.setLoadType(loadTypePartition)
res := fg.filterInvalidInsertMessage(msg)
res, err := fg.filterInvalidInsertMessage(msg)
assert.NoError(t, err)
assert.Nil(t, res)
})
t.Run("test not target collection", func(t *testing.T) {
msg, err := genSimpleInsertMsg(schema, defaultMsgLength)
assert.NoError(t, err)
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
fg.collectionID = UniqueID(1000)
res := fg.filterInvalidInsertMessage(msg)
res, err := fg.filterInvalidInsertMessage(msg)
assert.NoError(t, err)
assert.Nil(t, res)
})
t.Run("test no exclude segment", func(t *testing.T) {
msg, err := genSimpleInsertMsg(schema, defaultMsgLength)
assert.NoError(t, err)
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
fg.replica.removeExcludedSegments(defaultCollectionID)
res := fg.filterInvalidInsertMessage(msg)
res, err := fg.filterInvalidInsertMessage(msg)
assert.Error(t, err)
assert.Nil(t, res)
})
t.Run("test segment is exclude segment", func(t *testing.T) {
msg, err := genSimpleInsertMsg(schema, defaultMsgLength)
assert.NoError(t, err)
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
fg.replica.addExcludedSegments(defaultCollectionID, []*datapb.SegmentInfo{
{
@ -124,104 +125,114 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) {
},
},
})
res := fg.filterInvalidInsertMessage(msg)
res, err := fg.filterInvalidInsertMessage(msg)
assert.NoError(t, err)
assert.Nil(t, res)
})
t.Run("test misaligned messages", func(t *testing.T) {
msg, err := genSimpleInsertMsg(schema, defaultMsgLength)
assert.NoError(t, err)
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
msg.Timestamps = make([]Timestamp, 0)
res := fg.filterInvalidInsertMessage(msg)
res, err := fg.filterInvalidInsertMessage(msg)
assert.Error(t, err)
assert.Nil(t, res)
})
t.Run("test no data", func(t *testing.T) {
msg, err := genSimpleInsertMsg(schema, defaultMsgLength)
assert.NoError(t, err)
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
msg.Timestamps = make([]Timestamp, 0)
msg.RowIDs = make([]IntPrimaryKey, 0)
msg.RowData = make([]*commonpb.Blob, 0)
res := fg.filterInvalidInsertMessage(msg)
msg.NumRows = 0
msg.FieldsData = nil
res, err := fg.filterInvalidInsertMessage(msg)
assert.NoError(t, err)
assert.Nil(t, res)
})
}
func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("delete valid test", func(t *testing.T) {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
res, err := fg.filterInvalidDeleteMessage(msg)
assert.NoError(t, err)
res := fg.filterInvalidDeleteMessage(msg)
assert.NotNil(t, res)
})
t.Run("test delete no collection", func(t *testing.T) {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
msg.CollectionID = UniqueID(1003)
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
res := fg.filterInvalidDeleteMessage(msg)
fg.collectionID = UniqueID(1003)
res, err := fg.filterInvalidDeleteMessage(msg)
assert.Error(t, err)
assert.Nil(t, res)
fg.collectionID = defaultCollectionID
})
t.Run("test delete no partition", func(t *testing.T) {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
msg.PartitionID = UniqueID(1000)
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
col, err := fg.replica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
col.setLoadType(loadTypePartition)
res := fg.filterInvalidDeleteMessage(msg)
res, err := fg.filterInvalidDeleteMessage(msg)
assert.NoError(t, err)
assert.Nil(t, res)
})
t.Run("test delete not target collection", func(t *testing.T) {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
fg.collectionID = UniqueID(1000)
res := fg.filterInvalidDeleteMessage(msg)
res, err := fg.filterInvalidDeleteMessage(msg)
assert.NoError(t, err)
assert.Nil(t, res)
})
t.Run("test delete misaligned messages", func(t *testing.T) {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
msg.Timestamps = make([]Timestamp, 0)
res := fg.filterInvalidDeleteMessage(msg)
res, err := fg.filterInvalidDeleteMessage(msg)
assert.Error(t, err)
assert.Nil(t, res)
})
t.Run("test delete no data", func(t *testing.T) {
msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
msg.Timestamps = make([]Timestamp, 0)
msg.NumRows = 0
msg.Int64PrimaryKeys = make([]IntPrimaryKey, 0)
res := fg.filterInvalidDeleteMessage(msg)
msg.PrimaryKeys = &schemapb.IDs{}
res, err := fg.filterInvalidDeleteMessage(msg)
assert.NoError(t, err)
assert.Nil(t, res)
msg.PrimaryKeys = storage.ParsePrimaryKeys2IDs([]primaryKey{})
res = fg.filterInvalidDeleteMessage(msg)
res, err = fg.filterInvalidDeleteMessage(msg)
assert.NoError(t, err)
assert.Nil(t, res)
})
}
func TestFlowGraphFilterDmNode_Operate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pkType := schemapb.DataType_Int64
schema := genTestCollectionSchema(pkType)
@ -234,7 +245,7 @@ func TestFlowGraphFilterDmNode_Operate(t *testing.T) {
t.Run("valid test", func(t *testing.T) {
msg := genFilterDMMsg()
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
res := fg.Operate(msg)
assert.NotNil(t, res)
@ -242,11 +253,48 @@ func TestFlowGraphFilterDmNode_Operate(t *testing.T) {
t.Run("invalid input length", func(t *testing.T) {
msg := genFilterDMMsg()
fg, err := getFilterDMNode(ctx)
fg, err := getFilterDMNode()
assert.NoError(t, err)
var m flowgraph.Msg
msg = append(msg, m)
res := fg.Operate(msg)
assert.NotNil(t, res)
})
t.Run("filterInvalidInsertMessage failed", func(t *testing.T) {
iMsg, err := genSimpleInsertMsg(schema, defaultDelLength)
assert.NoError(t, err)
iMsg.NumRows = 0
msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{iMsg}, 0, 1000, nil, nil)
fg, err := getFilterDMNode()
assert.NoError(t, err)
m := []flowgraph.Msg{msg}
assert.Panics(t, func() {
fg.Operate(m)
})
})
t.Run("filterInvalidDeleteMessage failed", func(t *testing.T) {
dMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
dMsg.NumRows = 0
msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{dMsg}, 0, 1000, nil, nil)
fg, err := getFilterDMNode()
assert.NoError(t, err)
m := []flowgraph.Msg{msg}
assert.Panics(t, func() {
fg.Operate(m)
})
})
t.Run("invalid msgType", func(t *testing.T) {
iMsg, err := genSimpleInsertMsg(genTestCollectionSchema(schemapb.DataType_Int64), defaultDelLength)
assert.NoError(t, err)
iMsg.Base.MsgType = commonpb.MsgType_Search
msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{iMsg}, 0, 1000, nil, nil)
fg, err := getFilterDMNode()
assert.NoError(t, err)
res := fg.Operate([]flowgraph.Msg{msg})
assert.NotNil(t, res)
})
}

View File

@ -113,14 +113,18 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// if loadType is loadCollection, check if partition exists, if not, create partition
col, err := iNode.streamingReplica.getCollectionByID(insertMsg.CollectionID)
if err != nil {
log.Warn("failed to get collection", zap.Error(err))
continue
// should not happen, QueryNode should create collection before start flow graph
err = fmt.Errorf("insertNode getCollectionByID failed, err = %s", err)
log.Error(err.Error())
panic(err)
}
if col.getLoadType() == loadTypeCollection {
err = iNode.streamingReplica.addPartition(insertMsg.CollectionID, insertMsg.PartitionID)
if err != nil {
log.Warn("failed to add partition", zap.Error(err))
continue
// error occurs only when collection cannot be found, should not happen
err = fmt.Errorf("insertNode addPartition failed, err = %s", err)
log.Error(err.Error())
panic(err)
}
}
@ -128,15 +132,19 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if !iNode.streamingReplica.hasSegment(insertMsg.SegmentID) {
err := iNode.streamingReplica.addSegment(insertMsg.SegmentID, insertMsg.PartitionID, insertMsg.CollectionID, insertMsg.ShardName, segmentTypeGrowing)
if err != nil {
log.Warn("failed to add segment", zap.Error(err))
continue
// error occurs when collection or partition cannot be found, collection and partition should be created before
err = fmt.Errorf("insertNode addSegment failed, err = %s", err)
log.Error(err.Error())
panic(err)
}
}
insertRecord, err := storage.TransferInsertMsgToInsertRecord(col.schema, insertMsg)
if err != nil {
log.Warn("failed to transfer msgStream.insertMsg to segcorepb.InsertRecord", zap.Error(err))
return []Msg{}
// occurs only when schema doesn't have dim param, this should not happen
err = fmt.Errorf("failed to transfer msgStream.insertMsg to storage.InsertRecord, err = %s", err)
log.Error(err.Error())
panic(err)
}
iData.insertIDs[insertMsg.SegmentID] = append(iData.insertIDs[insertMsg.SegmentID], insertMsg.RowIDs...)
@ -148,8 +156,10 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
pks, err := getPrimaryKeys(insertMsg, iNode.streamingReplica)
if err != nil {
log.Warn("failed to get primary keys", zap.Error(err))
continue
// error occurs when cannot find collection or data is misaligned, should not happen
err = fmt.Errorf("failed to get primary keys, err = %d", err)
log.Error(err.Error())
panic(err)
}
iData.insertPKs[insertMsg.SegmentID] = append(iData.insertPKs[insertMsg.SegmentID], pks...)
}
@ -158,16 +168,20 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
for segmentID := range iData.insertRecords {
var targetSegment, err = iNode.streamingReplica.getSegmentByID(segmentID)
if err != nil {
log.Warn(err.Error())
continue
// should not happen, segment should be created before
err = fmt.Errorf("insertNode getSegmentByID failed, err = %s", err)
log.Error(err.Error())
panic(err)
}
var numOfRecords = len(iData.insertIDs[segmentID])
if targetSegment != nil {
offset, err := targetSegment.segmentPreInsert(numOfRecords)
if err != nil {
log.Warn(err.Error())
continue
// error occurs when cgo function `PreInsert` failed
err = fmt.Errorf("segmentPreInsert failed, segmentID = %d, err = %s", segmentID, err)
log.Error(err.Error())
panic(err)
}
iData.insertOffset[segmentID] = offset
log.Debug("insertNode operator", zap.Int("insert size", numOfRecords), zap.Int64("insert offset", offset), zap.Int64("segment id", segmentID))
@ -178,8 +192,17 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// 3. do insert
wg := sync.WaitGroup{}
for segmentID := range iData.insertRecords {
segmentID := segmentID
wg.Add(1)
go iNode.insert(&iData, segmentID, &wg)
go func() {
err := iNode.insert(&iData, segmentID, &wg)
if err != nil {
// error occurs when segment cannot be found or cgo function `Insert` failed
err = fmt.Errorf("segment insert failed, segmentID = %d, err = %s", segmentID, err)
log.Error(err.Error())
panic(err)
}
}()
}
wg.Wait()
@ -196,7 +219,13 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
zap.Any("collectionName", delMsg.CollectionName),
zap.Int64("numPKs", delMsg.NumRows),
zap.Any("timestamp", delMsg.Timestamps))
processDeleteMessages(iNode.streamingReplica, delMsg, delData)
err := processDeleteMessages(iNode.streamingReplica, delMsg, delData)
if err != nil {
// error occurs when missing meta info or unexpected pk type, should not happen
err = fmt.Errorf("insertNode processDeleteMessages failed, collectionID = %d, err = %s", delMsg.CollectionID, err)
log.Error(err.Error())
panic(err)
}
}
}
@ -204,8 +233,10 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
for segmentID, pks := range delData.deleteIDs {
segment, err := iNode.streamingReplica.getSegmentByID(segmentID)
if err != nil {
log.Debug(err.Error())
continue
// error occurs when segment cannot be found, should not happen
err = fmt.Errorf("insertNode getSegmentByID failed, err = %s", err)
log.Error(err.Error())
panic(err)
}
offset := segment.segmentPreDelete(len(pks))
delData.deleteOffset[segmentID] = offset
@ -213,8 +244,17 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// 3. do delete
for segmentID := range delData.deleteOffset {
segmentID := segmentID
wg.Add(1)
go iNode.delete(delData, segmentID, &wg)
go func() {
err := iNode.delete(delData, segmentID, &wg)
if err != nil {
// error occurs when segment cannot be found, calling cgo function delete failed and etc...
err = fmt.Errorf("segment delete failed, segmentID = %d, err = %s", segmentID, err)
log.Error(err.Error())
panic(err)
}
}()
}
wg.Wait()
@ -229,7 +269,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
// processDeleteMessages would execute delete operations for growing segments
func processDeleteMessages(replica ReplicaInterface, msg *msgstream.DeleteMsg, delData *deleteData) {
func processDeleteMessages(replica ReplicaInterface, msg *msgstream.DeleteMsg, delData *deleteData) error {
var partitionIDs []UniqueID
var err error
if msg.PartitionID != -1 {
@ -237,16 +277,14 @@ func processDeleteMessages(replica ReplicaInterface, msg *msgstream.DeleteMsg, d
} else {
partitionIDs, err = replica.getPartitionIDs(msg.CollectionID)
if err != nil {
log.Warn(err.Error())
return
return err
}
}
resultSegmentIDs := make([]UniqueID, 0)
for _, partitionID := range partitionIDs {
segmentIDs, err := replica.getSegmentIDs(partitionID)
if err != nil {
log.Warn(err.Error())
continue
return err
}
resultSegmentIDs = append(resultSegmentIDs, segmentIDs...)
}
@ -255,26 +293,22 @@ func processDeleteMessages(replica ReplicaInterface, msg *msgstream.DeleteMsg, d
for _, segmentID := range resultSegmentIDs {
segment, err := replica.getSegmentByID(segmentID)
if err != nil {
log.Warn(err.Error())
continue
return err
}
pks, tss, err := filterSegmentsByPKs(primaryKeys, msg.Timestamps, segment)
if err != nil {
log.Warn(err.Error())
continue
return err
}
if len(pks) > 0 {
delData.deleteIDs[segmentID] = append(delData.deleteIDs[segmentID], pks...)
delData.deleteTimestamps[segmentID] = append(delData.deleteTimestamps[segmentID], tss...)
}
}
return nil
}
// filterSegmentsByPKs would filter segments by primary keys
func filterSegmentsByPKs(pks []primaryKey, timestamps []Timestamp, segment *Segment) ([]primaryKey, []Timestamp, error) {
if pks == nil {
return nil, nil, fmt.Errorf("pks is nil when getSegmentsByPKs")
}
if segment == nil {
return nil, nil, fmt.Errorf("segments is nil when getSegmentsByPKs")
}
@ -304,18 +338,12 @@ func filterSegmentsByPKs(pks []primaryKey, timestamps []Timestamp, segment *Segm
}
// insert would execute insert operations for specific growing segment
func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.WaitGroup) {
func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.WaitGroup) error {
defer wg.Done()
var targetSegment, err = iNode.streamingReplica.getSegmentByID(segmentID)
if err != nil {
log.Warn("cannot find segment:", zap.Int64("segmentID", segmentID))
// TODO: add error handling
wg.Done()
return
}
if targetSegment.segmentType != segmentTypeGrowing {
wg.Done()
return
return fmt.Errorf("getSegmentByID failed, err = %s", err)
}
ids := iData.insertIDs[segmentID]
@ -328,27 +356,23 @@ func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.
err = targetSegment.segmentInsert(offsets, ids, timestamps, insertRecord)
if err != nil {
log.Debug("QueryNode: targetSegmentInsert failed", zap.Error(err))
// TODO: add error handling
wg.Done()
return
return fmt.Errorf("segmentInsert failed, segmentID = %d, err = %s", segmentID, err)
}
log.Debug("Do insert done", zap.Int("len", len(iData.insertIDs[segmentID])), zap.Int64("collectionID", targetSegment.collectionID), zap.Int64("segmentID", segmentID))
wg.Done()
return nil
}
// delete would execute delete operations for specific growing segment
func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) {
func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) error {
defer wg.Done()
targetSegment, err := iNode.streamingReplica.getSegmentByID(segmentID)
if err != nil {
log.Warn(err.Error())
return
return fmt.Errorf("getSegmentByID failed, err = %s", err)
}
if targetSegment.segmentType != segmentTypeGrowing {
return
return fmt.Errorf("unexpected segmentType when delete, segmentType = %s", targetSegment.segmentType.String())
}
ids := deleteData.deleteIDs[segmentID]
@ -357,11 +381,11 @@ func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *
err = targetSegment.segmentDelete(offset, ids, timestamps)
if err != nil {
log.Warn("QueryNode: targetSegmentDelete failed", zap.Error(err))
return
return fmt.Errorf("segmentDelete failed, err = %s", err)
}
log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID))
return nil
}
// TODO: remove this function to proper file
@ -415,8 +439,7 @@ func getPKsFromRowBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.Coll
if t.Key == "dim" {
dim, err := strconv.Atoi(t.Value)
if err != nil {
log.Warn("strconv wrong on get dim", zap.Error(err))
break
return nil, fmt.Errorf("strconv wrong on get dim, err = %s", err)
}
offset += dim * 4
break
@ -427,8 +450,7 @@ func getPKsFromRowBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.Coll
if t.Key == "dim" {
dim, err := strconv.Atoi(t.Value)
if err != nil {
log.Warn("strconv wrong on get dim", zap.Error(err))
return nil, err
return nil, fmt.Errorf("strconv wrong on get dim, err = %s", err)
}
offset += dim / 8
break

View File

@ -21,6 +21,8 @@ import (
"sync"
"testing"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/bits-and-blooms/bloom/v3"
"github.com/stretchr/testify/assert"
@ -31,6 +33,24 @@ import (
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
// getInsertNode builds an insertNode backed by a fresh streaming replica that
// already contains one growing segment (defaultSegmentID / defaultPartitionID
// in defaultCollectionID), so tests can immediately exercise insert/delete
// paths without repeating this setup boilerplate.
func getInsertNode() (*insertNode, error) {
	streaming, err := genSimpleReplica()
	if err != nil {
		return nil, err
	}
	// Pre-create the growing segment that most insertNode tests operate on;
	// insert/delete against a missing segment would panic in Operate.
	err = streaming.addSegment(defaultSegmentID,
		defaultPartitionID,
		defaultCollectionID,
		defaultDMLChannel,
		segmentTypeGrowing)
	if err != nil {
		return nil, err
	}
	return newInsertNode(streaming), nil
}
func genFlowGraphInsertData(schema *schemapb.CollectionSchema, numRows int) (*insertData, error) {
insertMsg, err := genSimpleInsertMsg(schema, numRows)
if err != nil {
@ -76,49 +96,34 @@ func genFlowGraphDeleteData() (*deleteData, error) {
}
func TestFlowGraphInsertNode_insert(t *testing.T) {
pkType := schemapb.DataType_Int64
schema := genTestCollectionSchema(pkType)
t.Run("test insert", func(t *testing.T) {
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming)
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultDMLChannel,
segmentTypeGrowing)
insertNode, err := getInsertNode()
assert.NoError(t, err)
collection, err := streaming.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
insertData, err := genFlowGraphInsertData(collection.schema, defaultMsgLength)
insertData, err := genFlowGraphInsertData(schema, defaultMsgLength)
assert.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(1)
insertNode.insert(insertData, defaultSegmentID, wg)
err = insertNode.insert(insertData, defaultSegmentID, wg)
assert.NoError(t, err)
})
t.Run("test segment insert error", func(t *testing.T) {
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming)
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultDMLChannel,
segmentTypeGrowing)
insertNode, err := getInsertNode()
assert.NoError(t, err)
collection, err := streaming.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
insertData, err := genFlowGraphInsertData(collection.schema, defaultMsgLength)
insertData, err := genFlowGraphInsertData(schema, defaultMsgLength)
assert.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(1)
insertData.insertRecords[defaultSegmentID] = insertData.insertRecords[defaultSegmentID][:len(insertData.insertRecords[defaultSegmentID])/2]
insertNode.insert(insertData, defaultSegmentID, wg)
err = insertNode.insert(insertData, defaultSegmentID, wg)
assert.Error(t, err)
})
t.Run("test no target segment", func(t *testing.T) {
@ -127,84 +132,65 @@ func TestFlowGraphInsertNode_insert(t *testing.T) {
insertNode := newInsertNode(streaming)
wg := &sync.WaitGroup{}
wg.Add(1)
insertNode.insert(nil, defaultSegmentID, wg)
err = insertNode.insert(nil, defaultSegmentID, wg)
assert.Error(t, err)
})
t.Run("test invalid segmentType", func(t *testing.T) {
streaming, err := genSimpleReplica()
insertNode, err := getInsertNode()
assert.NoError(t, err)
insertNode := newInsertNode(streaming)
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultDMLChannel,
segmentTypeSealed)
insertData, err := genFlowGraphInsertData(schema, defaultMsgLength)
assert.NoError(t, err)
seg, err := insertNode.streamingReplica.getSegmentByID(defaultSegmentID)
assert.NoError(t, err)
seg.setType(segmentTypeSealed)
wg := &sync.WaitGroup{}
wg.Add(1)
insertNode.insert(nil, defaultSegmentID, wg)
err = insertNode.insert(insertData, defaultSegmentID, wg)
assert.Error(t, err)
})
}
func TestFlowGraphInsertNode_delete(t *testing.T) {
pkType := schemapb.DataType_Int64
schema := genTestCollectionSchema(pkType)
t.Run("test insert and delete", func(t *testing.T) {
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming)
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultDMLChannel,
segmentTypeGrowing)
insertNode, err := getInsertNode()
assert.NoError(t, err)
collection, err := streaming.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
insertData, err := genFlowGraphInsertData(collection.schema, defaultMsgLength)
insertData, err := genFlowGraphInsertData(schema, defaultMsgLength)
assert.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(1)
insertNode.insert(insertData, defaultSegmentID, wg)
err = insertNode.insert(insertData, defaultSegmentID, wg)
assert.NoError(t, err)
deleteData, err := genFlowGraphDeleteData()
assert.NoError(t, err)
wg.Add(1)
insertNode.delete(deleteData, defaultSegmentID, wg)
err = insertNode.delete(deleteData, defaultSegmentID, wg)
assert.NoError(t, err)
})
t.Run("test only delete", func(t *testing.T) {
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming)
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultDMLChannel,
segmentTypeGrowing)
insertNode, err := getInsertNode()
assert.NoError(t, err)
deleteData, err := genFlowGraphDeleteData()
assert.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(1)
insertNode.delete(deleteData, defaultSegmentID, wg)
err = insertNode.delete(deleteData, defaultSegmentID, wg)
assert.NoError(t, err)
})
t.Run("test segment delete error", func(t *testing.T) {
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming)
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultDMLChannel,
segmentTypeGrowing)
insertNode, err := getInsertNode()
assert.NoError(t, err)
deleteData, err := genFlowGraphDeleteData()
@ -212,7 +198,8 @@ func TestFlowGraphInsertNode_delete(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
deleteData.deleteTimestamps[defaultSegmentID] = deleteData.deleteTimestamps[defaultSegmentID][:len(deleteData.deleteTimestamps)/2]
insertNode.delete(deleteData, defaultSegmentID, wg)
err = insertNode.delete(deleteData, defaultSegmentID, wg)
assert.Error(t, err)
})
t.Run("test no target segment", func(t *testing.T) {
@ -221,27 +208,49 @@ func TestFlowGraphInsertNode_delete(t *testing.T) {
insertNode := newInsertNode(streaming)
wg := &sync.WaitGroup{}
wg.Add(1)
insertNode.delete(nil, defaultSegmentID, wg)
err = insertNode.delete(nil, defaultSegmentID, wg)
assert.Error(t, err)
})
}
// TestFlowGraphInsertNode_processDeleteMessages checks the happy path of
// processDeleteMessages: a delete message for an existing collection is
// dispatched into deleteData without error.
//
// NOTE(review): the original contained two byte-identical t.Run subtests both
// named "test processDeleteMessages" — an apparent copy-paste duplication
// (the runner would rename the second to "...#01" and it added no coverage).
// The duplicate has been removed.
func TestFlowGraphInsertNode_processDeleteMessages(t *testing.T) {
	t.Run("test processDeleteMessages", func(t *testing.T) {
		streaming, err := genSimpleReplica()
		assert.NoError(t, err)

		dMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
		dData, err := genFlowGraphDeleteData()
		assert.NoError(t, err)

		err = processDeleteMessages(streaming, dMsg, dData)
		assert.NoError(t, err)
	})
}
func TestFlowGraphInsertNode_operate(t *testing.T) {
t.Run("test operate", func(t *testing.T) {
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming)
pkType := schemapb.DataType_Int64
schema := genTestCollectionSchema(pkType)
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultDMLChannel,
segmentTypeGrowing)
genMsgStreamInsertMsg := func() *msgstream.InsertMsg {
iMsg, err := genSimpleInsertMsg(schema, defaultMsgLength)
assert.NoError(t, err)
return iMsg
}
collection, err := streaming.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
msgInsertMsg, err := genSimpleInsertMsg(collection.schema, defaultMsgLength)
assert.NoError(t, err)
genInsertMsg := func() *insertMsg {
msgInsertMsg := genMsgStreamInsertMsg()
msgDeleteMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
iMsg := insertMsg{
insertMessages: []*msgstream.InsertMsg{
@ -251,28 +260,26 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
msgDeleteMsg,
},
}
msg := []flowgraph.Msg{&iMsg}
return &iMsg
}
t.Run("test operate", func(t *testing.T) {
insertNode, err := getInsertNode()
assert.NoError(t, err)
msg := []flowgraph.Msg{genInsertMsg()}
insertNode.Operate(msg)
s, err := streaming.getSegmentByID(defaultSegmentID)
s, err := insertNode.streamingReplica.getSegmentByID(defaultSegmentID)
assert.Nil(t, err)
buf := make([]byte, 8)
for i := 0; i < defaultMsgLength; i++ {
common.Endian.PutUint64(buf, uint64(i))
assert.True(t, s.pkFilter.Test(buf))
}
})
t.Run("test invalid partitionID", func(t *testing.T) {
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming)
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultDMLChannel,
segmentTypeGrowing)
insertNode, err := getInsertNode()
assert.NoError(t, err)
msgDeleteMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
@ -288,15 +295,7 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
})
t.Run("test collection partition not exist", func(t *testing.T) {
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming)
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultDMLChannel,
segmentTypeGrowing)
insertNode, err := getInsertNode()
assert.NoError(t, err)
msgDeleteMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
@ -309,19 +308,13 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
},
}
msg := []flowgraph.Msg{&iMsg}
insertNode.Operate(msg)
assert.Panics(t, func() {
insertNode.Operate(msg)
})
})
t.Run("test partition not exist", func(t *testing.T) {
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming)
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultDMLChannel,
segmentTypeGrowing)
insertNode, err := getInsertNode()
assert.NoError(t, err)
msgDeleteMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
@ -333,36 +326,63 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
},
}
msg := []flowgraph.Msg{&iMsg}
insertNode.Operate(msg)
assert.Panics(t, func() {
insertNode.Operate(msg)
})
})
t.Run("test invalid input length", func(t *testing.T) {
insertNode, err := getInsertNode()
assert.NoError(t, err)
msg := []flowgraph.Msg{genInsertMsg(), genInsertMsg()}
insertNode.Operate(msg)
})
t.Run("test getCollectionByID failed", func(t *testing.T) {
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming)
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultDMLChannel,
segmentTypeGrowing)
msg := []flowgraph.Msg{genInsertMsg()}
err = insertNode.streamingReplica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
assert.Panics(t, func() {
insertNode.Operate(msg)
})
})
t.Run("test TransferInsertMsgToInsertRecord failed", func(t *testing.T) {
insertNode, err := getInsertNode()
assert.NoError(t, err)
collection, err := streaming.getCollectionByID(defaultCollectionID)
col, err := insertNode.streamingReplica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
msgInsertMsg, err := genSimpleInsertMsg(collection.schema, defaultMsgLength)
assert.NoError(t, err)
msgDeleteMsg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
iMsg := insertMsg{
insertMessages: []*msgstream.InsertMsg{
msgInsertMsg,
},
deleteMessages: []*msgstream.DeleteMsg{
msgDeleteMsg,
},
for i, field := range col.schema.GetFields() {
if field.DataType == schemapb.DataType_FloatVector {
col.schema.Fields[i].TypeParams = nil
}
}
msg := []flowgraph.Msg{&iMsg, &iMsg}
insertNode.Operate(msg)
iMsg := genInsertMsg()
iMsg.insertMessages[0].Version = internalpb.InsertDataVersion_RowBased
msg := []flowgraph.Msg{iMsg}
assert.Panics(t, func() {
insertNode.Operate(msg)
})
})
t.Run("test getPrimaryKeys failed", func(t *testing.T) {
insertNode, err := getInsertNode()
assert.NoError(t, err)
iMsg := genInsertMsg()
iMsg.insertMessages[0].NumRows = 0
msg := []flowgraph.Msg{iMsg}
assert.Panics(t, func() {
insertNode.Operate(msg)
})
})
}
@ -394,7 +414,7 @@ func TestFilterSegmentsByPKs(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, len(pks), 0)
_, _, err = filterSegmentsByPKs(nil, timestamps, segment)
assert.NotNil(t, err)
assert.NoError(t, err)
_, _, err = filterSegmentsByPKs([]primaryKey{pk0, pk1, pk2, pk3, pk4}, timestamps, nil)
assert.NotNil(t, err)
})
@ -424,7 +444,7 @@ func TestFilterSegmentsByPKs(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, len(pks), 0)
_, _, err = filterSegmentsByPKs(nil, timestamps, segment)
assert.NotNil(t, err)
assert.NoError(t, err)
_, _, err = filterSegmentsByPKs([]primaryKey{pk0, pk1, pk2, pk3, pk4}, timestamps, nil)
assert.NotNil(t, err)
})

View File

@ -20,10 +20,11 @@ import (
"fmt"
"reflect"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/zap"
)
// serviceTimeNode is one of the nodes in delta flow graph
@ -63,10 +64,8 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// update service time
err := stNode.tSafeReplica.setTSafe(stNode.vChannel, serviceTimeMsg.timeRange.timestampMax)
if err != nil {
log.Error("serviceTimeNode setTSafe failed",
zap.Any("collectionID", stNode.collectionID),
zap.Error(err),
)
// should not happen, QueryNode should addTSafe before start flow graph
panic(fmt.Errorf("serviceTimeNode setTSafe timeout, collectionID = %d, err = %s", stNode.collectionID, err))
}
p, _ := tsoutil.ParseTS(serviceTimeMsg.timeRange.timestampMax)
log.RatedDebug(10.0, "update tSafe:",

View File

@ -19,6 +19,8 @@ package querynode
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
@ -78,7 +80,9 @@ func TestServiceTimeNode_Operate(t *testing.T) {
timestampMax: 1000,
},
}
in := []flowgraph.Msg{msg, msg}
node.Operate(in)
in := []flowgraph.Msg{msg}
assert.Panics(t, func() {
node.Operate(in)
})
})
}

View File

@ -262,7 +262,9 @@ func (replica *metaReplica) getPartitionIDs(collectionID UniqueID) ([]UniqueID,
return nil, err
}
return collection.partitionIDs, nil
parID := make([]UniqueID, len(collection.partitionIDs))
copy(parID, collection.partitionIDs)
return parID, nil
}
func (replica *metaReplica) getIndexedFieldIDByCollectionIDPrivate(collectionID UniqueID, segment *Segment) ([]FieldID, error) {
@ -500,7 +502,10 @@ func (replica *metaReplica) getSegmentIDsPrivate(partitionID UniqueID) ([]Unique
if err2 != nil {
return nil, err2
}
return partition.segmentIDs, nil
segIDs := make([]UniqueID, len(partition.segmentIDs))
copy(segIDs, partition.segmentIDs)
return segIDs, nil
}
//----------------------------------------------------------------------------------------------------- segment

View File

@ -973,7 +973,7 @@ func genSimpleInsertMsg(schema *schemapb.CollectionSchema, numRows int) (*msgstr
return &msgstream.InsertMsg{
BaseMsg: genMsgStreamBaseMsg(),
InsertRequest: internalpb.InsertRequest{
Base: genCommonMsgBase(commonpb.MsgType_Retrieve),
Base: genCommonMsgBase(commonpb.MsgType_Insert),
CollectionName: defaultCollectionName,
PartitionName: defaultPartitionName,
CollectionID: defaultCollectionID,

View File

@ -596,7 +596,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs []UniqueID, timestamps [
s.segPtrMu.RLock()
defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
if s.segmentType != segmentTypeGrowing {
return nil
return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.segmentType.String())
}
if s.segmentPtr == nil {

View File

@ -343,7 +343,7 @@ func TestSegment_segmentInsert(t *testing.T) {
segment, err := genSimpleSealedSegment(defaultMsgLength)
assert.NoError(t, err)
err = segment.segmentInsert(0, nil, nil, nil)
assert.NoError(t, err)
assert.Error(t, err)
})
}

View File

@ -827,14 +827,17 @@ func TestTask_releasePartitionTask(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
col, err := node.historical.getCollectionByID(defaultCollectionID)
hisCol, err := node.historical.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
strCol, err := node.streaming.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
err = node.historical.removePartition(defaultPartitionID)
assert.NoError(t, err)
col.addVDeltaChannels([]Channel{defaultDeltaChannel})
col.setLoadType(loadTypePartition)
hisCol.addVDeltaChannels([]Channel{defaultDeltaChannel})
hisCol.setLoadType(loadTypePartition)
strCol.setLoadType(loadTypePartition)
/*
err = node.queryService.addQueryCollection(defaultCollectionID)