mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
f1dd55e0c0
- issue: #30301 Signed-off-by: SimFG <bang.fu@zilliz.com>
194 lines
3.8 KiB
Go
194 lines
3.8 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package rootcoord
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus/pkg/log"
|
|
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
|
)
|
|
|
|
type LockLevel int
|
|
|
|
const (
|
|
ClusterLock LockLevel = iota
|
|
DatabaseLock
|
|
CollectionLock
|
|
)
|
|
|
|
type LockerKey interface {
|
|
LockKey() string
|
|
Level() LockLevel
|
|
IsWLock() bool
|
|
Next() LockerKey
|
|
}
|
|
|
|
type task interface {
|
|
GetCtx() context.Context
|
|
SetCtx(context.Context)
|
|
SetTs(ts Timestamp)
|
|
GetTs() Timestamp
|
|
SetID(id UniqueID)
|
|
GetID() UniqueID
|
|
Prepare(ctx context.Context) error
|
|
Execute(ctx context.Context) error
|
|
WaitToFinish() error
|
|
NotifyDone(err error)
|
|
SetInQueueDuration()
|
|
GetLockerKey() LockerKey
|
|
}
|
|
|
|
type baseTask struct {
|
|
ctx context.Context
|
|
core *Core
|
|
done chan error
|
|
ts Timestamp
|
|
id UniqueID
|
|
|
|
tr *timerecord.TimeRecorder
|
|
queueDur time.Duration
|
|
}
|
|
|
|
func newBaseTask(ctx context.Context, core *Core) baseTask {
|
|
b := baseTask{
|
|
core: core,
|
|
done: make(chan error, 1),
|
|
tr: timerecord.NewTimeRecorderWithTrace(ctx, "new task"),
|
|
}
|
|
b.SetCtx(ctx)
|
|
return b
|
|
}
|
|
|
|
func (b *baseTask) SetCtx(ctx context.Context) {
|
|
b.ctx = ctx
|
|
}
|
|
|
|
func (b *baseTask) GetCtx() context.Context {
|
|
return b.ctx
|
|
}
|
|
|
|
func (b *baseTask) SetTs(ts Timestamp) {
|
|
b.ts = ts
|
|
}
|
|
|
|
func (b *baseTask) GetTs() Timestamp {
|
|
return b.ts
|
|
}
|
|
|
|
func (b *baseTask) SetID(id UniqueID) {
|
|
b.id = id
|
|
}
|
|
|
|
func (b *baseTask) GetID() UniqueID {
|
|
return b.id
|
|
}
|
|
|
|
func (b *baseTask) Prepare(ctx context.Context) error {
|
|
return nil
|
|
}
|
|
|
|
func (b *baseTask) Execute(ctx context.Context) error {
|
|
return nil
|
|
}
|
|
|
|
func (b *baseTask) WaitToFinish() error {
|
|
return <-b.done
|
|
}
|
|
|
|
func (b *baseTask) NotifyDone(err error) {
|
|
b.done <- err
|
|
}
|
|
|
|
func (b *baseTask) SetInQueueDuration() {
|
|
b.queueDur = b.tr.ElapseSpan()
|
|
}
|
|
|
|
func (b *baseTask) GetLockerKey() LockerKey {
|
|
return nil
|
|
}
|
|
|
|
type taskLockerKey struct {
|
|
key string
|
|
rw bool
|
|
level LockLevel
|
|
next LockerKey
|
|
}
|
|
|
|
func (t *taskLockerKey) LockKey() string {
|
|
return t.key
|
|
}
|
|
|
|
func (t *taskLockerKey) Level() LockLevel {
|
|
return t.level
|
|
}
|
|
|
|
func (t *taskLockerKey) IsWLock() bool {
|
|
return t.rw
|
|
}
|
|
|
|
func (t *taskLockerKey) Next() LockerKey {
|
|
return t.next
|
|
}
|
|
|
|
func NewClusterLockerKey(rw bool) LockerKey {
|
|
return &taskLockerKey{
|
|
key: "$",
|
|
rw: rw,
|
|
level: ClusterLock,
|
|
}
|
|
}
|
|
|
|
func NewDatabaseLockerKey(db string, rw bool) LockerKey {
|
|
return &taskLockerKey{
|
|
key: db,
|
|
rw: rw,
|
|
level: DatabaseLock,
|
|
}
|
|
}
|
|
|
|
func NewCollectionLockerKey(collection string, rw bool) LockerKey {
|
|
return &taskLockerKey{
|
|
key: collection,
|
|
rw: rw,
|
|
level: CollectionLock,
|
|
}
|
|
}
|
|
|
|
func NewLockerKeyChain(lockerKeys ...LockerKey) LockerKey {
|
|
log.Info("NewLockerKeyChain", zap.Any("lockerKeys", len(lockerKeys)))
|
|
if len(lockerKeys) == 0 {
|
|
return nil
|
|
}
|
|
if lockerKeys[0] == nil || lockerKeys[0].Level() != ClusterLock {
|
|
log.Warn("Invalid locker key chain", zap.Stack("stack"))
|
|
return nil
|
|
}
|
|
|
|
for i := 0; i < len(lockerKeys)-1; i++ {
|
|
if lockerKeys[i] == nil || lockerKeys[i].Level() >= lockerKeys[i+1].Level() {
|
|
log.Warn("Invalid locker key chain", zap.Stack("stack"))
|
|
return nil
|
|
}
|
|
lockerKeys[i].(*taskLockerKey).next = lockerKeys[i+1]
|
|
}
|
|
return lockerKeys[0]
|
|
}
|