Make DataNode release rather than delete when reassign (#17293)

1. Reassgin now will assign to the original Node if no other nodes
   avaliable
2. Make AddNode balance async: ToRealse + Reassign

See also: #16114, #17270

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2022-06-02 13:56:04 +08:00 committed by GitHub
parent 8cb59a9233
commit 42f643e727
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 345 additions and 168 deletions

View File

@ -172,12 +172,17 @@ func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error {
// ReleaseSuccess remove
// ReleaseFail clean up and remove
func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error {
// Load all the watch infos before processing
nodeWatchInfos := make(map[UniqueID][]*datapb.ChannelWatchInfo)
for _, nodeID := range nodes {
watchInfos, err := c.stateTimer.loadAllChannels(nodeID)
if err != nil {
return err
}
nodeWatchInfos[nodeID] = watchInfos
}
for nodeID, watchInfos := range nodeWatchInfos {
for _, info := range watchInfos {
channelName := info.GetVchan().GetChannelName()
@ -198,12 +203,12 @@ func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error {
c.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, channelName, nodeID, info.GetTimeoutTs())
case datapb.ChannelWatchState_ReleaseSuccess:
if err := c.toDelete(nodeID, channelName); err != nil {
if err := c.Reassign(nodeID, channelName); err != nil {
return err
}
case datapb.ChannelWatchState_ReleaseFailure:
if err := c.cleanUpAndDelete(nodeID, channelName); err != nil {
if err := c.CleanupAndReassign(nodeID, channelName); err != nil {
return err
}
}
@ -318,9 +323,9 @@ func (c *ChannelManager) AddNode(nodeID int64) error {
c.store.Add(nodeID)
// the default registerPolicy doesn't reassgin channels already there
updates := c.registerPolicy(c.store, nodeID)
if len(updates) <= 0 {
log.Info("register node with no reassignment", zap.Int64("registered node", nodeID))
return nil
}
@ -328,7 +333,7 @@ func (c *ChannelManager) AddNode(nodeID int64) error {
zap.Int64("registered node", nodeID),
zap.Array("updates", updates))
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToRelease)
}
// DeleteNode deletes the node from the cluster.
@ -348,10 +353,30 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
log.Warn("deregister node",
zap.Int64("unregistered node", nodeID),
zap.Array("updates", updates))
if len(updates) <= 0 {
return nil
}
var channels []*channel
for _, op := range updates {
if op.Type == Delete {
channels = op.Channels
}
}
chNames := make([]string, 0, len(channels))
for _, ch := range channels {
chNames = append(chNames, ch.Name)
}
log.Debug("remove timers for channel of the deregistered node",
zap.Any("channels", chNames), zap.Int64("nodeID", nodeID))
c.stateTimer.removeTimers(chNames)
if err := c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch); err != nil {
return err
}
// No channels will be return
_, err := c.store.Delete(nodeID)
return err
}
@ -579,14 +604,16 @@ func (c *ChannelManager) processAck(e *ackEvent) {
}
case releaseFailAck, releaseTimeoutAck: // failure acks from toRelease
err := c.cleanUpAndDelete(e.nodeID, e.channelName)
// Cleanup, Delete and Reassign
err := c.CleanupAndReassign(e.nodeID, e.channelName)
if err != nil {
log.Warn("fail to clean and delete channels for release failure ACKs",
log.Warn("fail to clean and reassign channels for release failure ACKs",
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err))
}
case releaseSuccessAck:
err := c.toDelete(e.nodeID, e.channelName)
// Delete and Reassign
err := c.Reassign(e.nodeID, e.channelName)
if err != nil {
log.Warn("fail to response to release success ACK",
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err))
@ -594,8 +621,8 @@ func (c *ChannelManager) processAck(e *ackEvent) {
}
}
// cleanUpAndDelete tries to clean up datanode's subscription, and then delete channel watch info.
func (c *ChannelManager) cleanUpAndDelete(nodeID UniqueID, channelName string) error {
// CleanupAndReassign tries to clean up datanode's subscription, and then delete channel watch info.
func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()
@ -612,35 +639,31 @@ func (c *ChannelManager) cleanUpAndDelete(nodeID UniqueID, channelName string) e
msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName})
}
if !c.isMarkedDrop(channelName) {
reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}}
// reassign policy won't choose the same Node for a ressignment of a channel
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 {
log.Warn("fail to reassign channel to other nodes, add channel to buffer", zap.String("channel name", channelName))
updates.Add(bufferID, []*channel{chToCleanUp})
}
err := c.remove(nodeID, chToCleanUp)
if err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
}
log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates))
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
}
err := c.remove(nodeID, chToCleanUp)
if err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
}
log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName))
c.h.FinishDropChannel(channelName)
if c.isMarkedDrop(channelName) {
log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName))
c.h.FinishDropChannel(channelName)
return nil
}
return nil
reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}}
// reassign policy won't choose the same Node for a ressignment of a channel
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 {
log.Warn("fail to reassign channel to other nodes, add channel to the original node",
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName))
updates.Add(nodeID, []*channel{chToCleanUp})
}
log.Info("channel manager reassign channels", zap.Int64("old nodeID", nodeID), zap.Array("updates", updates))
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
}
type channelStateChecker func(context.Context)
@ -724,8 +747,8 @@ func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error {
return err
}
// toDelete removes channel assignment from a datanode
func (c *ChannelManager) toDelete(nodeID UniqueID, channelName string) error {
// Reassign removes channel assignment from a datanode
func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()
@ -734,36 +757,33 @@ func (c *ChannelManager) toDelete(nodeID UniqueID, channelName string) error {
return fmt.Errorf("fail to find matching nodeID: %d with channelName: %s", nodeID, channelName)
}
if !c.isMarkedDrop(channelName) {
reallocates := &NodeChannelInfo{nodeID, []*channel{ch}}
// reassign policy won't choose the same Node for a ressignment of a channel
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 {
log.Warn("fail to reassign channel to other nodes, add to the buffer", zap.String("channel name", channelName))
updates.Add(bufferID, []*channel{ch})
}
err := c.remove(nodeID, ch)
if err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
}
log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates))
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
if err := c.remove(nodeID, ch); err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
}
err := c.remove(nodeID, ch)
if err != nil {
return err
if c.isMarkedDrop(channelName) {
log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName))
c.h.FinishDropChannel(channelName)
log.Info("removed channel assignment", zap.Any("channel", ch))
return nil
}
log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName))
c.h.FinishDropChannel(channelName)
reallocates := &NodeChannelInfo{nodeID, []*channel{ch}}
// reassign policy won't choose the same Node for a ressignment of a channel
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 {
log.Warn("fail to reassign channel to other nodes, assign to the original Node",
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName))
updates.Add(nodeID, []*channel{ch})
}
log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates))
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
log.Info("removed channel assignment", zap.Any("channel", ch))
return nil
}
func (c *ChannelManager) getChannelByNodeAndName(nodeID UniqueID, channelName string) *channel {

View File

@ -33,9 +33,19 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"stathat.com/c/consistent"
)
func waitForEctdDataReady(metakv kv.MetaKv, key string) {
for {
// make sure etcd has finished the operation
_, err := metakv.Load(key)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
}
func checkWatchInfoWithState(t *testing.T, kv kv.MetaKv, state datapb.ChannelWatchState, nodeID UniqueID, channelName string, collectionID UniqueID) {
prefix := Params.DataCoordCfg.ChannelWatchSubPath
@ -88,17 +98,6 @@ func TestChannelManager_StateTransfer(t *testing.T) {
}
}
makeSureEctdData := func(key string) {
for {
// make sure etcd has finished the operation
_, err := metakv.Load(key)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
}
t.Run("toWatch-WatchSuccess", func(t *testing.T) {
metakv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
@ -119,7 +118,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data))
require.NoError(t, err)
makeSureEctdData(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1))
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1))
cancel()
wg.Wait()
@ -146,7 +145,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data))
require.NoError(t, err)
makeSureEctdData(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1))
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1))
cancel()
wg.Wait()
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID)
@ -177,7 +176,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
chManager.stateTimer.notifyTimeoutWatcher(e)
chManager.stateTimer.stopIfExsit(e)
makeSureEctdData(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1))
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1))
cancel()
wg.Wait()
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID)
@ -216,7 +215,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data))
require.NoError(t, err)
makeSureEctdData(path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1))
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1))
cancel()
wg.Wait()
@ -263,7 +262,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data))
require.NoError(t, err)
makeSureEctdData(path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1))
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1))
cancel()
wg.Wait()
@ -374,18 +373,16 @@ func TestChannelManager(t *testing.T) {
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID)
})
t.Run("test toDelete", func(t *testing.T) {
t.Run("test Reassign", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
var collectionID = UniqueID(5)
tests := []struct {
isvalid bool
nodeID UniqueID
chName string
nodeID UniqueID
chName string
}{
{true, UniqueID(125), "normal-chan"},
{true, UniqueID(115), "to-delete-chan"},
{false, UniqueID(9), "invalid-chan"},
{UniqueID(125), "normal-chan"},
{UniqueID(115), "to-delete-chan"},
}
chManager, err := NewChannelManager(metakv, newMockHandler())
@ -393,20 +390,18 @@ func TestChannelManager(t *testing.T) {
// prepare tests
for _, test := range tests {
if test.isvalid {
chManager.store.Add(test.nodeID)
ops := getOpsWithWatchInfo(test.nodeID, &channel{test.chName, collectionID})
err = chManager.store.Update(ops)
require.NoError(t, err)
chManager.store.Add(test.nodeID)
ops := getOpsWithWatchInfo(test.nodeID, &channel{test.chName, collectionID})
err = chManager.store.Update(ops)
require.NoError(t, err)
info, err := metakv.Load(path.Join(prefix, strconv.FormatInt(test.nodeID, 10), test.chName))
require.NoError(t, err)
require.NotNil(t, info)
}
info, err := metakv.Load(path.Join(prefix, strconv.FormatInt(test.nodeID, 10), test.chName))
require.NoError(t, err)
require.NotNil(t, info)
}
remainTest, reassignTest := tests[0], tests[1]
err = chManager.toDelete(reassignTest.nodeID, reassignTest.chName)
err = chManager.Reassign(reassignTest.nodeID, reassignTest.chName)
assert.NoError(t, err)
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
@ -423,29 +418,54 @@ func TestChannelManager(t *testing.T) {
assert.Equal(t, 2, len(nodeChanInfo.Channels))
assert.ElementsMatch(t, []*channel{{remainTest.chName, collectionID}, {reassignTest.chName, collectionID}}, nodeChanInfo.Channels)
// Delete node of reassginTest and try to toDelete node in remainTest
// Delete node of reassginTest and try to Reassign node in remainTest
err = chManager.DeleteNode(reassignTest.nodeID)
require.NoError(t, err)
err = chManager.toDelete(remainTest.nodeID, remainTest.chName)
err = chManager.Reassign(remainTest.nodeID, remainTest.chName)
assert.NoError(t, err)
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
// channel is added to bufferID because there's only one node left
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, bufferID, remainTest.chName, collectionID)
// channel is added to remainTest because there's only one node left
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID)
})
t.Run("test cleanUpAndDelete", func(t *testing.T) {
t.Run("test DeleteNode", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
var (
collectionID = UniqueID(999)
)
chManager, err := NewChannelManager(metakv, newMockHandler(), withStateChecker())
require.NoError(t, err)
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {1, []*channel{
{"channel-1", collectionID},
{"channel-2", collectionID}}},
bufferID: {bufferID, []*channel{}},
},
}
chManager.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, time.Now().Add(maxWatchDuration).UnixNano())
err = chManager.DeleteNode(1)
assert.NoError(t, err)
chs := chManager.store.GetBufferChannelInfo()
assert.Equal(t, 2, len(chs.Channels))
})
t.Run("test CleanupAndReassign", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
var collectionID = UniqueID(6)
tests := []struct {
isvalid bool
nodeID UniqueID
chName string
nodeID UniqueID
chName string
}{
{true, UniqueID(126), "normal-chan"},
{true, UniqueID(116), "to-delete-chan"},
{false, UniqueID(9), "invalid-chan"},
{UniqueID(126), "normal-chan"},
{UniqueID(116), "to-delete-chan"},
}
factory := dependency.NewDefaultFactory(true)
@ -457,20 +477,18 @@ func TestChannelManager(t *testing.T) {
// prepare tests
for _, test := range tests {
if test.isvalid {
chManager.store.Add(test.nodeID)
ops := getOpsWithWatchInfo(test.nodeID, &channel{test.chName, collectionID})
err = chManager.store.Update(ops)
require.NoError(t, err)
chManager.store.Add(test.nodeID)
ops := getOpsWithWatchInfo(test.nodeID, &channel{test.chName, collectionID})
err = chManager.store.Update(ops)
require.NoError(t, err)
info, err := metakv.Load(path.Join(prefix, strconv.FormatInt(test.nodeID, 10), test.chName))
require.NoError(t, err)
require.NotNil(t, info)
}
info, err := metakv.Load(path.Join(prefix, strconv.FormatInt(test.nodeID, 10), test.chName))
require.NoError(t, err)
require.NotNil(t, info)
}
remainTest, reassignTest := tests[0], tests[1]
err = chManager.cleanUpAndDelete(reassignTest.nodeID, reassignTest.chName)
err = chManager.CleanupAndReassign(reassignTest.nodeID, reassignTest.chName)
assert.NoError(t, err)
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
@ -487,16 +505,16 @@ func TestChannelManager(t *testing.T) {
assert.Equal(t, 2, len(nodeChanInfo.Channels))
assert.ElementsMatch(t, []*channel{{remainTest.chName, collectionID}, {reassignTest.chName, collectionID}}, nodeChanInfo.Channels)
// Delete node of reassginTest and try to cleanUpAndDelete node in remainTest
// Delete node of reassginTest and try to CleanupAndReassign node in remainTest
err = chManager.DeleteNode(reassignTest.nodeID)
require.NoError(t, err)
err = chManager.cleanUpAndDelete(remainTest.nodeID, remainTest.chName)
err = chManager.CleanupAndReassign(remainTest.nodeID, remainTest.chName)
assert.NoError(t, err)
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
// channel is added to bufferID because there's only one node left
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, bufferID, remainTest.chName, collectionID)
// channel is added to remainTest because there's only one node left
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID)
})
t.Run("test getChannelByNodeAndName", func(t *testing.T) {
@ -599,7 +617,7 @@ func TestChannelManager_Reload(t *testing.T) {
)
prefix := Params.DataCoordCfg.ChannelWatchSubPath
getWatchInfoWithState := func(state datapb.ChannelWatchState) *datapb.ChannelWatchInfo {
getWatchInfoWithState := func(state datapb.ChannelWatchState, collectionID UniqueID, channelName string) *datapb.ChannelWatchInfo {
return &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{
CollectionID: collectionID,
@ -615,7 +633,7 @@ func TestChannelManager_Reload(t *testing.T) {
t.Run("ToWatch", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ToWatch))
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ToWatch, collectionID, channelName))
require.NoError(t, err)
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
@ -630,7 +648,7 @@ func TestChannelManager_Reload(t *testing.T) {
t.Run("ToRelease", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ToRelease))
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ToRelease, collectionID, channelName))
require.NoError(t, err)
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
@ -654,7 +672,7 @@ func TestChannelManager_Reload(t *testing.T) {
nodeID: {nodeID, []*channel{{channelName, collectionID}}}},
}
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchFailure))
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchFailure, collectionID, channelName))
require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName), string(data))
require.NoError(t, err)
@ -669,15 +687,15 @@ func TestChannelManager_Reload(t *testing.T) {
defer metakv.RemoveWithPrefix("")
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseSuccess))
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseSuccess, collectionID, channelName))
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{channelName, collectionID}}}},
}
chManager.AddNode(bufferID)
require.NoError(t, err)
chManager.AddNode(UniqueID(111))
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName), string(data))
require.NoError(t, err)
err = chManager.checkOldNodes([]UniqueID{nodeID})
@ -686,13 +704,16 @@ func TestChannelManager_Reload(t *testing.T) {
v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10)))
assert.Error(t, err)
assert.Empty(t, v)
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, 111, channelName, collectionID)
chManager.stateTimer.stopIfExsit(&ackEvent{watchSuccessAck, channelName, nodeID})
})
t.Run("ReleaseFail", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseSuccess))
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseFailure, collectionID, channelName))
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
@ -703,11 +724,12 @@ func TestChannelManager_Reload(t *testing.T) {
require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName), string(data))
require.NoError(t, err)
err = chManager.checkOldNodes([]UniqueID{nodeID, 999})
err = chManager.checkOldNodes([]UniqueID{nodeID})
assert.NoError(t, err)
time.Sleep(time.Second)
v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10)))
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(999, 10), channelName))
v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName))
assert.Error(t, err)
assert.Empty(t, v)
@ -721,24 +743,153 @@ func TestChannelManager_Reload(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
hash := consistent.New()
cm, err := NewChannelManager(metakv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash)))
cm, err := NewChannelManager(metakv, newMockHandler())
assert.Nil(t, err)
assert.Nil(t, cm.AddNode(1))
assert.Nil(t, cm.AddNode(2))
assert.Nil(t, cm.Watch(&channel{"channel1", 1}))
assert.Nil(t, cm.Watch(&channel{"channel2", 1}))
cm.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {1, []*channel{{"channel1", 1}}},
2: {2, []*channel{{"channel2", 1}}},
},
}
hash2 := consistent.New()
cm2, err := NewChannelManager(metakv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash2)))
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchSuccess, 1, "channel1"))
require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(1, 10), "channel1"), string(data))
require.NoError(t, err)
data, err = proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchSuccess, 1, "channel2"))
require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(2, 10), "channel2"), string(data))
require.NoError(t, err)
cm2, err := NewChannelManager(metakv, newMockHandler())
assert.Nil(t, err)
assert.Nil(t, cm2.Startup(ctx, []int64{1, 2}))
assert.Nil(t, cm2.AddNode(3))
assert.Nil(t, cm2.Startup(ctx, []int64{3}))
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(3, 10), "channel2"))
assert.True(t, cm2.Match(3, "channel1"))
assert.True(t, cm2.Match(3, "channel2"))
cm2.stateTimer.stopIfExsit(&ackEvent{watchSuccessAck, "channel1", 3})
cm2.stateTimer.stopIfExsit(&ackEvent{watchSuccessAck, "channel2", 3})
})
}
func TestChannelManager_BalanceBehaviour(t *testing.T) {
metakv := getMetaKv(t)
defer func() {
metakv.RemoveWithPrefix("")
metakv.Close()
}()
prefix := Params.DataCoordCfg.ChannelWatchSubPath
t.Run("one node with two channels add a new node", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
var (
collectionID = UniqueID(999)
)
chManager, err := NewChannelManager(metakv, newMockHandler(), withStateChecker())
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.TODO())
chManager.stopChecker = cancel
defer cancel()
go chManager.stateChecker(ctx)
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {1, []*channel{
{"channel-1", collectionID},
{"channel-2", collectionID},
{"channel-3", collectionID}}}},
}
var (
channelBalanced string
)
chManager.AddNode(2)
channelBalanced = "channel-1"
waitAndStore := func(waitState, storeState datapb.ChannelWatchState, nodeID UniqueID, channelName string) {
for {
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName)
v, err := metakv.Load(key)
if err == nil && len(v) > 0 {
watchInfo, err := parseWatchInfo(key, []byte(v))
require.NoError(t, err)
require.Equal(t, waitState, watchInfo.GetState())
watchInfo.State = storeState
data, err := proto.Marshal(watchInfo)
require.NoError(t, err)
metakv.Save(key, string(data))
break
}
time.Sleep(100 * time.Millisecond)
}
}
waitAndStore(datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess, 1, channelBalanced)
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 2, channelBalanced)
infos := chManager.store.GetNode(1)
assert.Equal(t, 2, len(infos.Channels))
assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3"))
infos = chManager.store.GetNode(2)
assert.Equal(t, 1, len(infos.Channels))
assert.True(t, chManager.Match(2, "channel-1"))
chManager.AddNode(3)
chManager.Watch(&channel{"channel-4", collectionID})
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 3, "channel-4")
infos = chManager.store.GetNode(1)
assert.Equal(t, 2, len(infos.Channels))
assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3"))
infos = chManager.store.GetNode(2)
assert.Equal(t, 1, len(infos.Channels))
assert.True(t, chManager.Match(2, "channel-1"))
infos = chManager.store.GetNode(3)
assert.Equal(t, 1, len(infos.Channels))
assert.True(t, chManager.Match(3, "channel-4"))
chManager.DeleteNode(3)
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 2, "channel-4")
infos = chManager.store.GetNode(1)
assert.Equal(t, 2, len(infos.Channels))
assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3"))
infos = chManager.store.GetNode(2)
assert.Equal(t, 2, len(infos.Channels))
assert.True(t, chManager.Match(2, "channel-1"))
assert.True(t, chManager.Match(2, "channel-4"))
chManager.DeleteNode(2)
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 1, "channel-4")
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 1, "channel-1")
infos = chManager.store.GetNode(1)
assert.Equal(t, 4, len(infos.Channels))
assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3"))
assert.True(t, chManager.Match(1, "channel-1"))
assert.True(t, chManager.Match(1, "channel-4"))
})
}
func TestChannelManager_RemoveChannel(t *testing.T) {
metakv := getMetaKv(t)
defer func() {
@ -818,4 +969,30 @@ func TestChannelManager_HelperFunc(t *testing.T) {
})
}
})
t.Run("test getNewOnLines", func(t *testing.T) {
tests := []struct {
nodes []int64
oNodes []int64
expectedOut []int64
desription string
}{
{[]int64{}, []int64{}, []int64{}, "empty both"},
{[]int64{1}, []int64{}, []int64{1}, "empty oNodes"},
{[]int64{}, []int64{1}, []int64{}, "empty nodes"},
{[]int64{1}, []int64{1}, []int64{}, "same one"},
{[]int64{1, 2}, []int64{1}, []int64{2}, "same one 2"},
{[]int64{1}, []int64{1, 2}, []int64{}, "same one 3"},
{[]int64{1, 2}, []int64{1, 2}, []int64{}, "same two"},
}
for _, test := range tests {
t.Run(test.desription, func(t *testing.T) {
nodes := c.getNewOnLines(test.nodes, test.oNodes)
assert.ElementsMatch(t, test.expectedOut, nodes)
})
}
})
}

View File

@ -51,6 +51,7 @@ func BufferChannelAssignPolicy(store ROChannelStore, nodeID int64) ChannelOpSet
}
// AvgAssignRegisterPolicy assigns channels with average to new registered node
// Register will not directly delete the node-channel pair, channel manager handles the release itself
func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet {
opSet := BufferChannelAssignPolicy(store, nodeID)
if len(opSet) != 0 {
@ -74,24 +75,19 @@ func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet {
return len(infos[i].Channels) > len(infos[j].Channels)
})
deletes := make(map[int64][]*channel)
adds := make(map[int64][]*channel)
releases := make(map[int64][]*channel)
for i := 0; i < avg; {
t := infos[i%len(infos)]
idx := i / len(infos)
if idx >= len(t.Channels) {
continue
}
deletes[t.NodeID] = append(deletes[t.NodeID], t.Channels[idx])
adds[nodeID] = append(adds[nodeID], t.Channels[idx])
releases[t.NodeID] = append(releases[t.NodeID], t.Channels[idx])
i++
}
opSet = ChannelOpSet{}
for k, v := range deletes {
opSet.Delete(k, v)
}
for k, v := range adds {
for k, v := range releases {
opSet.Add(k, v)
}
return opSet
@ -114,8 +110,7 @@ func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolic
elems := formatNodeIDs(store.GetNodes())
hashRing.Set(elems)
removes := make(map[int64][]*channel)
adds := make(map[int64][]*channel)
releases := make(map[int64][]*channel)
// If there are buffer channels, then nodeID is the first node.
opSet := BufferChannelAssignPolicy(store, nodeID)
@ -141,16 +136,12 @@ func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolic
return nil
}
if did != c.NodeID {
removes[c.NodeID] = append(removes[c.NodeID], ch)
adds[did] = append(adds[did], ch)
releases[c.NodeID] = append(releases[c.NodeID], ch)
}
}
}
for id, channels := range removes {
opSet.Delete(id, channels)
}
for id, channels := range adds {
for id, channels := range releases {
opSet.Add(id, channels)
}
return opSet

View File

@ -97,11 +97,10 @@ func TestConsistentHashRegisterPolicy(t *testing.T) {
updates := policy(store, 2)
// chan1 will be hash to 2, chan2 will be hash to 1
assert.NotNil(t, updates)
assert.Equal(t, 2, len(updates))
assert.EqualValues(t, &ChannelOp{Type: Delete, NodeID: 1, Channels: []*channel{channels[0]}}, updates[0])
assert.EqualValues(t, &ChannelOp{Type: Add, NodeID: 2, Channels: []*channel{channels[0]}}, updates[1])
assert.Equal(t, 1, len(updates))
// No Delete operation will be generated
assert.EqualValues(t, &ChannelOp{Type: Add, NodeID: 1, Channels: []*channel{channels[0]}}, updates[0])
})
}
@ -486,14 +485,9 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
3,
},
[]*ChannelOp{
{
Type: Delete,
NodeID: 1,
Channels: []*channel{{"ch1", 1}},
},
{
Type: Add,
NodeID: 3,
NodeID: 1,
Channels: []*channel{{"ch1", 1}},
},
},
@ -525,14 +519,9 @@ func TestAvgAssignRegisterPolicy(t *testing.T) {
3,
},
[]*ChannelOp{
{
Type: Delete,
NodeID: 1,
Channels: []*channel{{"ch1", 1}},
},
{
Type: Add,
NodeID: 3,
NodeID: 1,
Channels: []*channel{{"ch1", 1}},
},
},