mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 20:09:57 +08:00
Fix time tick lag too much (#23265)
Signed-off-by: longjiquan <jiquan.long@zilliz.com>
This commit is contained in:
parent
9288020da3
commit
d85f673a95
@ -128,7 +128,9 @@ func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Fact
|
||||
|
||||
lock: sync.Mutex{},
|
||||
sess2ChanTsMap: make(map[typeutil.UniqueID]*chanTsMsg),
|
||||
sendChan: make(chan map[typeutil.UniqueID]*chanTsMsg, 16),
|
||||
|
||||
// 1 is the most reasonable capacity. In fact, Milvus can only focus on the latest time tick.
|
||||
sendChan: make(chan map[typeutil.UniqueID]*chanTsMsg, 1),
|
||||
|
||||
syncedTtHistogram: newTtHistogram(),
|
||||
}
|
||||
@ -169,7 +171,17 @@ func (t *timetickSync) sendToChannel() bool {
|
||||
ptt[k] = v
|
||||
t.sess2ChanTsMap[k] = nil
|
||||
}
|
||||
t.sendChan <- ptt
|
||||
|
||||
select {
|
||||
case t.sendChan <- ptt:
|
||||
default:
|
||||
// The consumer of `sendChan` haven't completed its operation. If we send the `ptt` here, the consumer will
|
||||
// always get an older time tick. The older time tick in `sendChan` will block newer time tick in next window.
|
||||
// However, in fact the consumer can only focus on the newest.
|
||||
|
||||
// TODO: maybe a metric should be here.
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user