mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-03 04:19:18 +08:00
Fix on busy node list not updated when services restart (#19561)
issue: #19544 /kind bug Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com> Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
This commit is contained in:
parent
1919353f02
commit
d827391af1
@ -66,7 +66,7 @@ var flipTaskStateInterval = 15 * 1000
|
|||||||
type importManager struct {
|
type importManager struct {
|
||||||
ctx context.Context // reserved
|
ctx context.Context // reserved
|
||||||
taskStore kv.MetaKv // Persistent task info storage.
|
taskStore kv.MetaKv // Persistent task info storage.
|
||||||
busyNodes map[int64]bool // Set of all current working DataNodes.
|
busyNodes map[int64]int64 // Set of all current working DataNode IDs and related task create timestamp.
|
||||||
|
|
||||||
// TODO: Make pendingTask a map to improve look up performance.
|
// TODO: Make pendingTask a map to improve look up performance.
|
||||||
pendingTasks []*datapb.ImportTaskInfo // pending tasks
|
pendingTasks []*datapb.ImportTaskInfo // pending tasks
|
||||||
@ -101,7 +101,7 @@ func newImportManager(ctx context.Context, client kv.MetaKv,
|
|||||||
taskStore: client,
|
taskStore: client,
|
||||||
pendingTasks: make([]*datapb.ImportTaskInfo, 0, MaxPendingCount), // currently task queue max size is 32
|
pendingTasks: make([]*datapb.ImportTaskInfo, 0, MaxPendingCount), // currently task queue max size is 32
|
||||||
workingTasks: make(map[int64]*datapb.ImportTaskInfo),
|
workingTasks: make(map[int64]*datapb.ImportTaskInfo),
|
||||||
busyNodes: make(map[int64]bool),
|
busyNodes: make(map[int64]int64),
|
||||||
pendingLock: sync.RWMutex{},
|
pendingLock: sync.RWMutex{},
|
||||||
workingLock: sync.RWMutex{},
|
workingLock: sync.RWMutex{},
|
||||||
busyNodesLock: sync.RWMutex{},
|
busyNodesLock: sync.RWMutex{},
|
||||||
@ -189,6 +189,8 @@ func (m *importManager) cleanupLoop(wg *sync.WaitGroup) {
|
|||||||
m.expireOldTasksFromEtcd()
|
m.expireOldTasksFromEtcd()
|
||||||
log.Debug("(in cleanupLoop) start removing bad import segments")
|
log.Debug("(in cleanupLoop) start removing bad import segments")
|
||||||
m.removeBadImportSegments(m.ctx)
|
m.removeBadImportSegments(m.ctx)
|
||||||
|
log.Debug("(in cleanupLoop) start cleaning hanging busy DataNode")
|
||||||
|
m.releaseHangingBusyDataNode()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -249,7 +251,7 @@ func (m *importManager) sendOutTasks(ctx context.Context) error {
|
|||||||
zap.Int64("task ID", it.GetTaskId()),
|
zap.Int64("task ID", it.GetTaskId()),
|
||||||
zap.Int64("dataNode ID", task.GetDatanodeId()))
|
zap.Int64("dataNode ID", task.GetDatanodeId()))
|
||||||
// Add new working dataNode to busyNodes.
|
// Add new working dataNode to busyNodes.
|
||||||
m.busyNodes[resp.GetDatanodeId()] = true
|
m.busyNodes[resp.GetDatanodeId()] = task.GetCreateTs()
|
||||||
err = func() error {
|
err = func() error {
|
||||||
m.workingLock.Lock()
|
m.workingLock.Lock()
|
||||||
defer m.workingLock.Unlock()
|
defer m.workingLock.Unlock()
|
||||||
@ -645,6 +647,7 @@ func (m *importManager) setImportTaskStateAndReason(taskID int64, targetState co
|
|||||||
log.Error("failed to unmarshal proto", zap.String("taskInfo", v), zap.Error(err))
|
log.Error("failed to unmarshal proto", zap.String("taskInfo", v), zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
toPersistImportTaskInfo := cloneImportTaskInfo(ti)
|
toPersistImportTaskInfo := cloneImportTaskInfo(ti)
|
||||||
|
toPersistImportTaskInfo.State.StateCode = targetState
|
||||||
tryUpdateErrMsg(errReason, toPersistImportTaskInfo)
|
tryUpdateErrMsg(errReason, toPersistImportTaskInfo)
|
||||||
// Update task in task store.
|
// Update task in task store.
|
||||||
if err := m.persistTaskInfo(toPersistImportTaskInfo); err != nil {
|
if err := m.persistTaskInfo(toPersistImportTaskInfo); err != nil {
|
||||||
@ -890,6 +893,10 @@ func (m *importManager) expireOldTasksFromMem() {
|
|||||||
log.Info("a working task has expired", zap.Int64("task ID", v.GetId()))
|
log.Info("a working task has expired", zap.Int64("task ID", v.GetId()))
|
||||||
taskID := v.GetId()
|
taskID := v.GetId()
|
||||||
m.workingLock.Unlock()
|
m.workingLock.Unlock()
|
||||||
|
// Remove DataNode from busy node list, so it can serve other tasks again.
|
||||||
|
m.busyNodesLock.Lock()
|
||||||
|
delete(m.busyNodes, v.GetDatanodeId())
|
||||||
|
m.busyNodesLock.Unlock()
|
||||||
if err := m.setImportTaskStateAndReason(taskID, commonpb.ImportState_ImportFailed,
|
if err := m.setImportTaskStateAndReason(taskID, commonpb.ImportState_ImportFailed,
|
||||||
"the import task has timed out"); err != nil {
|
"the import task has timed out"); err != nil {
|
||||||
log.Error("failed to set import task state",
|
log.Error("failed to set import task state",
|
||||||
@ -937,6 +944,24 @@ func (m *importManager) expireOldTasksFromEtcd() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// releaseHangingBusyDataNode checks if a busy DataNode has been 'busy' for an unexpected long time.
|
||||||
|
// We will then remove these DataNodes from `busy list`.
|
||||||
|
func (m *importManager) releaseHangingBusyDataNode() {
|
||||||
|
m.busyNodesLock.Lock()
|
||||||
|
for nodeID, ts := range m.busyNodes {
|
||||||
|
log.Info("busy DataNode found",
|
||||||
|
zap.Int64("node ID", nodeID),
|
||||||
|
zap.Int64("busy duration (seconds)", time.Now().Unix()-ts),
|
||||||
|
)
|
||||||
|
if Params.RootCoordCfg.ImportTaskExpiration <= float64(time.Now().Unix()-ts) {
|
||||||
|
log.Warn("release a hanging busy DataNode",
|
||||||
|
zap.Int64("node ID", nodeID))
|
||||||
|
delete(m.busyNodes, nodeID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
m.busyNodesLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func rearrangeTasks(tasks []*milvuspb.GetImportStateResponse) {
|
func rearrangeTasks(tasks []*milvuspb.GetImportStateResponse) {
|
||||||
sort.Slice(tasks, func(i, j int) bool {
|
sort.Slice(tasks, func(i, j int) bool {
|
||||||
return tasks[i].GetId() < tasks[j].GetId()
|
return tasks[i].GetId() < tasks[j].GetId()
|
||||||
|
@ -239,6 +239,76 @@ func TestImportManager_NewImportManager(t *testing.T) {
|
|||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestImportManager_TestSetImportTaskState(t *testing.T) {
|
||||||
|
var countLock sync.RWMutex
|
||||||
|
var globalCount = typeutil.UniqueID(0)
|
||||||
|
|
||||||
|
var idAlloc = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) {
|
||||||
|
countLock.Lock()
|
||||||
|
defer countLock.Unlock()
|
||||||
|
globalCount++
|
||||||
|
return globalCount, 0, nil
|
||||||
|
}
|
||||||
|
Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
|
||||||
|
Params.RootCoordCfg.ImportTaskExpiration = 50
|
||||||
|
Params.RootCoordCfg.ImportTaskRetention = 200
|
||||||
|
checkPendingTasksInterval = 100
|
||||||
|
cleanUpLoopInterval = 100
|
||||||
|
mockKv := &kv.MockMetaKV{}
|
||||||
|
mockKv.InMemKv = sync.Map{}
|
||||||
|
ti1 := &datapb.ImportTaskInfo{
|
||||||
|
Id: 100,
|
||||||
|
State: &datapb.ImportTaskState{
|
||||||
|
StateCode: commonpb.ImportState_ImportPending,
|
||||||
|
},
|
||||||
|
CreateTs: time.Now().Unix() - 100,
|
||||||
|
}
|
||||||
|
ti2 := &datapb.ImportTaskInfo{
|
||||||
|
Id: 200,
|
||||||
|
State: &datapb.ImportTaskState{
|
||||||
|
StateCode: commonpb.ImportState_ImportPersisted,
|
||||||
|
},
|
||||||
|
CreateTs: time.Now().Unix() - 100,
|
||||||
|
}
|
||||||
|
taskInfo1, err := proto.Marshal(ti1)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
taskInfo2, err := proto.Marshal(ti2)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
mockKv.Save(BuildImportTaskKey(1), "value")
|
||||||
|
mockKv.Save(BuildImportTaskKey(100), string(taskInfo1))
|
||||||
|
mockKv.Save(BuildImportTaskKey(200), string(taskInfo2))
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
t.Run("working task expired", func(t *testing.T) {
|
||||||
|
defer wg.Done()
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
mgr := newImportManager(ctx, mockKv, idAlloc, nil, nil, nil, nil, nil, nil)
|
||||||
|
assert.NotNil(t, mgr)
|
||||||
|
_, err := mgr.loadFromTaskStore(true)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
// Task not exist.
|
||||||
|
assert.Error(t, mgr.setImportTaskState(999, commonpb.ImportState_ImportStarted))
|
||||||
|
// Normal case: update in-mem task state.
|
||||||
|
assert.NoError(t, mgr.setImportTaskState(100, commonpb.ImportState_ImportPersisted))
|
||||||
|
v, err := mockKv.Load(BuildImportTaskKey(100))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
ti := &datapb.ImportTaskInfo{}
|
||||||
|
err = proto.Unmarshal([]byte(v), ti)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, ti.GetState().GetStateCode(), commonpb.ImportState_ImportPersisted)
|
||||||
|
// Normal case: update Etcd task state.
|
||||||
|
assert.NoError(t, mgr.setImportTaskState(200, commonpb.ImportState_ImportFailedAndCleaned))
|
||||||
|
v, err = mockKv.Load(BuildImportTaskKey(200))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
ti = &datapb.ImportTaskInfo{}
|
||||||
|
err = proto.Unmarshal([]byte(v), ti)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, ti.GetState().GetStateCode(), commonpb.ImportState_ImportFailedAndCleaned)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestImportManager_TestEtcdCleanUp(t *testing.T) {
|
func TestImportManager_TestEtcdCleanUp(t *testing.T) {
|
||||||
var countLock sync.RWMutex
|
var countLock sync.RWMutex
|
||||||
var globalCount = typeutil.UniqueID(0)
|
var globalCount = typeutil.UniqueID(0)
|
||||||
@ -319,6 +389,8 @@ func TestImportManager_TestEtcdCleanUp(t *testing.T) {
|
|||||||
keys, _, _ := mockKv.LoadWithPrefix("")
|
keys, _, _ := mockKv.LoadWithPrefix("")
|
||||||
// All 3 tasks are stored in Etcd.
|
// All 3 tasks are stored in Etcd.
|
||||||
assert.Equal(t, 3, len(keys))
|
assert.Equal(t, 3, len(keys))
|
||||||
|
mgr.busyNodes[20] = time.Now().Unix() - 20*60
|
||||||
|
mgr.busyNodes[30] = time.Now().Unix()
|
||||||
mgr.cleanupLoop(&wgLoop)
|
mgr.cleanupLoop(&wgLoop)
|
||||||
keys, _, _ = mockKv.LoadWithPrefix("")
|
keys, _, _ = mockKv.LoadWithPrefix("")
|
||||||
// task 1 and task 2 have passed retention period.
|
// task 1 and task 2 have passed retention period.
|
||||||
|
Loading…
Reference in New Issue
Block a user