Refactor time tick align strategy

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2021-02-27 18:33:29 +08:00 committed by yefu.chen
parent f0a0ae433f
commit 456d0e8c23

View File

@ -147,29 +147,33 @@ func (nodeCtx *nodeCtx) collectInputMessages() context.Context {
for i := 1; i < len(nodeCtx.inputMessages); i++ {
if t < nodeCtx.inputMessages[i].TimeTick() {
latestTime = nodeCtx.inputMessages[i].TimeTick()
//err := errors.New("Fatal, misaligned time tick," +
// "t1=" + strconv.FormatUint(time, 10) +
// ", t2=" + strconv.FormatUint((*nodeCtx.inputMessages[i]).TimeTick(), 10) +
// ", please restart pulsar")
//panic(err)
}
}
// wait for time tick
for i := 0; i < len(nodeCtx.inputMessages); i++ {
for nodeCtx.inputMessages[i].TimeTick() != latestTime {
channel := nodeCtx.inputChannels[i]
select {
case <-time.After(10 * time.Second):
panic("cannot find time tick in flow graph")
case msg, ok := <-channel:
sign := make(chan struct{})
go func() {
for i := 0; i < len(nodeCtx.inputMessages); i++ {
for nodeCtx.inputMessages[i].TimeTick() != latestTime {
fmt.Println("try to align timestamp, t1 =", latestTime, ", t2 =", nodeCtx.inputMessages[i].TimeTick())
channel := nodeCtx.inputChannels[i]
msg, ok := <-channel
if !ok {
log.Println("input channel closed")
return nil
return
}
nodeCtx.inputMessages[i] = msg.msg
}
}
sign <- struct{}{}
}()
select {
case <-time.After(10 * time.Second):
panic("Fatal, misaligned time tick, please restart pulsar")
case <-sign:
}
}
return ctx
}