mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-01 11:29:48 +08:00
370fdaf50d
Signed-off-by: longjiquan <jiquan.long@zilliz.com>
147 lines
3.7 KiB
Go
147 lines
3.7 KiB
Go
package indexnode
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
|
"github.com/milvus-io/milvus/pkg/common"
|
|
"github.com/milvus-io/milvus/pkg/log"
|
|
)
|
|
|
|
func (i *IndexNode) loadOrStoreTask(ClusterID string, buildID UniqueID, info *taskInfo) *taskInfo {
|
|
i.stateLock.Lock()
|
|
defer i.stateLock.Unlock()
|
|
key := taskKey{ClusterID: ClusterID, BuildID: buildID}
|
|
oldInfo, ok := i.tasks[key]
|
|
if ok {
|
|
return oldInfo
|
|
}
|
|
i.tasks[key] = info
|
|
return nil
|
|
}
|
|
|
|
func (i *IndexNode) loadTaskState(ClusterID string, buildID UniqueID) commonpb.IndexState {
|
|
key := taskKey{ClusterID: ClusterID, BuildID: buildID}
|
|
i.stateLock.Lock()
|
|
defer i.stateLock.Unlock()
|
|
task, ok := i.tasks[key]
|
|
if !ok {
|
|
return commonpb.IndexState_IndexStateNone
|
|
}
|
|
return task.state
|
|
}
|
|
|
|
func (i *IndexNode) storeTaskState(ClusterID string, buildID UniqueID, state commonpb.IndexState, failReason string) {
|
|
key := taskKey{ClusterID: ClusterID, BuildID: buildID}
|
|
i.stateLock.Lock()
|
|
defer i.stateLock.Unlock()
|
|
if task, ok := i.tasks[key]; ok {
|
|
log.Debug("IndexNode store task state", zap.String("clusterID", ClusterID), zap.Int64("buildID", buildID),
|
|
zap.String("state", state.String()), zap.String("fail reason", failReason))
|
|
task.state = state
|
|
task.failReason = failReason
|
|
}
|
|
}
|
|
|
|
func (i *IndexNode) foreachTaskInfo(fn func(ClusterID string, buildID UniqueID, info *taskInfo)) {
|
|
i.stateLock.Lock()
|
|
defer i.stateLock.Unlock()
|
|
for key, info := range i.tasks {
|
|
fn(key.ClusterID, key.BuildID, info)
|
|
}
|
|
}
|
|
|
|
func (i *IndexNode) storeIndexFilesAndStatistic(
|
|
ClusterID string,
|
|
buildID UniqueID,
|
|
fileKeys []string,
|
|
serializedSize uint64,
|
|
statistic *indexpb.JobInfo,
|
|
currentIndexVersion int32,
|
|
) {
|
|
key := taskKey{ClusterID: ClusterID, BuildID: buildID}
|
|
i.stateLock.Lock()
|
|
defer i.stateLock.Unlock()
|
|
if info, ok := i.tasks[key]; ok {
|
|
info.fileKeys = common.CloneStringList(fileKeys)
|
|
info.serializedSize = serializedSize
|
|
info.statistic = proto.Clone(statistic).(*indexpb.JobInfo)
|
|
info.currentIndexVersion = currentIndexVersion
|
|
return
|
|
}
|
|
}
|
|
|
|
func (i *IndexNode) deleteTaskInfos(ctx context.Context, keys []taskKey) []*taskInfo {
|
|
i.stateLock.Lock()
|
|
defer i.stateLock.Unlock()
|
|
deleted := make([]*taskInfo, 0, len(keys))
|
|
for _, key := range keys {
|
|
info, ok := i.tasks[key]
|
|
if ok {
|
|
deleted = append(deleted, info)
|
|
delete(i.tasks, key)
|
|
log.Ctx(ctx).Info("delete task infos",
|
|
zap.String("cluster_id", key.ClusterID), zap.Int64("build_id", key.BuildID))
|
|
}
|
|
}
|
|
return deleted
|
|
}
|
|
|
|
func (i *IndexNode) deleteAllTasks() []*taskInfo {
|
|
i.stateLock.Lock()
|
|
deletedTasks := i.tasks
|
|
i.tasks = make(map[taskKey]*taskInfo)
|
|
i.stateLock.Unlock()
|
|
|
|
deleted := make([]*taskInfo, 0, len(deletedTasks))
|
|
for _, info := range deletedTasks {
|
|
deleted = append(deleted, info)
|
|
}
|
|
return deleted
|
|
}
|
|
|
|
func (i *IndexNode) hasInProgressTask() bool {
|
|
i.stateLock.Lock()
|
|
defer i.stateLock.Unlock()
|
|
for _, info := range i.tasks {
|
|
if info.state == commonpb.IndexState_InProgress {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (i *IndexNode) waitTaskFinish() {
|
|
if !i.hasInProgressTask() {
|
|
return
|
|
}
|
|
|
|
gracefulTimeout := &Params.IndexNodeCfg.GracefulStopTimeout
|
|
ticker := time.NewTicker(time.Second)
|
|
defer ticker.Stop()
|
|
|
|
timeoutCtx, cancel := context.WithTimeout(i.loopCtx, gracefulTimeout.GetAsDuration(time.Second))
|
|
defer cancel()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
if !i.hasInProgressTask() {
|
|
return
|
|
}
|
|
case <-timeoutCtx.Done():
|
|
log.Warn("timeout, the index node has some progress task")
|
|
for _, info := range i.tasks {
|
|
if info.state == commonpb.IndexState_InProgress {
|
|
log.Warn("progress task", zap.Any("info", info))
|
|
}
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|