From 14c8a90517ad872d912673f56f613f6cffe73260 Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 9 Nov 2023 19:00:25 +0800 Subject: [PATCH] Fix auto balance block channel reassign after datanode restart (#28275) Signed-off-by: Wei Liu --- internal/datacoord/channel_manager.go | 14 +++++++++++--- internal/datacoord/policy.go | 25 +++++++++++++------------ internal/datacoord/policy_test.go | 21 ++++++++++++++------- 3 files changed, 38 insertions(+), 22 deletions(-) diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 66605ed030..1a3832f59b 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -347,11 +347,19 @@ func (c *ChannelManager) AddNode(nodeID int64) error { c.store.Add(nodeID) - if !Params.DataCoordCfg.AutoBalance.GetAsBool() { - return nil + bufferedUpdates, balanceUpdates := c.registerPolicy(c.store, nodeID) + + updates := bufferedUpdates + // try bufferedUpdates first + if len(updates) <= 0 { + if !Params.DataCoordCfg.AutoBalance.GetAsBool() { + log.Info("auto balance disabled, skip reassignment for balance", zap.Int64("registered node", nodeID)) + return nil + } + // if auto balance enabled, try balanceUpdates + updates = balanceUpdates } - updates := c.registerPolicy(c.store, nodeID) if len(updates) <= 0 { log.Info("register node with no reassignment", zap.Int64("registered node", nodeID)) return nil diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go index e675821c14..a0f13b1912 100644 --- a/internal/datacoord/policy.go +++ b/internal/datacoord/policy.go @@ -31,11 +31,12 @@ import ( ) // RegisterPolicy decides the channels mapping after registering the nodeID -type RegisterPolicy func(store ROChannelStore, nodeID int64) ChannelOpSet +// return bufferedUpdates and balanceUpdates +type RegisterPolicy func(store ROChannelStore, nodeID int64) (ChannelOpSet, ChannelOpSet) // EmptyRegister does nothing -func EmptyRegister(store ROChannelStore, nodeID int64) ChannelOpSet { - return nil +func EmptyRegister(store ROChannelStore, nodeID int64) (ChannelOpSet, ChannelOpSet) { + return nil, nil } // BufferChannelAssignPolicy assigns buffer channels to new registered node @@ -53,10 +54,10 @@ func BufferChannelAssignPolicy(store ROChannelStore, nodeID int64) ChannelOpSet // AvgAssignRegisterPolicy assigns channels with average to new registered node // Register will not directly delete the node-channel pair. Channel manager will handle channel release. -func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet { +func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) (ChannelOpSet, ChannelOpSet) { opSet := BufferChannelAssignPolicy(store, nodeID) if len(opSet) != 0 { - return opSet + return opSet, nil } // Get a list of available node-channel info. @@ -69,7 +70,7 @@ func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet { // store already add the new node chPerNode := channelNum / len(store.GetNodes()) if chPerNode == 0 { - return nil + return nil, nil } // sort in descending order and reallocate @@ -96,7 +97,7 @@ func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet { for k, v := range releases { opSet.Add(k, v) } - return opSet + return nil, opSet } // filterNode filters out node-channel info where node ID == `nodeID`. @@ -113,7 +114,7 @@ func filterNode(infos []*NodeChannelInfo, nodeID int64) []*NodeChannelInfo { // ConsistentHashRegisterPolicy use a consistent hash to maintain the mapping func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolicy { - return func(store ROChannelStore, nodeID int64) ChannelOpSet { + return func(store ROChannelStore, nodeID int64) (ChannelOpSet, ChannelOpSet) { elems := formatNodeIDs(store.GetNodes()) hashRing.Set(elems) @@ -122,7 +123,7 @@ func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolic // If there are buffer channels, then nodeID is the first node. opSet := BufferChannelAssignPolicy(store, nodeID) if len(opSet) != 0 { - return opSet + return opSet, nil } opSet = ChannelOpSet{} @@ -135,12 +136,12 @@ func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolic if err != nil { log.Warn("receive error when getting from hashRing", zap.String("channel", ch.Name), zap.Error(err)) - return nil + return nil, nil } did, err := deformatNodeID(idStr) if err != nil { log.Warn("failed to deformat node id", zap.Int64("nodeID", did)) - return nil + return nil, nil } if did != c.NodeID { releases[c.NodeID] = append(releases[c.NodeID], ch) @@ -152,7 +153,7 @@ func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolic for id, channels := range releases { opSet.Add(id, channels) } - return opSet + return nil, opSet } } diff --git a/internal/datacoord/policy_test.go b/internal/datacoord/policy_test.go index 17db93a16d..2e075e84a9 100644 --- a/internal/datacoord/policy_test.go +++ b/internal/datacoord/policy_test.go @@ -69,7 +69,7 @@ func TestConsistentHashRegisterPolicy(t *testing.T) { hashring := consistent.New() policy := ConsistentHashRegisterPolicy(hashring) - updates := policy(store, 1) + updates, _ := policy(store, 1) assert.NotNil(t, updates) assert.Equal(t, 2, len(updates)) assert.EqualValues(t, &ChannelOp{Type: Delete, NodeID: bufferID, Channels: channels}, updates[0]) @@ -93,7 +93,7 @@ func TestConsistentHashRegisterPolicy(t *testing.T) { hashring.Add(formatNodeID(1)) policy := ConsistentHashRegisterPolicy(hashring) - updates := policy(store, 2) + _, updates := policy(store, 2) assert.NotNil(t, updates) assert.Equal(t, 1, len(updates)) @@ -671,9 +671,10 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { nodeID int64 } tests := []struct { - name string - args args - want ChannelOpSet + name string + args args + bufferedUpdates ChannelOpSet + balanceUpdates ChannelOpSet }{ { "test empty", @@ -687,6 +688,7 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { 1, }, nil, + nil, }, { "test with buffer channel", @@ -712,6 +714,7 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { Channels: []*channel{{Name: "ch1", CollectionID: 1}}, }, }, + nil, }, { "test with avg assign", @@ -725,6 +728,7 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { }, 3, }, + nil, []*ChannelOp{ { Type: Add, @@ -747,6 +751,7 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { 3, }, nil, + nil, }, { "test node with empty channel", @@ -761,6 +766,7 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { }, 3, }, + nil, []*ChannelOp{ { Type: Add, @@ -772,8 +778,9 @@ func TestAvgAssignRegisterPolicy(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := AvgAssignRegisterPolicy(tt.args.store, tt.args.nodeID) - assert.EqualValues(t, tt.want, got) + bufferedUpdates, balanceUpdates := AvgAssignRegisterPolicy(tt.args.store, tt.args.nodeID) + assert.EqualValues(t, tt.bufferedUpdates, bufferedUpdates) + assert.EqualValues(t, tt.balanceUpdates, balanceUpdates) }) } }