Add Collection ID in ddNode and filter logic (#5496)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2021-05-31 15:28:30 +08:00 committed by zhenshan.cao
parent 1f3c9d5dc7
commit 68bf983f21
3 changed files with 33 additions and 14 deletions

View File

@ -87,7 +87,7 @@ func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelPair) {
var ddStreamNode Node = newDDInputNode(dsService.ctx, dsService.msFactory, vchanPair.GetDdlVchannelName(), vchanPair.GetDdlPosition()) var ddStreamNode Node = newDDInputNode(dsService.ctx, dsService.msFactory, vchanPair.GetDdlVchannelName(), vchanPair.GetDdlPosition())
var filterDmNode Node = newFilteredDmNode() var filterDmNode Node = newFilteredDmNode()
var ddNode Node = newDDNode(dsService.ctx, dsService.flushChan, dsService.replica, dsService.idAllocator) var ddNode Node = newDDNode(dsService.ctx, dsService.flushChan, dsService.replica, dsService.idAllocator, vchanPair.CollectionID)
var insertBufferNode Node = newInsertBufferNode(dsService.ctx, dsService.replica, dsService.msFactory, dsService.idAllocator) var insertBufferNode Node = newInsertBufferNode(dsService.ctx, dsService.replica, dsService.msFactory, dsService.idAllocator)
var gcNode Node = newGCNode(dsService.replica) var gcNode Node = newGCNode(dsService.replica)

View File

@ -26,7 +26,6 @@ import (
miniokv "github.com/milvus-io/milvus/internal/kv/minio" miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
@ -46,6 +45,8 @@ type ddNode struct {
kv kv.BaseKV kv kv.BaseKV
replica Replica replica Replica
collectionID UniqueID
} }
type ddData struct { type ddData struct {
@ -85,6 +86,10 @@ func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// TODO: add error handling // TODO: add error handling
} }
if len(in) == 0 {
return []flowgraph.Msg{}
}
msMsg, ok := in[0].(*MsgStreamMsg) msMsg, ok := in[0].(*MsgStreamMsg)
if !ok { if !ok {
log.Error("type assertion failed for MsgStreamMsg") log.Error("type assertion failed for MsgStreamMsg")
@ -122,15 +127,27 @@ func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// do dd tasks // do dd tasks
for _, msg := range tsMessages { for _, msg := range tsMessages {
switch msg.Type() { switch msg := msg.(type) {
case commonpb.MsgType_CreateCollection: case *msgstream.CreateCollectionMsg:
ddNode.createCollection(msg.(*msgstream.CreateCollectionMsg)) if msg.CollectionID != ddNode.collectionID {
case commonpb.MsgType_DropCollection: continue
ddNode.dropCollection(msg.(*msgstream.DropCollectionMsg)) }
case commonpb.MsgType_CreatePartition: ddNode.createCollection(msg)
ddNode.createPartition(msg.(*msgstream.CreatePartitionMsg)) case *msgstream.DropCollectionMsg:
case commonpb.MsgType_DropPartition: if msg.CollectionID != ddNode.collectionID {
ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg)) continue
}
ddNode.dropCollection(msg)
case *msgstream.CreatePartitionMsg:
if msg.CollectionID != ddNode.collectionID {
continue
}
ddNode.createPartition(msg)
case *msgstream.DropPartitionMsg:
if msg.CollectionID != ddNode.collectionID {
continue
}
ddNode.dropPartition(msg)
default: default:
log.Error("Not supporting message type", zap.Any("Type", msg.Type())) log.Error("Not supporting message type", zap.Any("Type", msg.Type()))
} }
@ -439,7 +456,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
} }
func newDDNode(ctx context.Context, inFlushCh <-chan *flushMsg, func newDDNode(ctx context.Context, inFlushCh <-chan *flushMsg,
replica Replica, idAllocator allocatorInterface) *ddNode { replica Replica, idAllocator allocatorInterface, collectionID UniqueID) *ddNode {
maxQueueLength := Params.FlowGraphMaxQueueLength maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism maxParallelism := Params.FlowGraphMaxParallelism
@ -478,5 +495,7 @@ func newDDNode(ctx context.Context, inFlushCh <-chan *flushMsg,
kv: minioKV, kv: minioKV,
replica: replica, replica: replica,
flushMap: &sync.Map{}, flushMap: &sync.Map{},
collectionID: collectionID,
} }
} }

View File

@ -45,9 +45,9 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
defer close(inFlushCh) defer close(inFlushCh)
replica := newReplica() replica := newReplica()
ddNode := newDDNode(ctx, inFlushCh, replica, NewAllocatorFactory())
collID := UniqueID(0) collID := UniqueID(0)
ddNode := newDDNode(ctx, inFlushCh, replica, NewAllocatorFactory(), collID)
collName := "col-test-0" collName := "col-test-0"
// create collection // create collection
createCollReq := internalpb.CreateCollectionRequest{ createCollReq := internalpb.CreateCollectionRequest{