IndexCoord does not assign task to IndexNode prematurely (#17717)
Signed-off-by: Cai.Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent 50d5d04552
commit 1fbdafc943
@@ -22,7 +22,6 @@ import (
 	"math/rand"
 	"os"
 	"path"
-	"sort"
 	"strconv"
 	"sync"
 	"sync/atomic"
@@ -118,7 +117,7 @@ func NewIndexCoord(ctx context.Context, factory dependency.Factory) (*IndexCoord
 		loopCancel: cancel,
 		reqTimeoutInterval: time.Second * 10,
 		durationInterval: time.Second * 10,
-		assignTaskInterval: time.Second * 3,
+		assignTaskInterval: time.Second * 1,
 		taskLimit: 20,
 		factory: factory,
 	}
@@ -986,9 +985,7 @@ func (i *IndexCoord) assignTaskLoop() {
 				continue
 			}
 			metas := i.metaTable.GetUnassignedTasks(serverIDs)
-			sort.Slice(metas, func(i, j int) bool {
-				return metas[i].indexMeta.Version <= metas[j].indexMeta.Version
-			})
 
 			// only log if we find unassigned tasks
 			if len(metas) != 0 {
 				log.Debug("IndexCoord find unassigned tasks ", zap.Int("Unassigned tasks number", len(metas)), zap.Int64s("Available IndexNode IDs", serverIDs))
@@ -996,6 +993,18 @@ func (i *IndexCoord) assignTaskLoop() {
 			for index, meta := range metas {
 				indexBuildID := meta.indexMeta.IndexBuildID
 				segID := meta.indexMeta.Req.SegmentID
+				nodeID, builderClient := i.nodeManager.PeekClient(meta)
+				if builderClient == nil && nodeID == -1 {
+					log.Warn("there is no indexnode online")
+					break
+				}
+
+				if builderClient == nil && nodeID == 0 {
+					log.Warn("The memory of all indexnodes does not meet the requirements")
+					continue
+				}
+				log.Debug("IndexCoord PeekClient success", zap.Int64("nodeID", nodeID))
+
 				if err := i.tryAcquireSegmentReferLock(ctx, indexBuildID, []UniqueID{segID}); err != nil {
 					log.Warn("IndexCoord try to acquire segment reference lock failed, maybe this segment has been compacted",
 						zap.Int64("segID", segID), zap.Int64("buildID", indexBuildID), zap.Error(err))
@@ -1007,16 +1016,6 @@ func (i *IndexCoord) assignTaskLoop() {
 				}
 				log.Debug("The version of the task has been updated", zap.Int64("indexBuildID", indexBuildID))
 
-				nodeID, builderClient := i.nodeManager.PeekClient(meta)
-				if builderClient == nil && nodeID == -1 {
-					log.Warn("there is no indexnode online")
-					break
-				}
-				if builderClient == nil && nodeID == 0 {
-					log.Warn("The memory of all indexnodes does not meet the requirements")
-					continue
-				}
-				log.Debug("IndexCoord PeekClient success", zap.Int64("nodeID", nodeID))
 				req := &indexpb.CreateIndexRequest{
 					IndexBuildID: indexBuildID,
 					IndexName: meta.indexMeta.Req.IndexName,
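Net effect of the two hunks above: PeekClient now runs at the top of the loop body, before the segment reference lock is acquired and before the task version is bumped, so a task is only touched once an IndexNode can actually accept it. Below is a minimal, self-contained sketch of that control flow; the names and types are simplified stand-ins, not the actual Milvus code, and the meaning of the -1 and 0 sentinels is taken from the log messages in the diff.

package main

import "fmt"

// task and node are simplified stand-ins for the real Meta and IndexNode client.
type task struct{ buildID int64 }
type node struct{ id int64 }

// peekClient mimics the contract inferred from the diff: (-1, nil) means no
// IndexNode is online, (0, nil) means no online node has enough memory for
// this task, anything else is a usable node.
func peekClient(t task, nodes []node, memOK func(task, node) bool) (int64, *node) {
	if len(nodes) == 0 {
		return -1, nil
	}
	for i := range nodes {
		if memOK(t, nodes[i]) {
			return nodes[i].id, &nodes[i]
		}
	}
	return 0, nil
}

func assign(tasks []task, nodes []node, memOK func(task, node) bool) {
	for _, t := range tasks {
		// Peek first: only lock the segment and bump the task version
		// if some node can actually take the task.
		nodeID, client := peekClient(t, nodes, memOK)
		if client == nil && nodeID == -1 {
			fmt.Println("no indexnode online, stop assigning")
			break
		}
		if client == nil && nodeID == 0 {
			fmt.Println("no indexnode has enough memory, skip task", t.buildID)
			continue
		}
		// ... acquire segment reference lock, update meta version,
		// then send the CreateIndexRequest to the peeked node ...
		fmt.Printf("assign task %d to node %d\n", t.buildID, nodeID)
	}
}

func main() {
	nodes := []node{{id: 1}, {id: 2}}
	tasks := []task{{buildID: 10}, {buildID: 11}}
	assign(tasks, nodes, func(task, node) bool { return true })
}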
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"path"
+	"sort"
 	"strconv"
 	"sync"
@@ -439,6 +440,16 @@ func (mt *metaTable) GetUnusedIndexFiles(limit int) []Meta {
 	return metas
 }
 
+func sortMetaPolicy(metas []Meta) []Meta {
+	// the larger the segment, the higher the priority
+	sort.Slice(metas, func(i, j int) bool {
+		return metas[i].indexMeta.Version < metas[j].indexMeta.Version ||
+			(metas[i].indexMeta.Version == metas[j].indexMeta.Version &&
+				metas[i].indexMeta.Req.NumRows > metas[j].indexMeta.Req.NumRows)
+	})
+	return metas
+}
+
 // GetUnassignedTasks get the unassigned tasks.
 func (mt *metaTable) GetUnassignedTasks(onlineNodeIDs []int64) []Meta {
 	mt.lock.RLock()
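A standalone illustration of the ordering sortMetaPolicy produces: tasks with a lower Version (fewer previous dispatch attempts) come first, and among tasks with the same Version the larger segment (more rows) is built first. The meta struct below is a simplified stand-in for the real Meta/IndexMeta types.

package main

import (
	"fmt"
	"sort"
)

// meta is a stripped-down stand-in for indexpb.IndexMeta.
type meta struct {
	buildID int64
	version int64
	numRows int64
}

func sortMetaPolicy(metas []meta) []meta {
	// lower version first; for equal versions, the larger segment first
	sort.Slice(metas, func(i, j int) bool {
		return metas[i].version < metas[j].version ||
			(metas[i].version == metas[j].version && metas[i].numRows > metas[j].numRows)
	})
	return metas
}

func main() {
	metas := []meta{
		{buildID: 1, version: 1, numRows: 10},
		{buildID: 2, version: 1, numRows: 100},
		{buildID: 3, version: 2, numRows: 1000},
		{buildID: 5, version: 3, numRows: 1000},
	}
	for _, m := range sortMetaPolicy(metas) {
		fmt.Println(m.buildID) // prints 2, 1, 3, 5
	}
}

The printed order (2, 1, 3, 5) is exactly the order asserted in TestMetaTable_GetUnassignedTasks further down.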
@@ -446,6 +457,9 @@ func (mt *metaTable) GetUnassignedTasks(onlineNodeIDs []int64) []Meta {
 	var metas []Meta
 
 	for _, meta := range mt.indexBuildID2Meta {
+		if meta.indexMeta.MarkDeleted {
+			continue
+		}
 		if meta.indexMeta.State == commonpb.IndexState_Unissued {
 			metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision})
 			continue
@@ -465,7 +479,7 @@ func (mt *metaTable) GetUnassignedTasks(onlineNodeIDs []int64) []Meta {
 			metas = append(metas, Meta{indexMeta: proto.Clone(meta.indexMeta).(*indexpb.IndexMeta), revision: meta.revision})
 		}
 	}
-	return metas
+	return sortMetaPolicy(metas)
 }
 
 // HasSameReq determine whether there are same indexing tasks.
@@ -347,3 +347,84 @@ func TestMetaTable_Error(t *testing.T) {
 	err = etcdKV.RemoveWithPrefix("indexes/")
 	assert.Nil(t, err)
 }
+
+func TestMetaTable_GetUnassignedTasks(t *testing.T) {
+	mt := metaTable{
+		indexBuildID2Meta: map[UniqueID]Meta{
+			1: {
+				indexMeta: &indexpb.IndexMeta{
+					IndexBuildID: 1,
+					Req: &indexpb.BuildIndexRequest{
+						NumRows: 10,
+					},
+					Version: 1,
+					NodeID:  1,
+					State:   commonpb.IndexState_Unissued,
+				},
+			},
+			2: {
+				indexMeta: &indexpb.IndexMeta{
+					IndexBuildID: 2,
+					Req: &indexpb.BuildIndexRequest{
+						NumRows: 100,
+					},
+					Version: 1,
+					NodeID:  1,
+					State:   commonpb.IndexState_Unissued,
+				},
+			},
+			3: {
+				indexMeta: &indexpb.IndexMeta{
+					IndexBuildID: 3,
+					Req: &indexpb.BuildIndexRequest{
+						NumRows: 1000,
+					},
+					Version: 2,
+					NodeID:  1,
+					State:   commonpb.IndexState_Unissued,
+				},
+			},
+			4: {
+				indexMeta: &indexpb.IndexMeta{
+					IndexBuildID: 4,
+					Req: &indexpb.BuildIndexRequest{
+						NumRows: 1000,
+					},
+					Version: 2,
+					NodeID:  2,
+					State:   commonpb.IndexState_Finished,
+				},
+			},
+			5: {
+				indexMeta: &indexpb.IndexMeta{
+					IndexBuildID: 5,
+					Req: &indexpb.BuildIndexRequest{
+						NumRows: 1000,
+					},
+					Version: 3,
+					NodeID:  2,
+					State:   commonpb.IndexState_InProgress,
+				},
+			},
+			6: {
+				indexMeta: &indexpb.IndexMeta{
+					IndexBuildID: 5,
+					Req: &indexpb.BuildIndexRequest{
+						NumRows: 1000,
+					},
+					Version:     1,
+					NodeID:      1,
+					State:       commonpb.IndexState_InProgress,
+					MarkDeleted: true,
+				},
+			},
+		},
+	}
+
+	metas := mt.GetUnassignedTasks([]UniqueID{1, 3, 4})
+	assert.Equal(t, 4, len(metas))
+	assert.Equal(t, int64(2), metas[0].indexMeta.IndexBuildID)
+	assert.Equal(t, int64(1), metas[1].indexMeta.IndexBuildID)
+	assert.Equal(t, int64(3), metas[2].indexMeta.IndexBuildID)
+	assert.Equal(t, int64(5), metas[3].indexMeta.IndexBuildID)
+}
@@ -33,7 +33,7 @@ func PeekClientV0(memorySize uint64, indexParams []*commonpb.KeyValuePair,
 func PeekClientV1(memorySize uint64, indexParams []*commonpb.KeyValuePair,
 	typeParams []*commonpb.KeyValuePair, pq *PriorityQueue) UniqueID {
 	for i := range pq.items {
-		if pq.items[i].totalMem > memorySize {
+		if pq.items[i].totalMem > memorySize && pq.items[i].priority < 2 {
 			return pq.items[i].key
 		}
 	}
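The added pq.items[i].priority < 2 condition is the heart of "does not assign task to IndexNode prematurely": a node is only peeked while its priority is below two, and judging from the adjusted test fixtures below (priorities now start at 0), the priority appears to count the tasks already handed to that node. Below is a hedged, self-contained sketch of the selection rule; pqItem and peekClientV1 here are illustrative simplifications, not the real PriorityQueue API.

package main

import "fmt"

// pqItem is a simplified stand-in for the node manager's PQItem.
type pqItem struct {
	key      int64  // node ID
	priority int    // presumably: number of tasks already assigned to this node
	totalMem uint64 // memory the node still has available
}

// peekClientV1 mirrors the patched rule: pick a node only if it has enough
// memory AND fewer than two outstanding tasks.
func peekClientV1(memorySize uint64, items []pqItem) int64 {
	for i := range items {
		if items[i].totalMem > memorySize && items[i].priority < 2 {
			return items[i].key
		}
	}
	// 0 is used here as the "no qualifying node" sentinel, mirroring the
	// nodeID == 0 branch in assignTaskLoop above.
	return 0
}

func main() {
	items := []pqItem{
		{key: 1, priority: 2, totalMem: 1000}, // busy: skipped even though memory fits
		{key: 2, priority: 0, totalMem: 1000}, // idle: selected
	}
	fmt.Println(peekClientV1(100, items)) // prints 2
}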
@@ -33,7 +33,7 @@ func newPriorityQueue() *PriorityQueue {
 	for i := 1; i <= QueueLen; i++ {
 		item := &PQItem{
 			key:      UniqueID(i),
-			priority: i,
+			priority: i - 1,
 			index:    i - 1,
 			totalMem: 1000,
 		}
@@ -99,7 +99,7 @@ func TestPriorityQueue_SetMemory(t *testing.T) {
 	for i := 0; i < QueueLen; i++ {
 		item := &PQItem{
 			key:      UniqueID(i),
-			priority: i,
+			priority: 0,
 			index:    i,
 			totalMem: 1000,
 		}