Improve name of flowgraph node (#14538)

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
This commit is contained in:
Jiquan Long 2021-12-30 10:33:46 +08:00 committed by GitHub
parent d5e6adf381
commit ebb9b24b47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 17 additions and 9 deletions

View File

@ -18,6 +18,7 @@ package datanode
import (
"context"
"fmt"
"sync"
"sync/atomic"
@ -68,7 +69,7 @@ type ddNode struct {
// Name returns node name, implementing flowgraph.Node
func (ddn *ddNode) Name() string {
return "ddNode"
return fmt.Sprintf("ddNode-%d-%s", ddn.collectionID, ddn.vchannelName)
}
// Operate handles input messages, implementing flowgrpah.Node

View File

@ -18,6 +18,7 @@ package datanode
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
@ -90,7 +91,7 @@ func TestFlowGraph_DDNode_newDDNode(te *testing.T) {
for _, seg := range ddNode.flushedSegments {
flushedSegIDs = append(flushedSegIDs, seg.ID)
}
assert.Equal(t, "ddNode", ddNode.Name())
assert.Equal(t, fmt.Sprintf("ddNode-%d-%s", ddNode.collectionID, ddNode.vchannelName), ddNode.Name())
assert.Equal(t, test.inCollID, ddNode.collectionID)
assert.Equal(t, len(test.inFlushedSegs), len(ddNode.flushedSegments))
assert.ElementsMatch(t, test.inFlushedSegs, flushedSegIDs)

View File

@ -91,7 +91,7 @@ func newDelDataBuf() *DelDataBuf {
}
func (dn *deleteNode) Name() string {
return "deleteNode"
return "deleteNode-" + dn.channelName
}
func (dn *deleteNode) Close() {

View File

@ -121,7 +121,7 @@ func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, dn)
assert.Equal(t, "deleteNode", dn.Name())
assert.Equal(t, "deleteNode-"+dn.channelName, dn.Name())
dn.Close()
})
}

View File

@ -57,6 +57,7 @@ func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNode
log.Debug("datanode Seek successfully", zap.String("Channel Name", seekPos.GetChannelName()), zap.Duration("elapse", time.Since(start)))
}
node := flowgraph.NewInputNode(insertStream, "dmInputNode", dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism)
name := fmt.Sprintf("dmInputNode-%d-%s", dmNodeConfig.collectionID, dmNodeConfig.vChannelName)
node := flowgraph.NewInputNode(insertStream, name, dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism)
return node, nil
}

View File

@ -142,7 +142,7 @@ func (bd *BufferData) updateSize(no int64) {
}
func (ibNode *insertBufferNode) Name() string {
return "ibNode"
return "ibNode-" + ibNode.channelName
}
func (ibNode *insertBufferNode) Close() {

View File

@ -17,6 +17,8 @@
package querynode
import (
"fmt"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
@ -36,7 +38,7 @@ type filterDeleteNode struct {
// Name returns the name of filterDeleteNode
func (fddNode *filterDeleteNode) Name() string {
return "fdNode"
return fmt.Sprintf("fdNode-%d", fddNode.collectionID)
}
// Operate handles input messages, to filter invalid delete messages

View File

@ -17,6 +17,8 @@
package querynode
import (
"fmt"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
@ -36,7 +38,7 @@ type filterDmNode struct {
// Name returns the name of filterDmNode
func (fdmNode *filterDmNode) Name() string {
return "fdmNode"
return fmt.Sprintf("fdmNode-%d", fdmNode.collectionID)
}
// Operate handles input messages, to filter invalid insert messages

View File

@ -18,6 +18,7 @@ package querynode
import (
"context"
"fmt"
"go.uber.org/zap"
@ -37,7 +38,7 @@ type serviceTimeNode struct {
// Name returns the name of serviceTimeNode
func (stNode *serviceTimeNode) Name() string {
return "stNode"
return fmt.Sprintf("stNode-%d-%s", stNode.collectionID, stNode.vChannel)
}
// Close would close serviceTimeNode