fix: Make target observer auto/manual task mutual exclusive (#31584)

See also #30867

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-03-26 09:57:08 +08:00 committed by GitHub
parent 9243e6087e
commit 73858b23bc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -68,6 +69,7 @@ type TargetObserver struct {
readyNotifiers map[int64][]chan struct{} // CollectionID -> Notifiers
dispatcher *taskDispatcher[int64]
keylocks *lock.KeyLock[int64]
stopOnce sync.Once
}
@ -90,6 +92,7 @@ func NewTargetObserver(
updateChan: make(chan targetUpdateRequest),
readyNotifiers: make(map[int64][]chan struct{}),
initChan: make(chan initRequest),
keylocks: lock.NewKeyLock[int64](),
}
dispatcher := newTaskDispatcher(result.check)
@ -148,7 +151,9 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
case req := <-ob.updateChan:
log := log.With(zap.Int64("collectionID", req.CollectionID))
log.Info("manually trigger update next target")
ob.keylocks.Lock(req.CollectionID)
err := ob.updateNextTarget(req.CollectionID)
ob.keylocks.Unlock(req.CollectionID)
if err != nil {
log.Warn("failed to manually update next target", zap.Error(err))
close(req.ReadyNotifier)
@ -184,6 +189,9 @@ func (ob *TargetObserver) check(ctx context.Context, collectionID int64) {
return
}
ob.keylocks.Lock(collectionID)
defer ob.keylocks.Unlock(collectionID)
if ob.shouldUpdateCurrentTarget(ctx, collectionID) {
ob.updateCurrentTarget(collectionID)
}