Print seek position time and seek elapsed time (#20271)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

bigsheeper 2022-11-03 15:03:35 +08:00 committed by GitHub
parent c08f11420f
commit d528aa3d90
8 changed files with 130 additions and 123 deletions


@ -21,13 +21,15 @@ import (
"fmt"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/funcutil"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/tsoutil"
)
// DmInputNode receives messages from message streams, packs messages between two timeticks, and passes all
@ -48,13 +50,27 @@ func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNode
if seekPos != nil {
insertStream.AsConsumer([]string{pchannelName}, consumeSubName, mqwrapper.SubscriptionPositionUnknown)
seekPos.ChannelName = pchannelName
cpTs, _ := tsoutil.ParseTS(seekPos.Timestamp)
start := time.Now()
log.Info("datanode begin to seek", zap.ByteString("seek msgID", seekPos.GetMsgID()), zap.String("physical channel", seekPos.GetChannelName()), zap.Int64("collection ID", dmNodeConfig.collectionID))
log.Info("datanode begin to seek",
zap.ByteString("seek msgID", seekPos.GetMsgID()),
zap.String("pchannel", seekPos.GetChannelName()),
zap.String("vchannel", dmNodeConfig.vChannelName),
zap.Time("position", cpTs),
zap.Duration("tsLag", time.Since(cpTs)),
zap.Int64("collection ID", dmNodeConfig.collectionID))
err = insertStream.Seek([]*internalpb.MsgPosition{seekPos})
if err != nil {
return nil, err
}
log.Info("datanode seek successfully", zap.ByteString("seek msgID", seekPos.GetMsgID()), zap.String("physical channel", seekPos.GetChannelName()), zap.Int64("collection ID", dmNodeConfig.collectionID), zap.Duration("elapse", time.Since(start)))
log.Info("datanode seek successfully",
zap.ByteString("seek msgID", seekPos.GetMsgID()),
zap.String("pchannel", seekPos.GetChannelName()),
zap.String("vchannel", dmNodeConfig.vChannelName),
zap.Time("position", cpTs),
zap.Duration("tsLag", time.Since(cpTs)),
zap.Int64("collection ID", dmNodeConfig.collectionID),
zap.Duration("elapse", time.Since(start)))
} else {
insertStream.AsConsumer([]string{pchannelName}, consumeSubName, mqwrapper.SubscriptionPositionEarliest)
}
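
The change above decodes the checkpoint's hybrid timestamp into wall-clock time and logs both how far the node lags behind that checkpoint (tsLag) and how long the Seek call itself takes (elapse). Below is a minimal, self-contained sketch of the same pattern; the 18-bit logical-suffix layout and all helper names are assumptions made for illustration, not the tsoutil API used above.

package main

import (
	"fmt"
	"time"
)

// Assumption: the hybrid (TSO) timestamp stores Unix milliseconds in the high
// bits and an 18-bit logical counter in the low bits.
const logicalBits = 18

// physicalTime extracts the wall-clock part of a hybrid timestamp.
func physicalTime(ts uint64) time.Time {
	return time.UnixMilli(int64(ts >> logicalBits))
}

// seekWithTiming wraps any seek call and reports the checkpoint's wall-clock
// position, how far it lags behind now, and how long the seek itself took,
// which are the three facts the new log lines record.
func seekWithTiming(checkpointTs uint64, seek func() error) error {
	cpTime := physicalTime(checkpointTs)
	start := time.Now()
	err := seek()
	fmt.Printf("position=%s tsLag=%s elapse=%s err=%v\n",
		cpTime.Format(time.RFC3339), time.Since(cpTime), time.Since(start), err)
	return err
}

func main() {
	// Build a synthetic checkpoint timestamp one minute in the past.
	cp := uint64(time.Now().Add(-time.Minute).UnixMilli()) << logicalBits
	_ = seekWithTiming(cp, func() error {
		time.Sleep(50 * time.Millisecond) // stand-in for the real Seek
		return nil
	})
}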


@ -43,12 +43,12 @@ type deleteNode struct {
baseNode
collectionID UniqueID
metaReplica ReplicaInterface // historical
channel Channel
vchannel Channel
}
// Name returns the name of deleteNode
func (dNode *deleteNode) Name() string {
return fmt.Sprintf("dNode-%s", dNode.channel)
return fmt.Sprintf("dNode-%s", dNode.vchannel)
}
// Operate handles input messages, do delete operations
@ -86,22 +86,22 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
for i, delMsg := range dMsg.deleteMessages {
traceID, _, _ := trace.InfoFromSpan(spans[i])
log.Debug("delete in historical replica",
zap.String("channel", dNode.channel),
zap.Any("collectionID", delMsg.CollectionID),
zap.Any("collectionName", delMsg.CollectionName),
zap.String("vchannel", dNode.vchannel),
zap.Int64("collectionID", delMsg.CollectionID),
zap.String("collectionName", delMsg.CollectionName),
zap.Int64("numPKs", delMsg.NumRows),
zap.Int("numTS", len(delMsg.Timestamps)),
zap.Any("timestampBegin", delMsg.BeginTs()),
zap.Any("timestampEnd", delMsg.EndTs()),
zap.Any("segmentNum", dNode.metaReplica.getSegmentNum(segmentTypeSealed)),
zap.Any("traceID", traceID),
zap.Uint64("timestampBegin", delMsg.BeginTs()),
zap.Uint64("timestampEnd", delMsg.EndTs()),
zap.Int("segmentNum", dNode.metaReplica.getSegmentNum(segmentTypeSealed)),
zap.String("traceID", traceID),
)
if dNode.metaReplica.getSegmentNum(segmentTypeSealed) != 0 {
err := processDeleteMessages(dNode.metaReplica, segmentTypeSealed, delMsg, delData)
if err != nil {
// error occurs when missing meta info or unexpected pk type, should not happen
err = fmt.Errorf("deleteNode processDeleteMessages failed, collectionID = %d, err = %s, channel = %s", delMsg.CollectionID, err, dNode.channel)
err = fmt.Errorf("deleteNode processDeleteMessages failed, collectionID = %d, err = %s, channel = %s", delMsg.CollectionID, err, dNode.vchannel)
log.Error(err.Error())
panic(err)
}
@ -116,7 +116,7 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
log.Warn("failed to get segment",
zap.Int64("collectionID", dNode.collectionID),
zap.Int64("segmentID", segmentID),
zap.String("channel", dNode.channel),
zap.String("vchannel", dNode.vchannel),
)
continue
}
@ -180,12 +180,15 @@ func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg *
return fmt.Errorf("segmentDelete failed, segmentID = %d, err=%w", segmentID, err)
}
log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID), zap.Any("SegmentType", targetSegment.segmentType), zap.String("channel", dNode.channel))
log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])),
zap.Int64("segmentID", segmentID),
zap.String("SegmentType", targetSegment.getType().String()),
zap.String("vchannel", dNode.vchannel))
return nil
}
// newDeleteNode returns a new deleteNode
func newDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, channel Channel) *deleteNode {
func newDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel Channel) *deleteNode {
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
@ -197,6 +200,6 @@ func newDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, channel
baseNode: baseNode,
collectionID: collectionID,
metaReplica: metaReplica,
channel: channel,
vchannel: vchannel,
}
}
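
Many of the log calls in this file also swap zap.Any for concretely typed fields (zap.Int64, zap.Uint64, zap.String, zap.Time, zap.Duration), which states each field's intended type at the call site and lets the compiler catch mismatches instead of relying on zap.Any's runtime type switch. A small hedged sketch of the same switch follows; the values are made up for illustration and do not come from the diff.

package main

import (
	"time"

	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	// Illustrative values only.
	collectionID := int64(42)
	tsBegin := uint64(436493302439936000)
	vchannel := "by-dev-rootcoord-dml_0_42v0"

	// zap.Any resolves the field type at runtime and hides the intended type
	// at the call site.
	logger.Debug("delete in historical replica (untyped)",
		zap.Any("collectionID", collectionID),
		zap.Any("timestampBegin", tsBegin))

	// Typed constructors make the type explicit and are checked at compile time.
	logger.Debug("delete in historical replica (typed)",
		zap.Int64("collectionID", collectionID),
		zap.Uint64("timestampBegin", tsBegin),
		zap.String("vchannel", vchannel),
		zap.Duration("tsLag", 3*time.Second))
}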


@ -35,12 +35,12 @@ type filterDeleteNode struct {
baseNode
collectionID UniqueID
metaReplica ReplicaInterface
channel Channel
vchannel Channel
}
// Name returns the name of filterDeleteNode
func (fddNode *filterDeleteNode) Name() string {
return fmt.Sprintf("fdNode-%s", fddNode.channel)
return fmt.Sprintf("fdNode-%s", fddNode.vchannel)
}
// Operate handles input messages, to filter invalid delete messages
@ -79,7 +79,7 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
collection, err := fddNode.metaReplica.getCollectionByID(fddNode.collectionID)
if err != nil {
// QueryNode should add collection before start flow graph
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d, channel = %s", fddNode.Name(), fddNode.collectionID, fddNode.channel))
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d, channel = %s", fddNode.Name(), fddNode.collectionID, fddNode.vchannel))
}
for _, msg := range msgStreamMsg.TsMessages() {
@ -88,7 +88,7 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
resMsg, err := fddNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg), collection.getLoadType())
if err != nil {
// error occurs when missing meta info or data is misaligned, should not happen
err = fmt.Errorf("filterInvalidDeleteMessage failed, err = %s, collection = %d, channel = %s", err, fddNode.collectionID, fddNode.channel)
err = fmt.Errorf("filterInvalidDeleteMessage failed, err = %s, collection = %d, channel = %s", err, fddNode.collectionID, fddNode.vchannel)
log.Error(err.Error())
panic(err)
}
@ -99,7 +99,7 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
log.Warn("invalid message type in filterDeleteNode",
zap.String("message type", msg.Type().String()),
zap.Int64("collection", fddNode.collectionID),
zap.String("channel", fddNode.channel))
zap.String("vchannel", fddNode.vchannel))
}
}
var res Msg = &dMsg
@ -125,9 +125,9 @@ func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.Delet
if len(msg.Timestamps) <= 0 {
log.Debug("filter invalid delete message, no message",
zap.String("channel", fddNode.channel),
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
zap.String("vchannel", fddNode.vchannel),
zap.Int64("collectionID", msg.CollectionID),
zap.Int64("partitionID", msg.PartitionID))
return nil, nil
}
@ -141,7 +141,7 @@ func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.Delet
}
// newFilteredDeleteNode returns a new filterDeleteNode
func newFilteredDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, channel Channel) *filterDeleteNode {
func newFilteredDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel Channel) *filterDeleteNode {
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
@ -154,6 +154,6 @@ func newFilteredDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID,
baseNode: baseNode,
collectionID: collectionID,
metaReplica: metaReplica,
channel: channel,
vchannel: vchannel,
}
}


@ -39,12 +39,12 @@ type filterDmNode struct {
baseNode
collectionID UniqueID
metaReplica ReplicaInterface
channel Channel
vchannel Channel
}
// Name returns the name of filterDmNode
func (fdmNode *filterDmNode) Name() string {
return fmt.Sprintf("fdmNode-%s", fdmNode.channel)
return fmt.Sprintf("fdmNode-%s", fdmNode.vchannel)
}
// Operate handles input messages, to filter invalid insert messages
@ -83,7 +83,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
collection, err := fdmNode.metaReplica.getCollectionByID(fdmNode.collectionID)
if err != nil {
// QueryNode should add collection before start flow graph
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d, channel: %s", fdmNode.Name(), fdmNode.collectionID, fdmNode.channel))
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d, vchannel: %s", fdmNode.Name(), fdmNode.collectionID, fdmNode.vchannel))
}
for i, msg := range msgStreamMsg.TsMessages() {
@ -95,7 +95,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if err != nil {
// error occurs when missing meta info or data is misaligned, should not happen
err = fmt.Errorf("filterInvalidInsertMessage failed, err = %s", err)
log.Error(err.Error(), zap.Int64("collection", fdmNode.collectionID), zap.String("channel", fdmNode.channel))
log.Error(err.Error(), zap.Int64("collection", fdmNode.collectionID), zap.String("vchannel", fdmNode.vchannel))
panic(err)
}
if resMsg != nil {
@ -108,7 +108,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if err != nil {
// error occurs when missing meta info or data is misaligned, should not happen
err = fmt.Errorf("filterInvalidDeleteMessage failed, err = %s", err)
log.Error(err.Error(), zap.Int64("collection", fdmNode.collectionID), zap.String("channel", fdmNode.channel))
log.Error(err.Error(), zap.Int64("collection", fdmNode.collectionID), zap.String("vchannel", fdmNode.vchannel))
panic(err)
}
if resMsg != nil {
@ -120,7 +120,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
log.Warn("invalid message type in filterDmNode",
zap.String("message type", msg.Type().String()),
zap.Int64("collection", fdmNode.collectionID),
zap.String("channel", fdmNode.channel))
zap.String("vchannel", fdmNode.vchannel))
}
}
@ -139,9 +139,9 @@ func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg
if len(msg.Timestamps) <= 0 {
log.Debug("filter invalid delete message, no message",
zap.String("channel", fdmNode.channel),
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
zap.String("vchannel", fdmNode.vchannel),
zap.Int64("collectionID", msg.CollectionID),
zap.Int64("partitionID", msg.PartitionID))
return nil, nil
}
@ -172,9 +172,9 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
if len(msg.Timestamps) <= 0 {
log.Debug("filter invalid insert message, no message",
zap.String("channel", fdmNode.channel),
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
zap.String("vchannel", fdmNode.vchannel),
zap.Int64("collectionID", msg.CollectionID),
zap.Int64("partitionID", msg.PartitionID))
return nil, nil
}
@ -185,8 +185,8 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
// check if the collection from message is target collection
if msg.CollectionID != fdmNode.collectionID {
//log.Debug("filter invalid insert message, collection is not the target collection",
// zap.Any("collectionID", msg.CollectionID),
// zap.Any("partitionID", msg.PartitionID))
// zap.Int64("collectionID", msg.CollectionID),
// zap.Int64("partitionID", msg.PartitionID))
return nil, nil
}
@ -209,7 +209,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
// unFlushed segment may not have checkPoint, so `segmentInfo.DmlPosition` may be nil
if segmentInfo.DmlPosition == nil {
log.Warn("filter unFlushed segment without checkPoint",
zap.String("channel", fdmNode.channel),
zap.String("vchannel", fdmNode.vchannel),
zap.Int64("collectionID", msg.CollectionID),
zap.Int64("partitionID", msg.PartitionID),
zap.Int64("segmentID", msg.SegmentID))
@ -217,7 +217,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
}
if msg.SegmentID == segmentInfo.ID && msg.EndTs() < segmentInfo.DmlPosition.Timestamp {
log.Debug("filter invalid insert message, segments are excluded segments",
zap.String("channel", fdmNode.channel),
zap.String("vchannel", fdmNode.vchannel),
zap.Int64("collectionID", msg.CollectionID),
zap.Int64("partitionID", msg.PartitionID),
zap.Int64("segmentID", msg.SegmentID))
@ -229,7 +229,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
}
// newFilteredDmNode returns a new filterDmNode
func newFilteredDmNode(metaReplica ReplicaInterface, collectionID UniqueID, channel Channel) *filterDmNode {
func newFilteredDmNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel Channel) *filterDmNode {
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
@ -242,6 +242,6 @@ func newFilteredDmNode(metaReplica ReplicaInterface, collectionID UniqueID, chan
baseNode: baseNode,
collectionID: collectionID,
metaReplica: metaReplica,
channel: channel,
vchannel: vchannel,
}
}


@ -46,7 +46,7 @@ type insertNode struct {
baseNode
collectionID UniqueID
metaReplica ReplicaInterface // streaming
channel Channel
vchannel Channel
}
// insertData stores the valid insert data
@ -67,7 +67,7 @@ type deleteData struct {
// Name returns the name of insertNode
func (iNode *insertNode) Name() string {
return fmt.Sprintf("iNode-%s", iNode.channel)
return fmt.Sprintf("iNode-%s", iNode.vchannel)
}
// Operate handles input messages, to execute insert operations
@ -106,7 +106,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
collection, err := iNode.metaReplica.getCollectionByID(iNode.collectionID)
if err != nil {
// QueryNode should add collection before start flow graph
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d, channel: %s", iNode.Name(), iNode.collectionID, iNode.channel))
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d, vchannel: %s", iNode.Name(), iNode.collectionID, iNode.vchannel))
}
// 1. hash insertMessages to insertData
@ -122,7 +122,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if err != nil {
// error occurs only when collection cannot be found, should not happen
err = fmt.Errorf("insertNode addPartition failed, err = %s", err)
log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel))
log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("vchannel", iNode.vchannel))
panic(err)
}
}
@ -139,7 +139,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if err != nil {
// error occurs when collection or partition cannot be found, collection and partition should be created before
err = fmt.Errorf("insertNode addSegment failed, err = %s", err)
log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("channel", iNode.channel))
log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("vchannel", iNode.vchannel))
panic(err)
}
}
@ -148,7 +148,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if err != nil {
// occurs only when schema doesn't have dim param, this should not happen
err = fmt.Errorf("failed to transfer msgStream.insertMsg to storage.InsertRecord, err = %s", err)
log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("channel", iNode.channel))
log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("vchannel", iNode.vchannel))
panic(err)
}
@ -163,7 +163,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if err != nil {
// error occurs when cannot find collection or data is misaligned, should not happen
err = fmt.Errorf("failed to get primary keys, err = %d", err)
log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("channel", iNode.channel))
log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("vchannel", iNode.vchannel))
panic(err)
}
iData.insertPKs[insertMsg.SegmentID] = append(iData.insertPKs[insertMsg.SegmentID], pks...)
@ -177,7 +177,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if err != nil {
// should not happen, segment should be created before
err = fmt.Errorf("insertNode getSegmentByID failed, err = %s", err)
log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("channel", iNode.channel))
log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("vchannel", iNode.vchannel))
if !errors.Is(err, ErrSegmentNotFound) {
panic(err)
@ -194,11 +194,11 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
// error occurs when cgo function `PreInsert` failed
err = fmt.Errorf("segmentPreInsert failed, segmentID = %d, err = %s", segmentID, err)
log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("channel", iNode.channel))
log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("vchannel", iNode.vchannel))
panic(err)
}
iData.insertOffset[segmentID] = offset
log.Debug("insertNode operator", zap.Int("insert size", numOfRecords), zap.Int64("insert offset", offset), zap.Int64("segmentID", segmentID), zap.Int64("collectionID", iNode.collectionID), zap.String("channel", iNode.channel))
log.Debug("insertNode operator", zap.Int("insert size", numOfRecords), zap.Int64("insert offset", offset), zap.Int64("segmentID", segmentID), zap.Int64("collectionID", iNode.collectionID), zap.String("vchannel", iNode.vchannel))
targetSegment.updateBloomFilter(iData.insertPKs[segmentID])
}
}
@ -214,7 +214,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if err != nil {
// error occurs when segment cannot be found or cgo function `Insert` failed
err = fmt.Errorf("segment insert failed, segmentID = %d, err = %s", segmentID, err)
log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel))
log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("vchannel", iNode.vchannel))
panic(err)
}
}()
@ -230,14 +230,14 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
for _, delMsg := range iMsg.deleteMessages {
if iNode.metaReplica.getSegmentNum(segmentTypeGrowing) != 0 {
log.Debug("delete in streaming replica",
zap.String("channel", iNode.channel),
zap.Any("collectionID", delMsg.CollectionID),
zap.Any("collectionName", delMsg.CollectionName),
zap.String("vchannel", iNode.vchannel),
zap.Int64("collectionID", delMsg.CollectionID),
zap.String("collectionName", delMsg.CollectionName),
zap.Int64("numPKs", delMsg.NumRows))
err := processDeleteMessages(iNode.metaReplica, segmentTypeGrowing, delMsg, delData)
if err != nil {
// error occurs when missing meta info or unexpected pk type, should not happen
err = fmt.Errorf("insertNode processDeleteMessages failed, collectionID = %d, err = %s, channel: %s", delMsg.CollectionID, err, iNode.channel)
err = fmt.Errorf("insertNode processDeleteMessages failed, collectionID = %d, err = %s, vchannel: %s", delMsg.CollectionID, err, iNode.vchannel)
log.Error(err.Error())
panic(err)
}
@ -550,7 +550,7 @@ func getPKsFromColumnBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.C
}
// newInsertNode returns a new insertNode
func newInsertNode(metaReplica ReplicaInterface, collectionID UniqueID, channel Channel) *insertNode {
func newInsertNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel Channel) *insertNode {
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
@ -562,6 +562,6 @@ func newInsertNode(metaReplica ReplicaInterface, collectionID UniqueID, channel
baseNode: baseNode,
collectionID: collectionID,
metaReplica: metaReplica,
channel: channel,
vchannel: vchannel,
}
}


@ -1,28 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querynode
import "github.com/milvus-io/milvus/internal/util/flowgraph"
// baseNode is type flowgraph.BaseNode
type baseNode = flowgraph.BaseNode
// node is type flowgraph.Node
type node = flowgraph.Node
// inputNode is type flowgraph.InputNode
type inputNode = flowgraph.InputNode


@ -33,12 +33,20 @@ import (
"github.com/milvus-io/milvus/internal/util/tsoutil"
)
type (
// baseNode is type flowgraph.BaseNode
baseNode = flowgraph.BaseNode
// node is type flowgraph.Node
node = flowgraph.Node
)
// queryNodeFlowGraph is a TimeTickedFlowGraph in query node
type queryNodeFlowGraph struct {
ctx context.Context
cancel context.CancelFunc
collectionID UniqueID
channel Channel
vchannel Channel
flowGraph *flowgraph.TimeTickedFlowGraph
dmlStream msgstream.MsgStream
consumerCnt int
@ -49,7 +57,7 @@ func newQueryNodeFlowGraph(ctx context.Context,
collectionID UniqueID,
metaReplica ReplicaInterface,
tSafeReplica TSafeReplicaInterface,
channel Channel,
vchannel Channel,
factory msgstream.Factory) (*queryNodeFlowGraph, error) {
ctx1, cancel := context.WithCancel(ctx)
@ -58,17 +66,17 @@ func newQueryNodeFlowGraph(ctx context.Context,
ctx: ctx1,
cancel: cancel,
collectionID: collectionID,
channel: channel,
vchannel: vchannel,
flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1),
}
dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, channel)
dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel)
if err != nil {
return nil, err
}
var filterDmNode node = newFilteredDmNode(metaReplica, collectionID, channel)
var insertNode node = newInsertNode(metaReplica, collectionID, channel)
var serviceTimeNode node = newServiceTimeNode(tSafeReplica, collectionID, channel)
var filterDmNode node = newFilteredDmNode(metaReplica, collectionID, vchannel)
var insertNode node = newInsertNode(metaReplica, collectionID, vchannel)
var serviceTimeNode node = newServiceTimeNode(tSafeReplica, collectionID, vchannel)
q.flowGraph.AddNode(dmStreamNode)
q.flowGraph.AddNode(filterDmNode)
@ -115,7 +123,7 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
collectionID UniqueID,
metaReplica ReplicaInterface,
tSafeReplica TSafeReplicaInterface,
channel Channel,
vchannel Channel,
factory msgstream.Factory) (*queryNodeFlowGraph, error) {
ctx1, cancel := context.WithCancel(ctx)
@ -124,17 +132,17 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
ctx: ctx1,
cancel: cancel,
collectionID: collectionID,
channel: channel,
vchannel: vchannel,
flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1),
}
dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, channel)
dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel)
if err != nil {
return nil, err
}
var filterDeleteNode node = newFilteredDeleteNode(metaReplica, collectionID, channel)
var deleteNode node = newDeleteNode(metaReplica, collectionID, channel)
var serviceTimeNode node = newServiceTimeNode(tSafeReplica, collectionID, channel)
var filterDeleteNode node = newFilteredDeleteNode(metaReplica, collectionID, vchannel)
var deleteNode node = newDeleteNode(metaReplica, collectionID, vchannel)
var serviceTimeNode node = newServiceTimeNode(tSafeReplica, collectionID, vchannel)
q.flowGraph.AddNode(dmStreamNode)
q.flowGraph.AddNode(filterDeleteNode)
@ -177,7 +185,7 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
}
// newDmInputNode returns a new inputNode
func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstream.Factory, collectionID UniqueID, channel Channel) (*flowgraph.InputNode, error) {
func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstream.Factory, collectionID UniqueID, vchannel Channel) (*flowgraph.InputNode, error) {
insertStream, err := factory.NewTtMsgStream(ctx)
if err != nil {
return nil, err
@ -187,7 +195,7 @@ func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstre
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
name := fmt.Sprintf("dmInputNode-query-%d-%s", collectionID, channel)
name := fmt.Sprintf("dmInputNode-query-%d-%s", collectionID, vchannel)
node := flowgraph.NewInputNode(insertStream, name, maxQueueLength, maxParallelism)
return node, nil
}
@ -198,10 +206,11 @@ func (q *queryNodeFlowGraph) consumeFlowGraph(channel Channel, subName ConsumeSu
return errors.New("null dml message stream in flow graph")
}
q.dmlStream.AsConsumer([]string{channel}, subName, mqwrapper.SubscriptionPositionUnknown)
log.Info("query node flow graph consumes from pChannel",
zap.Any("collectionID", q.collectionID),
zap.Any("channel", channel),
zap.Any("subName", subName),
log.Info("query node flow graph consumes from PositionUnknown",
zap.Int64("collectionID", q.collectionID),
zap.String("pchannel", channel),
zap.String("vchannel", q.vchannel),
zap.String("subName", subName),
)
q.consumerCnt++
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Inc()
@ -214,10 +223,11 @@ func (q *queryNodeFlowGraph) consumeFlowGraphFromLatest(channel Channel, subName
return errors.New("null dml message stream in flow graph")
}
q.dmlStream.AsConsumer([]string{channel}, subName, mqwrapper.SubscriptionPositionLatest)
log.Info("query node flow graph consumes from pChannel",
zap.Any("collectionID", q.collectionID),
zap.Any("channel", channel),
zap.Any("subName", subName),
log.Info("query node flow graph consumes from latest",
zap.Int64("collectionID", q.collectionID),
zap.String("pchannel", channel),
zap.String("vchannel", q.vchannel),
zap.String("subName", subName),
)
q.consumerCnt++
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Inc()
@ -227,14 +237,18 @@ func (q *queryNodeFlowGraph) consumeFlowGraphFromLatest(channel Channel, subName
// consumeFlowGraphFromPosition seeks the dml stream to the given position and consumes from it
func (q *queryNodeFlowGraph) consumeFlowGraphFromPosition(position *internalpb.MsgPosition) error {
q.dmlStream.AsConsumer([]string{position.ChannelName}, position.MsgGroup, mqwrapper.SubscriptionPositionUnknown)
start := time.Now()
err := q.dmlStream.Seek([]*internalpb.MsgPosition{position})
ts, _ := tsoutil.ParseTS(position.GetTimestamp())
log.Info("query node flow graph seeks from pChannel",
log.Info("query node flow graph seeks from position",
zap.Int64("collectionID", q.collectionID),
zap.String("channel", position.ChannelName),
zap.String("pchannel", position.ChannelName),
zap.String("vchannel", q.vchannel),
zap.Time("checkpointTs", ts),
zap.Duration("tsLag", time.Since(ts)),
zap.Duration("elapse", time.Since(start)),
)
q.consumerCnt++
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Inc()
@ -249,7 +263,7 @@ func (q *queryNodeFlowGraph) close() {
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Sub(float64(q.consumerCnt))
}
log.Info("stop query node flow graph",
zap.Any("collectionID", q.collectionID),
zap.Any("channel", q.channel),
zap.Int64("collectionID", q.collectionID),
zap.String("vchannel", q.vchannel),
)
}


@ -19,6 +19,7 @@ package querynode
import (
"fmt"
"reflect"
"time"
"go.uber.org/zap"
@ -67,10 +68,11 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
rateCol.updateTSafe(stNode.vChannel, serviceTimeMsg.timeRange.timestampMax)
p, _ := tsoutil.ParseTS(serviceTimeMsg.timeRange.timestampMax)
log.RatedDebug(10.0, "update tSafe:",
zap.Any("collectionID", stNode.collectionID),
zap.Any("tSafe", serviceTimeMsg.timeRange.timestampMax),
zap.Any("tSafe_p", p),
zap.Any("channel", stNode.vChannel),
zap.Int64("collectionID", stNode.collectionID),
zap.Uint64("tSafe", serviceTimeMsg.timeRange.timestampMax),
zap.Time("tSafe_p", p),
zap.Duration("tsLag", time.Since(p)),
zap.String("channel", stNode.vChannel),
)
return in
@ -79,7 +81,7 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// newServiceTimeNode returns a new serviceTimeNode
func newServiceTimeNode(tSafeReplica TSafeReplicaInterface,
collectionID UniqueID,
channel Channel) *serviceTimeNode {
vchannel Channel) *serviceTimeNode {
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
@ -91,7 +93,7 @@ func newServiceTimeNode(tSafeReplica TSafeReplicaInterface,
return &serviceTimeNode{
baseNode: baseNode,
collectionID: collectionID,
vChannel: channel,
vChannel: vchannel,
tSafeReplica: tSafeReplica,
}
}