mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 19:08:30 +08:00
IndexNode still writes finished state to ETCD when task was deleted (#17761)
Signed-off-by: Cai.Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
b883d1f5dd
commit
899a9bf86d
@ -548,12 +548,14 @@ func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexReques
|
||||
ret := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
}
|
||||
nodeTasks, err := i.metaTable.MarkIndexAsDeleted(req.IndexID)
|
||||
defer func() {
|
||||
for nodeID, taskNum := range nodeTasks {
|
||||
i.nodeManager.pq.IncPriority(nodeID, taskNum*-1)
|
||||
}
|
||||
}()
|
||||
err := i.metaTable.MarkIndexAsDeleted(req.IndexID)
|
||||
// No need to do this: when IndexNode finds that the task has been deleted, it still changes the task status to finished and writes it back to etcd.
|
||||
//nodeTasks, err := i.metaTable.MarkIndexAsDeleted(req.IndexID)
|
||||
//defer func() {
|
||||
// for nodeID, taskNum := range nodeTasks {
|
||||
// i.nodeManager.pq.IncPriority(nodeID, taskNum*-1)
|
||||
// }
|
||||
//}()
|
||||
if err != nil {
|
||||
ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
||||
ret.Reason = err.Error()
|
||||
@ -592,12 +594,13 @@ func (i *IndexCoord) RemoveIndex(ctx context.Context, req *indexpb.RemoveIndexRe
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
}
|
||||
|
||||
nodeTasks, err := i.metaTable.MarkIndexAsDeletedByBuildIDs(req.GetBuildIDs())
|
||||
defer func() {
|
||||
for nodeID, taskNum := range nodeTasks {
|
||||
i.nodeManager.pq.IncPriority(nodeID, -1*taskNum)
|
||||
}
|
||||
}()
|
||||
err := i.metaTable.MarkIndexAsDeletedByBuildIDs(req.GetBuildIDs())
|
||||
// No need to do this: when IndexNode finds that the task has been deleted, it still changes the task status to finished and writes it back to etcd.
|
||||
//defer func() {
|
||||
// for nodeID, taskNum := range nodeTasks {
|
||||
// i.nodeManager.pq.IncPriority(nodeID, -1*taskNum)
|
||||
// }
|
||||
//}()
|
||||
if err != nil {
|
||||
log.Error("IndexCoord MarkIndexAsDeletedByBuildIDs failed", zap.Int64s("buildIDs", req.GetBuildIDs()),
|
||||
zap.Error(err))
|
||||
@ -903,8 +906,8 @@ func (i *IndexCoord) watchMetaLoop() {
|
||||
reload := i.metaTable.LoadMetaFromETCD(indexBuildID, eventRevision)
|
||||
log.Debug("IndexCoord watchMetaLoop PUT", zap.Int64("IndexBuildID", indexBuildID), zap.Bool("reload", reload))
|
||||
if reload {
|
||||
log.Debug("This task has finished", zap.Int64("indexBuildID", indexBuildID),
|
||||
zap.Int64("Finish by IndexNode", indexMeta.NodeID),
|
||||
log.Debug("This task has finished or failed", zap.Int64("indexBuildID", indexBuildID),
|
||||
zap.Int64("Finish by IndexNode", indexMeta.NodeID), zap.String("index state", indexMeta.GetState().String()),
|
||||
zap.Int64("The version of the task", indexMeta.Version))
|
||||
if err = i.tryReleaseSegmentReferLock(ctx, indexBuildID, []UniqueID{indexMeta.Req.SegmentID}); err != nil {
|
||||
panic(err)
|
||||
|
@ -254,16 +254,17 @@ func (mt *metaTable) UpdateVersion(indexBuildID UniqueID) error {
|
||||
}
|
||||
|
||||
// MarkIndexAsDeleted will mark the corresponding index as deleted, and recycleUnusedIndexFiles will recycle these tasks.
|
||||
func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) (map[int64]int, error) {
|
||||
func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error {
|
||||
mt.lock.Lock()
|
||||
defer mt.lock.Unlock()
|
||||
|
||||
log.Debug("IndexCoord metaTable MarkIndexAsDeleted ", zap.Int64("indexID", indexID))
|
||||
|
||||
node2TaskNum := make(map[int64]int)
|
||||
for _, meta := range mt.indexBuildID2Meta {
|
||||
for buildID, meta := range mt.indexBuildID2Meta {
|
||||
if meta.indexMeta.Req.IndexID == indexID && !meta.indexMeta.MarkDeleted {
|
||||
meta.indexMeta.MarkDeleted = true
|
||||
log.Debug("IndexCoord metaTable MarkIndexAsDeleted ", zap.Int64("indexID", indexID),
|
||||
zap.Int64("buildID", buildID))
|
||||
// marshal inside
|
||||
/* #nosec G601 */
|
||||
if err := mt.saveIndexMeta(&meta); err != nil {
|
||||
@ -279,24 +280,21 @@ func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) (map[int64]int, error)
|
||||
}
|
||||
err2 := retry.Do(context.TODO(), fn, retry.Attempts(5))
|
||||
if err2 != nil {
|
||||
return node2TaskNum, err2
|
||||
return err2
|
||||
}
|
||||
} else {
|
||||
node2TaskNum[meta.indexMeta.NodeID]++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return node2TaskNum, nil
|
||||
log.Debug("IndexCoord metaTable MarkIndexAsDeleted success", zap.Int64("indexID", indexID))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mt *metaTable) MarkIndexAsDeletedByBuildIDs(buildIDs []UniqueID) (map[int64]int, error) {
|
||||
func (mt *metaTable) MarkIndexAsDeletedByBuildIDs(buildIDs []UniqueID) error {
|
||||
mt.lock.Lock()
|
||||
defer mt.lock.Unlock()
|
||||
|
||||
log.Debug("IndexCoord metaTable MarkIndexAsDeletedByBuildIDs ", zap.Int64s("buildIDs", buildIDs))
|
||||
|
||||
node2TaskNum := make(map[int64]int)
|
||||
for _, buildID := range buildIDs {
|
||||
if meta, ok := mt.indexBuildID2Meta[buildID]; ok {
|
||||
clonedMeta := &Meta{
|
||||
@ -319,13 +317,14 @@ func (mt *metaTable) MarkIndexAsDeletedByBuildIDs(buildIDs []UniqueID) (map[int6
|
||||
}
|
||||
err2 := retry.Do(context.TODO(), fn, retry.Attempts(5))
|
||||
if err2 != nil {
|
||||
return node2TaskNum, err2
|
||||
return err2
|
||||
}
|
||||
}
|
||||
node2TaskNum[meta.indexMeta.NodeID]++
|
||||
}
|
||||
}
|
||||
return node2TaskNum, nil
|
||||
|
||||
log.Debug("IndexCoord metaTable MarkIndexAsDeletedByBuildIDs success", zap.Int64s("buildIDs", buildIDs))
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetIndexStates gets the index states from meta table.
|
||||
@ -442,6 +441,9 @@ func (mt *metaTable) GetUnusedIndexFiles(limit int) []Meta {
|
||||
if meta.indexMeta.State == commonpb.IndexState_Finished && (meta.indexMeta.MarkDeleted || !meta.indexMeta.Recycled) {
|
||||
metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision})
|
||||
}
|
||||
if meta.indexMeta.State == commonpb.IndexState_Unissued && meta.indexMeta.MarkDeleted {
|
||||
metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision})
|
||||
}
|
||||
if len(metas) >= limit {
|
||||
return metas
|
||||
}
|
||||
|
@ -135,7 +135,7 @@ func TestMetaTable(t *testing.T) {
|
||||
key = "indexes/" + strconv.FormatInt(indexMeta1.IndexBuildID, 10)
|
||||
err = etcdKV.Save(key, string(value))
|
||||
assert.Nil(t, err)
|
||||
_, err = metaTable.MarkIndexAsDeleted(indexMeta1.Req.IndexID)
|
||||
err = metaTable.MarkIndexAsDeleted(indexMeta1.Req.IndexID)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
@ -153,7 +153,7 @@ func TestMetaTable(t *testing.T) {
|
||||
key = path.Join(indexFilePrefix, strconv.FormatInt(indexMeta1.IndexBuildID, 10))
|
||||
err = etcdKV.Save(key, string(value))
|
||||
assert.Nil(t, err)
|
||||
_, err = metaTable.MarkIndexAsDeletedByBuildIDs([]UniqueID{indexMeta1.IndexBuildID})
|
||||
err = metaTable.MarkIndexAsDeletedByBuildIDs([]UniqueID{indexMeta1.IndexBuildID})
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
|
@ -200,7 +200,7 @@ func (it *IndexBuildTask) updateTaskState(indexMeta *indexpb.IndexMeta) TaskStat
|
||||
if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished {
|
||||
it.SetState(TaskStateAbandon)
|
||||
} else if indexMeta.MarkDeleted {
|
||||
it.SetState(TaskStateAbandon)
|
||||
it.SetState(TaskStateDeleted)
|
||||
}
|
||||
return it.GetState()
|
||||
}
|
||||
@ -224,16 +224,12 @@ func (it *IndexBuildTask) saveIndexMeta(ctx context.Context) error {
|
||||
}
|
||||
|
||||
taskState := it.updateTaskState(indexMeta)
|
||||
if taskState == TaskStateAbandon {
|
||||
|
||||
if taskState == TaskStateDeleted {
|
||||
log.Info("IndexNode IndexBuildTask saveIndexMeta", zap.String("TaskState", taskState.String()),
|
||||
zap.Int64("IndexBuildID", indexMeta.IndexBuildID))
|
||||
return nil
|
||||
}
|
||||
|
||||
indexMeta.IndexFilePaths = it.savePaths
|
||||
indexMeta.SerializeSize = it.serializedSize
|
||||
|
||||
if taskState == TaskStateFailed {
|
||||
indexMeta.State = commonpb.IndexState_Finished
|
||||
} else if taskState == TaskStateFailed {
|
||||
log.Error("IndexNode IndexBuildTask saveIndexMeta set indexMeta.state to IndexState_Failed",
|
||||
zap.String("TaskState", taskState.String()),
|
||||
zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Error(it.err))
|
||||
@ -245,6 +241,8 @@ func (it *IndexBuildTask) saveIndexMeta(ctx context.Context) error {
|
||||
zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Error(it.internalErr))
|
||||
indexMeta.State = commonpb.IndexState_Unissued
|
||||
} else { // TaskStateNormal
|
||||
indexMeta.IndexFilePaths = it.savePaths
|
||||
indexMeta.SerializeSize = it.serializedSize
|
||||
log.Info("IndexNode IndexBuildTask saveIndexMeta indexMeta.state to IndexState_Finished",
|
||||
zap.String("TaskState", taskState.String()),
|
||||
zap.Int64("IndexBuildID", indexMeta.IndexBuildID))
|
||||
@ -547,6 +545,13 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
||||
sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-Execute")
|
||||
defer sp.Finish()
|
||||
|
||||
state := it.GetState()
|
||||
if state != TaskStateNormal {
|
||||
log.Info("index task no need to execute", zap.Int64("buildID", it.req.IndexBuildID),
|
||||
zap.String("index state", it.GetState().String()))
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := it.prepareParams(ctx); err != nil {
|
||||
it.SetState(TaskStateFailed)
|
||||
log.Error("IndexNode IndexBuildTask Execute prepareParams failed",
|
||||
|
@ -23,6 +23,7 @@ const (
|
||||
TaskStateAbandon TaskState = 1
|
||||
TaskStateRetry TaskState = 2
|
||||
TaskStateFailed TaskState = 3
|
||||
TaskStateDeleted TaskState = 4
|
||||
)
|
||||
|
||||
var TaskStateNames = map[TaskState]string{
|
||||
|
Loading…
Reference in New Issue
Block a user