FixBug:update serviceableTime incorrectly (#11769)

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
zhenshan.cao 2021-11-15 10:31:09 +08:00 committed by GitHub
parent 902e856a06
commit 8c6c031e80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -62,7 +62,7 @@ type queryCollection struct {
tSafeUpdate bool
watcherCond *sync.Cond
serviceableTimeMutex sync.Mutex // guards serviceableTime
serviceableTimeMutex sync.RWMutex // guards serviceableTime
serviceableTime Timestamp
queryMsgStream msgstream.MsgStream
@ -252,9 +252,14 @@ func (q *queryCollection) waitNewTSafe() (Timestamp, error) {
}
func (q *queryCollection) getServiceableTime() Timestamp {
q.serviceableTimeMutex.Lock()
defer q.serviceableTimeMutex.Unlock()
return q.serviceableTime
gracefulTimeInMilliSecond := Params.GracefulTime
gracefulTime := typeutil.ZeroTimestamp
if gracefulTimeInMilliSecond > 0 {
gracefulTime = tsoutil.ComposeTS(gracefulTimeInMilliSecond, 0)
}
q.serviceableTimeMutex.RLock()
defer q.serviceableTimeMutex.RUnlock()
return q.serviceableTime + gracefulTime
}
func (q *queryCollection) setServiceableTime(t Timestamp) {
@ -264,14 +269,7 @@ func (q *queryCollection) setServiceableTime(t Timestamp) {
if t < q.serviceableTime {
return
}
gracefulTimeInMilliSecond := Params.GracefulTime
if gracefulTimeInMilliSecond > 0 {
gracefulTime := tsoutil.ComposeTS(gracefulTimeInMilliSecond, 0)
q.serviceableTime = t + gracefulTime
} else {
q.serviceableTime = t
}
q.serviceableTime = t
}
func (q *queryCollection) consumeQuery() {