Fix bug for indexnode load balancing (#10994)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2021-11-01 17:16:05 +08:00 committed by GitHub
parent 2a55339ae5
commit 5e077fd261
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 1 deletions

View File

@ -54,6 +54,7 @@ func (nm *NodeManager) setClient(nodeID UniqueID, client types.IndexNode) {
item := &PQItem{
key: nodeID,
priority: 0,
weight: 0,
}
nm.nodeClients[nodeID] = client
nm.pq.Push(item)

View File

@ -27,6 +27,8 @@ type PQItem struct {
priority int // The priority of the item in the queue.
// The index is needed by update and is maintained by the heap.Interface methods.
weight int // The weight of the item in the queue.
// When the priority is the same, a smaller weight is more preferred.
index int // The index of the item in the heap.
}
@ -45,7 +47,8 @@ func (pq *PriorityQueue) Len() int {
// must sort before the element with index j.
func (pq *PriorityQueue) Less(i, j int) bool {
// We want Pop to give us the highest, not lowest, priority so we use greater than here.
return pq.items[i].priority < pq.items[j].priority
return (pq.items[i].priority < pq.items[j].priority) ||
(pq.items[i].priority == pq.items[j].priority && pq.items[i].weight < pq.items[j].weight)
}
// Swap swaps the elements with indexes i and j.
@ -106,6 +109,9 @@ func (pq *PriorityQueue) IncPriority(key UniqueID, priority int) {
item := pq.getItemByKey(key)
if item != nil {
item.(*PQItem).priority += priority
if priority > 0 {
item.(*PQItem).weight += priority
}
heap.Fix(pq, item.(*PQItem).index)
}
}
@ -117,6 +123,7 @@ func (pq *PriorityQueue) UpdatePriority(key UniqueID, priority int) {
item := pq.getItemByKey(key)
if item != nil {
item.(*PQItem).priority = priority
item.(*PQItem).weight = priority
heap.Fix(pq, item.(*PQItem).index)
}
}

View File

@ -87,3 +87,34 @@ func TestPriorityQueue_IncPriority(t *testing.T) {
peekKey := pq.Peek()
assert.Equal(t, key, peekKey)
}
func TestPriorityQueue(t *testing.T) {
ret := &PriorityQueue{}
for i := 0; i < 4; i++ {
item := &PQItem{
key: UniqueID(i),
priority: 0,
index: i,
}
ret.items = append(ret.items, item)
}
heap.Init(ret)
peeKey1 := ret.Peek()
assert.Equal(t, int64(0), peeKey1)
ret.IncPriority(peeKey1, 1)
peeKey2 := ret.Peek()
assert.Equal(t, int64(1), peeKey2)
ret.IncPriority(peeKey2, 1)
ret.IncPriority(peeKey1, -1)
ret.IncPriority(peeKey2, -1)
peeKey1 = ret.Peek()
assert.Equal(t, int64(3), peeKey1)
ret.IncPriority(peeKey1, 1)
peeKey2 = ret.Peek()
assert.Equal(t, int64(2), peeKey2)
}