mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
Fix auto balance block channel reassign after datanode restart (#28275)
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
bce1054f92
commit
14c8a90517
@ -347,11 +347,19 @@ func (c *ChannelManager) AddNode(nodeID int64) error {
|
||||
|
||||
c.store.Add(nodeID)
|
||||
|
||||
if !Params.DataCoordCfg.AutoBalance.GetAsBool() {
|
||||
return nil
|
||||
bufferedUpdates, balanceUpdates := c.registerPolicy(c.store, nodeID)
|
||||
|
||||
updates := bufferedUpdates
|
||||
// try bufferedUpdates first
|
||||
if len(updates) <= 0 {
|
||||
if !Params.DataCoordCfg.AutoBalance.GetAsBool() {
|
||||
log.Info("auto balance disabled, skip reassignment for balance", zap.Int64("registered node", nodeID))
|
||||
return nil
|
||||
}
|
||||
// if auto balance enabled, try balanceUpdates
|
||||
updates = balanceUpdates
|
||||
}
|
||||
|
||||
updates := c.registerPolicy(c.store, nodeID)
|
||||
if len(updates) <= 0 {
|
||||
log.Info("register node with no reassignment", zap.Int64("registered node", nodeID))
|
||||
return nil
|
||||
|
@ -31,11 +31,12 @@ import (
|
||||
)
|
||||
|
||||
// RegisterPolicy decides the channels mapping after registering the nodeID
|
||||
type RegisterPolicy func(store ROChannelStore, nodeID int64) ChannelOpSet
|
||||
// return bufferedUpdates and balanceUpdates
|
||||
type RegisterPolicy func(store ROChannelStore, nodeID int64) (ChannelOpSet, ChannelOpSet)
|
||||
|
||||
// EmptyRegister does nothing
|
||||
func EmptyRegister(store ROChannelStore, nodeID int64) ChannelOpSet {
|
||||
return nil
|
||||
func EmptyRegister(store ROChannelStore, nodeID int64) (ChannelOpSet, ChannelOpSet) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// BufferChannelAssignPolicy assigns buffer channels to new registered node
|
||||
@ -53,10 +54,10 @@ func BufferChannelAssignPolicy(store ROChannelStore, nodeID int64) ChannelOpSet
|
||||
|
||||
// AvgAssignRegisterPolicy assigns channels with average to new registered node
|
||||
// Register will not directly delete the node-channel pair. Channel manager will handle channel release.
|
||||
func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet {
|
||||
func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) (ChannelOpSet, ChannelOpSet) {
|
||||
opSet := BufferChannelAssignPolicy(store, nodeID)
|
||||
if len(opSet) != 0 {
|
||||
return opSet
|
||||
return opSet, nil
|
||||
}
|
||||
|
||||
// Get a list of available node-channel info.
|
||||
@ -69,7 +70,7 @@ func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet {
|
||||
// store already add the new node
|
||||
chPerNode := channelNum / len(store.GetNodes())
|
||||
if chPerNode == 0 {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// sort in descending order and reallocate
|
||||
@ -96,7 +97,7 @@ func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet {
|
||||
for k, v := range releases {
|
||||
opSet.Add(k, v)
|
||||
}
|
||||
return opSet
|
||||
return nil, opSet
|
||||
}
|
||||
|
||||
// filterNode filters out node-channel info where node ID == `nodeID`.
|
||||
@ -113,7 +114,7 @@ func filterNode(infos []*NodeChannelInfo, nodeID int64) []*NodeChannelInfo {
|
||||
|
||||
// ConsistentHashRegisterPolicy use a consistent hash to maintain the mapping
|
||||
func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolicy {
|
||||
return func(store ROChannelStore, nodeID int64) ChannelOpSet {
|
||||
return func(store ROChannelStore, nodeID int64) (ChannelOpSet, ChannelOpSet) {
|
||||
elems := formatNodeIDs(store.GetNodes())
|
||||
hashRing.Set(elems)
|
||||
|
||||
@ -122,7 +123,7 @@ func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolic
|
||||
// If there are buffer channels, then nodeID is the first node.
|
||||
opSet := BufferChannelAssignPolicy(store, nodeID)
|
||||
if len(opSet) != 0 {
|
||||
return opSet
|
||||
return opSet, nil
|
||||
}
|
||||
|
||||
opSet = ChannelOpSet{}
|
||||
@ -135,12 +136,12 @@ func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolic
|
||||
if err != nil {
|
||||
log.Warn("receive error when getting from hashRing",
|
||||
zap.String("channel", ch.Name), zap.Error(err))
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
did, err := deformatNodeID(idStr)
|
||||
if err != nil {
|
||||
log.Warn("failed to deformat node id", zap.Int64("nodeID", did))
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
if did != c.NodeID {
|
||||
releases[c.NodeID] = append(releases[c.NodeID], ch)
|
||||
@ -152,7 +153,7 @@ func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolic
|
||||
for id, channels := range releases {
|
||||
opSet.Add(id, channels)
|
||||
}
|
||||
return opSet
|
||||
return nil, opSet
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,7 +69,7 @@ func TestConsistentHashRegisterPolicy(t *testing.T) {
|
||||
hashring := consistent.New()
|
||||
policy := ConsistentHashRegisterPolicy(hashring)
|
||||
|
||||
updates := policy(store, 1)
|
||||
updates, _ := policy(store, 1)
|
||||
assert.NotNil(t, updates)
|
||||
assert.Equal(t, 2, len(updates))
|
||||
assert.EqualValues(t, &ChannelOp{Type: Delete, NodeID: bufferID, Channels: channels}, updates[0])
|
||||
@ -93,7 +93,7 @@ func TestConsistentHashRegisterPolicy(t *testing.T) {
|
||||
hashring.Add(formatNodeID(1))
|
||||
policy := ConsistentHashRegisterPolicy(hashring)
|
||||
|
||||
updates := policy(store, 2)
|
||||
_, updates := policy(store, 2)
|
||||
|
||||
assert.NotNil(t, updates)
|
||||
assert.Equal(t, 1, len(updates))
|
||||
@ -671,9 +671,10 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
|
||||
nodeID int64
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want ChannelOpSet
|
||||
name string
|
||||
args args
|
||||
bufferedUpdates ChannelOpSet
|
||||
balanceUpdates ChannelOpSet
|
||||
}{
|
||||
{
|
||||
"test empty",
|
||||
@ -687,6 +688,7 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
|
||||
1,
|
||||
},
|
||||
nil,
|
||||
nil,
|
||||
},
|
||||
{
|
||||
"test with buffer channel",
|
||||
@ -712,6 +714,7 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
|
||||
Channels: []*channel{{Name: "ch1", CollectionID: 1}},
|
||||
},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
{
|
||||
"test with avg assign",
|
||||
@ -725,6 +728,7 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
|
||||
},
|
||||
3,
|
||||
},
|
||||
nil,
|
||||
[]*ChannelOp{
|
||||
{
|
||||
Type: Add,
|
||||
@ -747,6 +751,7 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
|
||||
3,
|
||||
},
|
||||
nil,
|
||||
nil,
|
||||
},
|
||||
{
|
||||
"test node with empty channel",
|
||||
@ -761,6 +766,7 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
|
||||
},
|
||||
3,
|
||||
},
|
||||
nil,
|
||||
[]*ChannelOp{
|
||||
{
|
||||
Type: Add,
|
||||
@ -772,8 +778,9 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := AvgAssignRegisterPolicy(tt.args.store, tt.args.nodeID)
|
||||
assert.EqualValues(t, tt.want, got)
|
||||
bufferedUpdates, balanceUpdates := AvgAssignRegisterPolicy(tt.args.store, tt.args.nodeID)
|
||||
assert.EqualValues(t, tt.bufferedUpdates, bufferedUpdates)
|
||||
assert.EqualValues(t, tt.balanceUpdates, balanceUpdates)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user