Setup first tsafe to avoid false maxLag error (#21490)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2023-01-03 20:41:33 +08:00 committed by GitHub
parent 72184eaeaf
commit 2844f9b1e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -46,12 +46,11 @@ type (
// queryNodeFlowGraph is a TimeTickedFlowGraph in query node
type queryNodeFlowGraph struct {
ctx context.Context
cancel context.CancelFunc
collectionID UniqueID
vchannel Channel
flowGraph *flowgraph.TimeTickedFlowGraph
dmlStream msgstream.MsgStream
tSafeReplica TSafeReplicaInterface
consumerCnt int
}
@ -63,17 +62,14 @@ func newQueryNodeFlowGraph(ctx context.Context,
vchannel Channel,
factory msgstream.Factory) (*queryNodeFlowGraph, error) {
ctx1, cancel := context.WithCancel(ctx)
q := &queryNodeFlowGraph{
ctx: ctx1,
cancel: cancel,
collectionID: collectionID,
vchannel: vchannel,
flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1),
tSafeReplica: tSafeReplica,
flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx),
}
dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel, metrics.InsertLabel)
dmStreamNode, err := q.newDmInputNode(ctx, factory, collectionID, vchannel, metrics.InsertLabel)
if err != nil {
return nil, err
}
@ -129,17 +125,14 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
vchannel Channel,
factory msgstream.Factory) (*queryNodeFlowGraph, error) {
ctx1, cancel := context.WithCancel(ctx)
q := &queryNodeFlowGraph{
ctx: ctx1,
cancel: cancel,
collectionID: collectionID,
vchannel: vchannel,
flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1),
tSafeReplica: tSafeReplica,
flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx),
}
dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel, metrics.DeleteLabel)
dmStreamNode, err := q.newDmInputNode(ctx, factory, collectionID, vchannel, metrics.DeleteLabel)
if err != nil {
return nil, err
}
@ -248,6 +241,8 @@ func (q *queryNodeFlowGraph) consumeFlowGraphFromPosition(position *internalpb.M
start := time.Now()
err := q.dmlStream.Seek([]*internalpb.MsgPosition{position})
// setup first ts
q.tSafeReplica.setTSafe(q.vchannel, position.GetTimestamp())
ts, _ := tsoutil.ParseTS(position.GetTimestamp())
log.Info("query node flow graph seeks from position",
@ -265,7 +260,6 @@ func (q *queryNodeFlowGraph) consumeFlowGraphFromPosition(position *internalpb.M
// close would close queryNodeFlowGraph
func (q *queryNodeFlowGraph) close() {
q.cancel()
q.flowGraph.Close()
if q.dmlStream != nil && q.consumerCnt > 0 {
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Sub(float64(q.consumerCnt))