enhance: Tidy dc channel manager (#34515)

See also: #34518

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2024-07-09 18:26:12 +08:00 committed by GitHub
parent c1e04534c3
commit 314f4d995b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 55 additions and 779 deletions

View File

@ -71,6 +71,7 @@ type ChannelManagerImpl struct {
factory ChannelPolicyFactory
balancePolicy BalanceChannelPolicy
assignPolicy AssignPolicy
balanceCheckLoop ChannelBGChecker
@ -93,7 +94,7 @@ func withCheckerV2() ChannelmanagerOpt {
return func(c *ChannelManagerImpl) { c.balanceCheckLoop = c.CheckLoop }
}
func NewChannelManagerV2(
func NewChannelManager(
kv kv.TxnKV,
h Handler,
subCluster SubCluster, // sessionManager
@ -117,6 +118,7 @@ func NewChannelManagerV2(
}
m.balancePolicy = m.factory.NewBalancePolicy()
m.assignPolicy = m.factory.NewAssignPolicy()
m.lastActiveTimestamp = time.Now()
return m, nil
}
@ -189,7 +191,7 @@ func (m *ChannelManagerImpl) AddNode(nodeID UniqueID) error {
log.Info("register node", zap.Int64("registered node", nodeID))
m.store.AddNode(nodeID)
updates := AvgAssignByCountPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect())
updates := m.assignPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect())
if updates == nil {
log.Info("register node with no reassignment", zap.Int64("registered node", nodeID))
@ -242,7 +244,7 @@ func (m *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error {
// channel already written into meta, try to assign it to the cluster
// not error is returned if failed, the assignment will retry later
updates = AvgAssignByCountPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect())
updates = m.assignPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect())
if updates == nil {
return nil
}
@ -270,7 +272,7 @@ func (m *ChannelManagerImpl) DeleteNode(nodeID UniqueID) error {
}
updates := NewChannelOpSet(
NewDeleteOp(info.NodeID, lo.Values(info.Channels)...),
NewChannelOp(info.NodeID, Delete, lo.Values(info.Channels)...),
NewChannelOp(bufferID, Watch, lo.Values(info.Channels)...),
)
log.Info("deregister node", zap.Int64("nodeID", nodeID), zap.Array("updates", updates))
@ -292,7 +294,7 @@ func (m *ChannelManagerImpl) reassign(original *NodeChannelInfo) error {
m.mu.Lock()
defer m.mu.Unlock()
updates := AvgAssignByCountPolicy(m.store.GetNodesChannels(), original, m.legacyNodes.Collect())
updates := m.assignPolicy(m.store.GetNodesChannels(), original, m.legacyNodes.Collect())
if updates != nil {
return m.execute(updates)
}

View File

@ -18,16 +18,10 @@ package datacoord
// ChannelPolicyFactory is the abstract factory that creates policies for channel manager.
type ChannelPolicyFactory interface {
// NewRegisterPolicy creates a new register policy.
NewRegisterPolicy() RegisterPolicy
// NewDeregisterPolicy creates a new deregister policy.
NewDeregisterPolicy() DeregisterPolicy
// NewAssignPolicy creates a new channel assign policy.
NewAssignPolicy() ChannelAssignPolicy
// NewReassignPolicy creates a new channel reassign policy.
NewReassignPolicy() ChannelReassignPolicy
// NewBalancePolicy creates a new channel balance policy.
NewBalancePolicy() BalanceChannelPolicy
NewAssignPolicy() AssignPolicy
}
// ChannelPolicyFactoryV1 equal to policy batch
@ -38,26 +32,10 @@ func NewChannelPolicyFactoryV1() *ChannelPolicyFactoryV1 {
return &ChannelPolicyFactoryV1{}
}
// NewRegisterPolicy implementing ChannelPolicyFactory returns BufferChannelAssignPolicy.
func (f *ChannelPolicyFactoryV1) NewRegisterPolicy() RegisterPolicy {
return AvgAssignRegisterPolicy
}
// NewDeregisterPolicy implementing ChannelPolicyFactory returns AvgAssignUnregisteredChannels.
func (f *ChannelPolicyFactoryV1) NewDeregisterPolicy() DeregisterPolicy {
return AvgAssignUnregisteredChannels
}
// NewAssignPolicy implementing ChannelPolicyFactory returns AverageAssignPolicy.
func (f *ChannelPolicyFactoryV1) NewAssignPolicy() ChannelAssignPolicy {
return AverageAssignPolicy
}
// NewReassignPolicy implementing ChannelPolicyFactory returns AverageReassignPolicy.
func (f *ChannelPolicyFactoryV1) NewReassignPolicy() ChannelReassignPolicy {
return AverageReassignPolicy
}
func (f *ChannelPolicyFactoryV1) NewBalancePolicy() BalanceChannelPolicy {
return AvgBalanceChannelPolicy
}
func (f *ChannelPolicyFactoryV1) NewAssignPolicy() AssignPolicy {
return AvgAssignByCountPolicy
}

View File

@ -116,7 +116,7 @@ func (s *ChannelManagerSuite) TearDownTest() {}
func (s *ChannelManagerSuite) TestAddNode() {
s.Run("AddNode with empty store", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
var testNode int64 = 1
@ -134,7 +134,7 @@ func (s *ChannelManagerSuite) TestAddNode() {
"ch2": bufferID,
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
var (
@ -162,7 +162,7 @@ func (s *ChannelManagerSuite) TestAddNode() {
chNodes := map[string]int64{testChannel: storedNodeID}
s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, storedNodeID, testChannel, Watched)
@ -189,7 +189,7 @@ func (s *ChannelManagerSuite) TestAddNode() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
var testNodeID int64 = 100
@ -205,7 +205,7 @@ func (s *ChannelManagerSuite) TestAddNode() {
func (s *ChannelManagerSuite) TestWatch() {
s.Run("test Watch with empty store", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
var testCh string = "ch1"
@ -217,7 +217,7 @@ func (s *ChannelManagerSuite) TestWatch() {
})
s.Run("test Watch with nodeID in store", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
var (
@ -238,7 +238,7 @@ func (s *ChannelManagerSuite) TestWatch() {
func (s *ChannelManagerSuite) TestRelease() {
s.Run("release not exist nodeID and channel", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
err = m.Release(1, "ch1")
@ -253,7 +253,7 @@ func (s *ChannelManagerSuite) TestRelease() {
s.Run("release channel in bufferID", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
m.Watch(context.TODO(), getChannel("ch1", 1))
@ -268,7 +268,7 @@ func (s *ChannelManagerSuite) TestRelease() {
func (s *ChannelManagerSuite) TestDeleteNode() {
s.Run("delete not exsit node", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
info := m.store.GetNode(1)
s.Require().Nil(info)
@ -278,7 +278,7 @@ func (s *ChannelManagerSuite) TestDeleteNode() {
})
s.Run("delete bufferID", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
info := m.store.GetNode(bufferID)
s.Require().NotNil(info)
@ -289,7 +289,7 @@ func (s *ChannelManagerSuite) TestDeleteNode() {
s.Run("delete node without assigment", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
err = m.AddNode(1)
@ -309,7 +309,7 @@ func (s *ChannelManagerSuite) TestDeleteNode() {
"ch3": 1,
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", Watched)
s.checkAssignment(m, 1, "ch2", Watched)
@ -342,7 +342,7 @@ func (s *ChannelManagerSuite) TestFindWatcher() {
"ch4": 1,
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
tests := []struct {
@ -382,7 +382,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, bufferID, "ch1", Standby)
s.checkAssignment(m, bufferID, "ch2", Standby)
@ -400,7 +400,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)
s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false).Times(2)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, bufferID, "ch1", Standby)
s.checkAssignment(m, bufferID, "ch2", Standby)
@ -417,7 +417,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)
@ -433,7 +433,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)
@ -455,7 +455,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)
@ -478,7 +478,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)
@ -500,7 +500,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Times(2)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)
@ -527,7 +527,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", ToRelease)
@ -549,7 +549,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", ToRelease)
@ -571,7 +571,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", ToRelease)
@ -598,7 +598,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", ToRelease)
@ -627,7 +627,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).
Return(fmt.Errorf("mock error")).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)
@ -643,7 +643,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", ToRelease)
@ -660,7 +660,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).
Return(fmt.Errorf("mock error")).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", ToRelease)
@ -679,7 +679,7 @@ func (s *ChannelManagerSuite) TestStartup() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
var (
@ -717,7 +717,7 @@ func (s *ChannelManagerSuite) TestStartupRootCoordFailed() {
s.mockAlloc = NewNMockAllocator(s.T())
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(0, errors.New("mock rootcoord failure"))
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
err = m.Startup(context.TODO(), nil, []int64{2})

View File

@ -43,8 +43,6 @@ type ROChannelStore interface {
// GetNode returns the channel info of a specific node.
// Returns nil if the node doesn't belong to the cluster
GetNode(nodeID int64) *NodeChannelInfo
// HasChannel checks if store already has the channel
HasChannel(channel string) bool
// GetNodesChannels returns the channels that are assigned to nodes.
// without bufferID node
GetNodesChannels() []*NodeChannelInfo
@ -52,8 +50,6 @@ type ROChannelStore interface {
GetBufferChannelInfo() *NodeChannelInfo
// GetNodes gets all node ids in store.
GetNodes() []int64
// GetNodeChannelCount
GetNodeChannelCount(nodeID int64) int
// GetNodeChannels for given collection
GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string
@ -116,22 +112,6 @@ func NewChannelOp(ID int64, opType ChannelOpType, channels ...RWChannel) *Channe
}
}
func NewAddOp(id int64, channels ...RWChannel) *ChannelOp {
return &ChannelOp{
NodeID: id,
Type: Add,
Channels: channels,
}
}
func NewDeleteOp(id int64, channels ...RWChannel) *ChannelOp {
return &ChannelOp{
NodeID: id,
Type: Delete,
Channels: channels,
}
}
func (op *ChannelOp) Append(channels ...RWChannel) {
op.Channels = append(op.Channels, channels...)
}

View File

@ -137,48 +137,6 @@ func (_c *MockRWChannelStore_GetNode_Call) RunAndReturn(run func(int64) *NodeCha
return _c
}
// GetNodeChannelCount provides a mock function with given fields: nodeID
func (_m *MockRWChannelStore) GetNodeChannelCount(nodeID int64) int {
ret := _m.Called(nodeID)
var r0 int
if rf, ok := ret.Get(0).(func(int64) int); ok {
r0 = rf(nodeID)
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// MockRWChannelStore_GetNodeChannelCount_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelCount'
type MockRWChannelStore_GetNodeChannelCount_Call struct {
*mock.Call
}
// GetNodeChannelCount is a helper method to define mock.On call
// - nodeID int64
func (_e *MockRWChannelStore_Expecter) GetNodeChannelCount(nodeID interface{}) *MockRWChannelStore_GetNodeChannelCount_Call {
return &MockRWChannelStore_GetNodeChannelCount_Call{Call: _e.mock.On("GetNodeChannelCount", nodeID)}
}
func (_c *MockRWChannelStore_GetNodeChannelCount_Call) Run(run func(nodeID int64)) *MockRWChannelStore_GetNodeChannelCount_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockRWChannelStore_GetNodeChannelCount_Call) Return(_a0 int) *MockRWChannelStore_GetNodeChannelCount_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockRWChannelStore_GetNodeChannelCount_Call) RunAndReturn(run func(int64) int) *MockRWChannelStore_GetNodeChannelCount_Call {
_c.Call.Return(run)
return _c
}
// GetNodeChannelsBy provides a mock function with given fields: nodeSelector, channelSelectors
func (_m *MockRWChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo {
_va := make([]interface{}, len(channelSelectors))
@ -368,48 +326,6 @@ func (_c *MockRWChannelStore_GetNodesChannels_Call) RunAndReturn(run func() []*N
return _c
}
// HasChannel provides a mock function with given fields: channel
func (_m *MockRWChannelStore) HasChannel(channel string) bool {
ret := _m.Called(channel)
var r0 bool
if rf, ok := ret.Get(0).(func(string) bool); ok {
r0 = rf(channel)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockRWChannelStore_HasChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HasChannel'
type MockRWChannelStore_HasChannel_Call struct {
*mock.Call
}
// HasChannel is a helper method to define mock.On call
// - channel string
func (_e *MockRWChannelStore_Expecter) HasChannel(channel interface{}) *MockRWChannelStore_HasChannel_Call {
return &MockRWChannelStore_HasChannel_Call{Call: _e.mock.On("HasChannel", channel)}
}
func (_c *MockRWChannelStore_HasChannel_Call) Run(run func(channel string)) *MockRWChannelStore_HasChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockRWChannelStore_HasChannel_Call) Return(_a0 bool) *MockRWChannelStore_HasChannel_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockRWChannelStore_HasChannel_Call) RunAndReturn(run func(string) bool) *MockRWChannelStore_HasChannel_Call {
_c.Call.Return(run)
return _c
}
// Reload provides a mock function with given fields:
func (_m *MockRWChannelStore) Reload() error {
ret := _m.Called()

View File

@ -17,7 +17,6 @@
package datacoord
import (
"math"
"sort"
"github.com/samber/lo"
@ -28,77 +27,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// RegisterPolicy decides the channels mapping after registering a new nodeID
// return bufferedUpdates and balanceUpdates
type RegisterPolicy func(store ROChannelStore, nodeID int64) (*ChannelOpSet, *ChannelOpSet)
// EmptyRegister does nothing
func EmptyRegister(store ROChannelStore, nodeID int64) (*ChannelOpSet, *ChannelOpSet) {
return nil, nil
}
// BufferChannelAssignPolicy assigns buffer channels to new registered node
func BufferChannelAssignPolicy(store ROChannelStore, nodeID int64) *ChannelOpSet {
info := store.GetBufferChannelInfo()
if info == nil || len(info.Channels) == 0 {
return nil
}
opSet := NewChannelOpSet(
NewChannelOp(bufferID, Delete, lo.Values(info.Channels)...),
NewChannelOp(nodeID, Watch, lo.Values(info.Channels)...))
return opSet
}
// AvgAssignRegisterPolicy assigns channels with average to new registered node
// Register will not directly delete the node-channel pair. Channel manager will handle channel release.
func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) (*ChannelOpSet, *ChannelOpSet) {
opSet := BufferChannelAssignPolicy(store, nodeID)
if opSet != nil {
return opSet, nil
}
// Get a list of available node-channel info.
allNodes := store.GetNodesChannels()
avaNodes := filterNode(allNodes, nodeID)
channelNum := 0
for _, info := range avaNodes {
channelNum += len(info.Channels)
}
// store already add the new node
chPerNode := channelNum / len(allNodes)
if chPerNode == 0 {
return nil, nil
}
// sort in descending order and reallocate
sort.Slice(avaNodes, func(i, j int) bool {
return len(avaNodes[i].Channels) > len(avaNodes[j].Channels)
})
releases := make(map[int64][]RWChannel)
for i := 0; i < chPerNode; i++ {
// Pick a node with its channel to release.
toRelease := avaNodes[i%len(avaNodes)]
// Pick a channel that will be reassigned to the new node later.
chIdx := i / len(avaNodes)
if chIdx >= len(toRelease.Channels) {
// Node has too few channels, simply skip. No re-picking.
// TODO: Consider re-picking in case assignment is extremely uneven?
continue
}
releases[toRelease.NodeID] = append(releases[toRelease.NodeID], lo.Values(toRelease.Channels)[chIdx])
}
// Channels in `releases` are reassigned eventually by channel manager.
opSet = NewChannelOpSet()
for k, v := range releases {
opSet.Append(k, Release, v...)
}
return nil, opSet
}
// filterNode filters out node-channel info where node ID == `nodeID`.
func filterNode(infos []*NodeChannelInfo, nodeID int64) []*NodeChannelInfo {
filtered := make([]*NodeChannelInfo, 0)
@ -111,177 +39,6 @@ func filterNode(infos []*NodeChannelInfo, nodeID int64) []*NodeChannelInfo {
return filtered
}
// ChannelAssignPolicy assign new channels to registered nodes.
type ChannelAssignPolicy func(store ROChannelStore, channels []RWChannel) *ChannelOpSet
// AverageAssignPolicy ensure that the number of channels per nodes is approximately the same
func AverageAssignPolicy(store ROChannelStore, channels []RWChannel) *ChannelOpSet {
newChannels := lo.Filter(channels, func(ch RWChannel, _ int) bool {
return !store.HasChannel(ch.GetName())
})
if len(newChannels) == 0 {
return nil
}
opSet := NewChannelOpSet()
allDataNodes := store.GetNodesChannels()
// If no datanode alive, save channels in buffer
if len(allDataNodes) == 0 {
opSet.Append(bufferID, Watch, channels...)
return opSet
}
// sort and assign
sort.Slice(allDataNodes, func(i, j int) bool {
return len(allDataNodes[i].Channels) <= len(allDataNodes[j].Channels)
})
updates := make(map[int64][]RWChannel)
for i, newChannel := range newChannels {
n := allDataNodes[i%len(allDataNodes)].NodeID
updates[n] = append(updates[n], newChannel)
}
for id, chs := range updates {
opSet.Append(id, Watch, chs...)
}
return opSet
}
// DeregisterPolicy determine the mapping after deregistering the nodeID
type DeregisterPolicy func(store ROChannelStore, nodeID int64) *ChannelOpSet
// EmptyDeregisterPolicy do nothing
func EmptyDeregisterPolicy(store ROChannelStore, nodeID int64) *ChannelOpSet {
return nil
}
// AvgAssignUnregisteredChannels evenly assign the unregistered channels
func AvgAssignUnregisteredChannels(store ROChannelStore, nodeID int64) *ChannelOpSet {
nodeChannel := store.GetNode(nodeID)
if nodeChannel == nil || len(nodeChannel.Channels) == 0 {
return nil
}
unregisteredChannels := nodeChannel.Channels
avaNodes := lo.Filter(store.GetNodesChannels(), func(info *NodeChannelInfo, _ int) bool {
return info.NodeID != nodeID
})
opSet := NewChannelOpSet()
opSet.Delete(nodeChannel.NodeID, lo.Values(nodeChannel.Channels)...)
if len(avaNodes) == 0 {
opSet.Append(bufferID, Watch, lo.Values(unregisteredChannels)...)
return opSet
}
// sort and assign
sort.Slice(avaNodes, func(i, j int) bool {
return len(avaNodes[i].Channels) <= len(avaNodes[j].Channels)
})
updates := make(map[int64][]RWChannel)
cnt := 0
for _, unregisteredChannel := range unregisteredChannels {
n := avaNodes[cnt%len(avaNodes)].NodeID
updates[n] = append(updates[n], unregisteredChannel)
cnt++
}
for id, chs := range updates {
opSet.Append(id, Watch, chs...)
}
return opSet
}
// ChannelReassignPolicy is a policy for reassigning channels
type ChannelReassignPolicy func(store ROChannelStore, reassigns []*NodeChannelInfo) *ChannelOpSet
// EmptyReassignPolicy is a dummy reassign policy
func EmptyReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) *ChannelOpSet {
return nil
}
// AverageReassignPolicy is a reassigning policy that evenly balance channels among datanodes
func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) *ChannelOpSet {
allNodes := store.GetNodesChannels()
toReassignTotalNum := 0
for _, reassign := range reassigns {
toReassignTotalNum += len(reassign.Channels)
}
avaNodes := make([]*NodeChannelInfo, 0, len(allNodes))
avaNodesChannelSum := 0
for _, node := range allNodes {
if lo.ContainsBy(reassigns, func(info *NodeChannelInfo) bool {
return node.NodeID == info.NodeID
}) {
continue
}
avaNodes = append(avaNodes, node)
avaNodesChannelSum += len(node.Channels)
}
log.Info("AverageReassignPolicy working", zap.Int("avaNodesCount", len(avaNodes)),
zap.Int("toAssignChannelNum", toReassignTotalNum), zap.Int("avaNodesChannelSum", avaNodesChannelSum))
if len(avaNodes) == 0 {
// if no node is left, do not reassign
return nil
}
opSet := NewChannelOpSet()
avgChannelCount := int(math.Ceil(float64(avaNodesChannelSum+toReassignTotalNum) / (float64(len(avaNodes)))))
sort.Slice(avaNodes, func(i, j int) bool {
if len(avaNodes[i].Channels) == len(avaNodes[j].Channels) {
return avaNodes[i].NodeID < avaNodes[j].NodeID
}
return len(avaNodes[i].Channels) < len(avaNodes[j].Channels)
})
// reassign channels to remaining nodes
addUpdates := make(map[int64]*ChannelOp)
for _, reassign := range reassigns {
opSet.Delete(reassign.NodeID, lo.Values(reassign.Channels)...)
for _, ch := range reassign.Channels {
nodeIdx := 0
for {
targetID := avaNodes[nodeIdx%len(avaNodes)].NodeID
if nodeIdx < len(avaNodes) {
existedChannelCount := store.GetNodeChannelCount(targetID)
if _, ok := addUpdates[targetID]; !ok {
if existedChannelCount >= avgChannelCount {
log.Debug("targetNodeID has had more channels than average, skip", zap.Int64("targetID",
targetID), zap.Int("existedChannelCount", existedChannelCount))
nodeIdx++
continue
}
} else {
addingChannelCount := len(addUpdates[targetID].Channels)
if existedChannelCount+addingChannelCount >= avgChannelCount {
log.Debug("targetNodeID has had more channels than average, skip", zap.Int64("targetID",
targetID), zap.Int("currentChannelCount", existedChannelCount+addingChannelCount))
nodeIdx++
continue
}
}
} else {
nodeIdx++
}
if _, ok := addUpdates[targetID]; !ok {
addUpdates[targetID] = NewChannelOp(targetID, Watch, ch)
} else {
addUpdates[targetID].Append(ch)
}
break
}
}
}
opSet.Insert(lo.Values(addUpdates)...)
return opSet
}
type Assignments []*NodeChannelInfo
func (a Assignments) GetChannelCount(nodeID int64) int {
@ -363,6 +120,16 @@ func AvgBalanceChannelPolicy(cluster Assignments) *ChannelOpSet {
return opSet
}
// Assign policy assigns channels to nodes.
// CurrentCluster refers to the current distributions
// ToAssign refers to the target channels needed to be reassigned
//
// if provided, this policy will only assign these channels
// if empty, this policy will balance the currentCluster
//
// ExclusiveNodes means donot assign channels to these nodes.
type AssignPolicy func(currentCluster Assignments, toAssign *NodeChannelInfo, exclusiveNodes []int64) *ChannelOpSet
func AvgAssignByCountPolicy(currentCluster Assignments, toAssign *NodeChannelInfo, execlusiveNodes []int64) *ChannelOpSet {
var (
toCluster Assignments

View File

@ -21,7 +21,6 @@ import (
"testing"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
@ -59,273 +58,6 @@ func (s *PolicySuite) SetupSubTest() {
s.mockStore = NewMockRWChannelStore(s.T())
}
func (s *PolicySuite) TestBufferChannelAssignPolicy() {
s.Run("Test no channels in bufferID", func() {
s.mockStore.EXPECT().GetBufferChannelInfo().Return(nil)
opSet := BufferChannelAssignPolicy(s.mockStore, 1)
s.Nil(opSet)
})
s.Run("Test channels remain in bufferID", func() {
ch2Colls := map[string]int64{
"ch1": 1,
"ch2": 1,
"ch3": 2,
}
info := &NodeChannelInfo{NodeID: bufferID, Channels: getChannels(ch2Colls)}
s.mockStore.EXPECT().GetBufferChannelInfo().Return(info)
var testNodeID int64 = 100
opSet := BufferChannelAssignPolicy(s.mockStore, testNodeID)
s.NotNil(opSet)
s.Equal(2, opSet.Len())
s.Equal(3, opSet.GetChannelNumber())
for _, op := range opSet.Collect() {
s.ElementsMatch([]string{"ch1", "ch2", "ch3"}, op.GetChannelNames())
s.True(op.Type == Delete || op.Type == Watch)
if op.Type == Delete {
s.EqualValues(bufferID, op.NodeID)
}
if op.Type == Watch {
s.EqualValues(testNodeID, op.NodeID)
}
}
})
}
func (s *PolicySuite) TestAvarageAssignPolicy() {
ch2Coll := map[string]int64{
"ch1": 100,
"ch2": 100,
}
var testNodeID int64 = 9
s.Run("no balance after register", func() {
s.mockStore.EXPECT().GetBufferChannelInfo().Return(nil)
s.mockStore.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{
{NodeID: testNodeID},
})
bufferOp, balanceOp := AvgAssignRegisterPolicy(s.mockStore, testNodeID)
s.Nil(bufferOp)
s.Nil(balanceOp)
})
s.Run("balance bufferID channels after register", func() {
s.mockStore.EXPECT().GetBufferChannelInfo().Return(
&NodeChannelInfo{NodeID: bufferID, Channels: getChannels(ch2Coll)},
)
bufferOp, balanceOp := AvgAssignRegisterPolicy(s.mockStore, testNodeID)
s.Nil(balanceOp)
s.NotNil(bufferOp)
s.Equal(2, bufferOp.Len())
s.Equal(2, bufferOp.GetChannelNumber())
for _, op := range bufferOp.Collect() {
s.ElementsMatch(lo.Keys(ch2Coll), op.GetChannelNames())
s.True(op.Type == Delete || op.Type == Watch)
if op.Type == Delete {
s.EqualValues(bufferID, op.NodeID)
}
if op.Type == Watch {
s.EqualValues(testNodeID, op.NodeID)
}
}
log.Info("got bufferOp", zap.Any("op", bufferOp))
})
s.Run("balance after register", func() {
s.mockStore.EXPECT().GetBufferChannelInfo().Return(nil)
s.mockStore.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{
{NodeID: 1, Channels: getChannels(ch2Coll)},
{NodeID: testNodeID},
})
bufferOp, balanceOp := AvgAssignRegisterPolicy(s.mockStore, testNodeID)
s.Nil(bufferOp)
s.NotNil(balanceOp)
s.Equal(1, balanceOp.Len())
s.Equal(1, balanceOp.GetChannelNumber())
for _, op := range balanceOp.Collect() {
s.Equal(Release, op.Type)
s.EqualValues(1, op.NodeID)
}
log.Info("got balanceOp", zap.Any("op", balanceOp))
})
}
func (s *PolicySuite) TestAverageAssignPolicy() {
ch2Coll := map[string]int64{
"ch1": 1,
"ch2": 1,
"ch3": 2,
}
channels := getChannels(ch2Coll)
s.Run("no new channels", func() {
s.mockStore.EXPECT().HasChannel(mock.Anything).Return(true)
opSet := AverageAssignPolicy(s.mockStore, lo.Values(channels))
s.Nil(opSet)
})
s.Run("no datanodes", func() {
s.mockStore.EXPECT().HasChannel(mock.Anything).Return(false)
s.mockStore.EXPECT().GetNodesChannels().Return(nil)
channels := getChannels(ch2Coll)
opSet := AverageAssignPolicy(s.mockStore, lo.Values(channels))
s.NotNil(opSet)
s.Equal(1, opSet.Len())
op := opSet.Collect()[0]
s.EqualValues(bufferID, op.NodeID)
s.Equal(Watch, op.Type)
s.ElementsMatch(lo.Keys(ch2Coll), op.GetChannelNames())
})
s.Run("one datanode", func() {
// Test three channels assigned one datanode
s.mockStore.EXPECT().HasChannel(mock.Anything).Return(false)
s.mockStore.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{
{NodeID: 1, Channels: getChannels(map[string]int64{"channel": 1})},
})
channels := getChannels(ch2Coll)
opSet := AverageAssignPolicy(s.mockStore, lo.Values(channels))
s.NotNil(opSet)
s.Equal(1, opSet.Len())
s.Equal(3, opSet.GetChannelNumber())
for _, op := range opSet.Collect() {
s.Equal(Watch, op.Type)
s.EqualValues(1, op.NodeID)
s.Equal(3, len(op.GetChannelNames()))
s.ElementsMatch(lo.Keys(ch2Coll), op.GetChannelNames())
}
log.Info("test OpSet", zap.Any("opset", opSet))
})
s.Run("three datanode", func() {
// Test three channels assigned evenly to three datanodes
s.mockStore.EXPECT().HasChannel(mock.Anything).Return(false)
s.mockStore.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{
{NodeID: 1},
{NodeID: 2},
{NodeID: 3},
})
opSet := AverageAssignPolicy(s.mockStore, lo.Values(channels))
s.NotNil(opSet)
s.Equal(3, opSet.Len())
s.Equal(3, opSet.GetChannelNumber())
s.ElementsMatch([]int64{1, 2, 3}, lo.Map(opSet.Collect(), func(op *ChannelOp, _ int) int64 {
return op.NodeID
}))
for _, op := range opSet.Collect() {
s.True(lo.Contains([]int64{1, 2, 3}, op.NodeID))
s.Equal(1, len(op.GetChannelNames()))
s.Equal(Watch, op.Type)
s.True(lo.Contains(lo.Keys(ch2Coll), op.GetChannelNames()[0]))
}
log.Info("test OpSet", zap.Any("opset", opSet))
})
}
func (s *PolicySuite) TestAvgAssignUnregisteredChannels() {
ch2Coll := map[string]int64{
"ch1": 1,
"ch2": 1,
"ch3": 2,
}
info := &NodeChannelInfo{
NodeID: 1,
Channels: getChannels(ch2Coll),
}
s.Run("deregistering last node", func() {
s.mockStore.EXPECT().GetNode(mock.Anything).Return(info)
s.mockStore.EXPECT().GetNodesChannels().Return(nil)
opSet := AvgAssignUnregisteredChannels(s.mockStore, info.NodeID)
s.NotNil(opSet)
s.Equal(2, opSet.Len())
for _, op := range opSet.Collect() {
s.ElementsMatch(lo.Keys(ch2Coll), op.GetChannelNames())
if op.Type == Delete {
s.EqualValues(info.NodeID, op.NodeID)
}
if op.Type == Watch {
s.EqualValues(bufferID, op.NodeID)
}
}
log.Info("test OpSet", zap.Any("opset", opSet))
})
s.Run("assign channels after deregistering", func() {
s.mockStore.EXPECT().GetNode(mock.Anything).Return(info)
s.mockStore.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{
{NodeID: 100},
})
opSet := AvgAssignUnregisteredChannels(s.mockStore, info.NodeID)
s.NotNil(opSet)
s.Equal(2, opSet.Len())
for _, op := range opSet.Collect() {
s.ElementsMatch(lo.Keys(ch2Coll), op.GetChannelNames())
s.True(op.Type == Delete || op.Type == Watch)
if op.Type == Delete {
s.EqualValues(info.NodeID, op.NodeID)
}
if op.Type == Watch {
s.EqualValues(100, op.NodeID)
}
}
log.Info("test OpSet", zap.Any("opset", opSet))
})
s.Run("test average", func() {
s.mockStore.EXPECT().GetNode(mock.Anything).Return(info)
s.mockStore.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{
{NodeID: 100},
{NodeID: 101},
{NodeID: 102},
})
opSet := AvgAssignUnregisteredChannels(s.mockStore, info.NodeID)
s.NotNil(opSet)
s.Equal(4, opSet.Len())
nodeCh := make(map[int64]string)
for _, op := range opSet.Collect() {
s.True(op.Type == Delete || op.Type == Watch)
if op.Type == Delete {
s.EqualValues(info.NodeID, op.NodeID)
s.ElementsMatch(lo.Keys(ch2Coll), op.GetChannelNames())
}
if op.Type == Watch {
s.Equal(1, len(op.GetChannelNames()))
nodeCh[op.NodeID] = op.GetChannelNames()[0]
}
}
s.ElementsMatch([]int64{100, 101, 102}, lo.Keys(nodeCh))
s.ElementsMatch(lo.Keys(ch2Coll), lo.Values(nodeCh))
log.Info("test OpSet", zap.Any("opset", opSet))
})
}
func (s *PolicySuite) TestAvgBalanceChannelPolicy() {
s.Run("test even distribution", func() {
// even distribution should have not results
@ -370,91 +102,6 @@ func (s *PolicySuite) TestAvgBalanceChannelPolicy() {
})
}
func (s *PolicySuite) TestAvgReassignPolicy() {
s.Run("test only one node", func() {
ch2Coll := map[string]int64{
"ch1": 1,
"ch2": 1,
"ch3": 2,
"ch4": 2,
"ch5": 3,
}
fiveChannels := getChannels(ch2Coll)
storedInfo := []*NodeChannelInfo{{100, fiveChannels}}
s.mockStore.EXPECT().GetNodesChannels().Return(storedInfo)
opSet := AverageReassignPolicy(s.mockStore, storedInfo)
s.Nil(opSet)
})
s.Run("test zero average", func() {
// as we use ceil to calculate the wanted average number, there should be one reassign
// though the average num less than 1
storedInfo := []*NodeChannelInfo{
{100, getChannels(map[string]int64{"ch1": 1})},
{NodeID: 102},
{NodeID: 103},
{NodeID: 104},
}
s.mockStore.EXPECT().GetNodesChannels().Return(storedInfo)
s.mockStore.EXPECT().GetNodeChannelCount(mock.Anything).RunAndReturn(func(nodeID int64) int {
for _, info := range storedInfo {
if info.NodeID == nodeID {
return len(info.Channels)
}
}
return 0
})
opSet := AverageReassignPolicy(s.mockStore, storedInfo[0:1])
s.NotNil(opSet)
s.Equal(2, opSet.Len())
for _, op := range opSet.Collect() {
s.Equal(1, len(op.GetChannelNames()))
s.Equal("ch1", op.GetChannelNames()[0])
s.True(op.Type == Delete || op.Type == Watch)
}
log.Info("test OpSet", zap.Any("opset", opSet))
})
s.Run("test reassign one to one", func() {
storedInfo := []*NodeChannelInfo{
{100, getChannels(map[string]int64{"ch1": 1, "ch2": 1, "ch3": 1, "ch4": 1})},
{NodeID: 101},
{NodeID: 102},
}
s.mockStore.EXPECT().GetNodesChannels().Return(storedInfo)
s.mockStore.EXPECT().GetNodeChannelCount(mock.Anything).RunAndReturn(func(nodeID int64) int {
for _, info := range storedInfo {
if info.NodeID == nodeID {
return len(info.Channels)
}
}
return 0
})
opSet := AverageReassignPolicy(s.mockStore, storedInfo[0:1])
s.NotNil(opSet)
s.Equal(3, opSet.Len())
for _, op := range opSet.Collect() {
s.True(op.Type == Delete || op.Type == Watch)
if op.Type == Delete {
s.ElementsMatch([]string{"ch1", "ch2", "ch3", "ch4"}, op.GetChannelNames())
s.EqualValues(100, op.NodeID)
}
if op.Type == Watch {
s.Equal(2, len(op.GetChannelNames()))
s.True(lo.Contains([]int64{102, 101}, op.NodeID))
}
}
log.Info("test OpSet", zap.Any("opset", opSet))
})
}
type AssignByCountPolicySuite struct {
suite.Suite

View File

@ -464,7 +464,7 @@ func (s *Server) initCluster() error {
s.sessionManager = NewSessionManagerImpl(withSessionCreator(s.dataNodeCreator))
var err error
s.channelManager, err = NewChannelManagerV2(s.watchClient, s.handler, s.sessionManager, s.allocator, withCheckerV2())
s.channelManager, err = NewChannelManager(s.watchClient, s.handler, s.sessionManager, s.allocator, withCheckerV2())
if err != nil {
return err
}

View File

@ -2459,7 +2459,7 @@ func TestOptions(t *testing.T) {
defer kv.RemoveWithPrefix("")
sessionManager := NewSessionManagerImpl()
channelManager, err := NewChannelManagerV2(kv, newMockHandler(), sessionManager, newMockAllocator())
channelManager, err := NewChannelManager(kv, newMockHandler(), sessionManager, newMockAllocator())
assert.NoError(t, err)
cluster := NewClusterImpl(sessionManager, channelManager)
@ -2490,20 +2490,6 @@ func TestOptions(t *testing.T) {
})
}
type mockPolicyFactory struct {
ChannelPolicyFactoryV1
}
// NewRegisterPolicy create a new register policy
func (p *mockPolicyFactory) NewRegisterPolicy() RegisterPolicy {
return EmptyRegister
}
// NewDeregisterPolicy create a new dereigster policy
func (p *mockPolicyFactory) NewDeregisterPolicy() DeregisterPolicy {
return EmptyDeregisterPolicy
}
func TestHandleSessionEvent(t *testing.T) {
kv := getWatchKV(t)
defer func() {
@ -2514,7 +2500,7 @@ func TestHandleSessionEvent(t *testing.T) {
defer cancel()
sessionManager := NewSessionManagerImpl()
channelManager, err := NewChannelManagerV2(kv, newMockHandler(), sessionManager, newMockAllocator(), withFactoryV2(&mockPolicyFactory{}))
channelManager, err := NewChannelManager(kv, newMockHandler(), sessionManager, newMockAllocator())
assert.NoError(t, err)
cluster := NewClusterImpl(sessionManager, channelManager)