diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index 46b29059a3..e9b301998d 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -35,29 +35,43 @@ import ( "github.com/stretchr/testify/require" ) -func waitForEctdDataReady(metakv kv.MetaKv, key string) { +// waitAndStore simulates DataNode's action +func waitAndStore(t *testing.T, metakv kv.MetaKv, key string, waitState, storeState datapb.ChannelWatchState) { for { - // make sure etcd has finished the operation - _, err := metakv.Load(key) - if err == nil { + v, err := metakv.Load(key) + if err == nil && len(v) > 0 { + watchInfo, err := parseWatchInfo(key, []byte(v)) + require.NoError(t, err) + require.Equal(t, waitState, watchInfo.GetState()) + + watchInfo.State = storeState + data, err := proto.Marshal(watchInfo) + require.NoError(t, err) + + metakv.Save(key, string(data)) break } time.Sleep(100 * time.Millisecond) } } -func checkWatchInfoWithState(t *testing.T, kv kv.MetaKv, state datapb.ChannelWatchState, nodeID UniqueID, channelName string, collectionID UniqueID) { - prefix := Params.DataCoordCfg.ChannelWatchSubPath +// waitAndCheckState checks if the DataCoord writes expected state into Etcd +func waitAndCheckState(t *testing.T, kv kv.MetaKv, expectedState datapb.ChannelWatchState, nodeID UniqueID, channelName string, collectionID UniqueID) { + for { + prefix := Params.DataCoordCfg.ChannelWatchSubPath + v, err := kv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName)) + if err == nil && len(v) > 0 { + watchInfo, err := parseWatchInfo("fake", []byte(v)) + require.NoError(t, err) - info, err := kv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName)) - assert.NoError(t, err) - assert.NotNil(t, info) - - watchInfo, err := parseWatchInfo("fakeKey", []byte(info)) - assert.NoError(t, err) - assert.Equal(t, watchInfo.GetState(), state) - assert.Equal(t, watchInfo.Vchan.GetChannelName(), channelName) - assert.Equal(t, watchInfo.Vchan.GetCollectionID(), collectionID) + if watchInfo.GetState() == expectedState { + assert.Equal(t, watchInfo.Vchan.GetChannelName(), channelName) + assert.Equal(t, watchInfo.Vchan.GetCollectionID(), collectionID) + break + } + } + time.Sleep(100 * time.Millisecond) + } } func getOpsWithWatchInfo(nodeID UniqueID, ch *channel) ChannelOpSet { @@ -88,17 +102,7 @@ func TestChannelManager_StateTransfer(t *testing.T) { channel1 = "channel1" ) - getWatchInfoWithState := func(state datapb.ChannelWatchState) *datapb.ChannelWatchInfo { - return &datapb.ChannelWatchInfo{ - Vchan: &datapb.VchannelInfo{ - CollectionID: collectionID, - ChannelName: channel1, - }, - State: state, - } - } - - t.Run("toWatch-WatchSuccess", func(t *testing.T) { + t.Run("ToWatch-WatchSuccess", func(t *testing.T) { metakv.RemoveWithPrefix("") ctx, cancel := context.WithCancel(context.TODO()) chManager, err := NewChannelManager(metakv, newMockHandler()) @@ -113,16 +117,16 @@ func TestChannelManager_StateTransfer(t *testing.T) { chManager.AddNode(nodeID) chManager.Watch(&channel{channel1, collectionID}) - data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchSuccess)) - require.NoError(t, err) - err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) - require.NoError(t, err) - waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)) + key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1) + waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_WatchSuccess, nodeID, channel1, collectionID) + cancel() wg.Wait() - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_WatchSuccess, nodeID, channel1, collectionID) + _, loaded := chManager.stateTimer.runningTimers.Load(channel1) + assert.False(t, loaded) }) t.Run("ToWatch-WatchFail-ToRelease", func(t *testing.T) { @@ -140,15 +144,17 @@ func TestChannelManager_StateTransfer(t *testing.T) { chManager.AddNode(nodeID) chManager.Watch(&channel{channel1, collectionID}) - data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchFailure)) - require.NoError(t, err) - err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) - require.NoError(t, err) - waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)) + key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1) + waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchFailure) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID) + cancel() wg.Wait() - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID) + + _, loaded := chManager.stateTimer.runningTimers.Load(channel1) + assert.True(t, loaded) + chManager.stateTimer.removeTimers([]string{channel1}) }) t.Run("ToWatch-Timeout", func(t *testing.T) { @@ -176,13 +182,16 @@ func TestChannelManager_StateTransfer(t *testing.T) { chManager.stateTimer.notifyTimeoutWatcher(e) chManager.stateTimer.stopIfExist(e) - waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID) cancel() wg.Wait() - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID) + + _, loaded := chManager.stateTimer.runningTimers.Load(channel1) + assert.True(t, loaded) + chManager.stateTimer.removeTimers([]string{channel1}) }) - t.Run("toRelease-ReleaseSuccess-Delete-reassign-ToWatch", func(t *testing.T) { + t.Run("ToRelease-ReleaseSuccess-Reassign-ToWatch-2-DN", func(t *testing.T) { var oldNode = UniqueID(120) metakv.RemoveWithPrefix("") @@ -197,6 +206,48 @@ func TestChannelManager_StateTransfer(t *testing.T) { wg.Done() }() + chManager.store = &ChannelStore{ + store: metakv, + channelsInfo: map[int64]*NodeChannelInfo{ + nodeID: {nodeID, []*channel{ + {channel1, collectionID}, + }}, + oldNode: {oldNode, []*channel{}}, + }, + } + + err = chManager.Release(nodeID, channel1) + assert.NoError(t, err) + + key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1) + waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, channel1, collectionID) + + cancel() + wg.Wait() + + w, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10))) + assert.Error(t, err) + assert.Empty(t, w) + + _, loaded := chManager.stateTimer.runningTimers.Load(channel1) + assert.True(t, loaded) + chManager.stateTimer.removeTimers([]string{channel1}) + }) + + t.Run("ToRelease-ReleaseSuccess-Reassign-ToWatch-1-DN", func(t *testing.T) { + metakv.RemoveWithPrefix("") + ctx, cancel := context.WithCancel(context.TODO()) + chManager, err := NewChannelManager(metakv, newMockHandler()) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + chManager.watchChannelStatesLoop(ctx) + wg.Done() + }() + chManager.store = &ChannelStore{ store: metakv, channelsInfo: map[int64]*NodeChannelInfo{ @@ -208,14 +259,55 @@ func TestChannelManager_StateTransfer(t *testing.T) { err = chManager.Release(nodeID, channel1) assert.NoError(t, err) - chManager.AddNode(oldNode) - data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseSuccess)) + key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1) + waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess) + + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, channel1, collectionID) + + cancel() + wg.Wait() + + _, loaded := chManager.stateTimer.runningTimers.Load(channel1) + assert.True(t, loaded) + chManager.stateTimer.removeTimers([]string{channel1}) + }) + + t.Run("ToRelease-ReleaseFail-CleanUpAndDelete-Reassign-ToWatch-2-DN", func(t *testing.T) { + var oldNode = UniqueID(121) + + metakv.RemoveWithPrefix("") + ctx, cancel := context.WithCancel(context.TODO()) + factory := dependency.NewDefaultFactory(true) + _, err := factory.NewMsgStream(context.TODO()) require.NoError(t, err) - err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) + chManager, err := NewChannelManager(metakv, newMockHandler(), withMsgstreamFactory(factory)) require.NoError(t, err) - waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1)) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + chManager.watchChannelStatesLoop(ctx) + wg.Done() + }() + + chManager.store = &ChannelStore{ + store: metakv, + channelsInfo: map[int64]*NodeChannelInfo{ + nodeID: {nodeID, []*channel{ + {channel1, collectionID}, + }}, + oldNode: {oldNode, []*channel{}}, + }, + } + + err = chManager.Release(nodeID, channel1) + assert.NoError(t, err) + + key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1) + waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseFailure) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, channel1, collectionID) + cancel() wg.Wait() @@ -223,12 +315,12 @@ func TestChannelManager_StateTransfer(t *testing.T) { assert.Error(t, err) assert.Empty(t, w) - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, channel1, collectionID) + _, loaded := chManager.stateTimer.runningTimers.Load(channel1) + assert.True(t, loaded) + chManager.stateTimer.removeTimers([]string{channel1}) }) - t.Run("toRelease-ReleaseFail-CleanUpAndDelete-Reassign-ToWatch", func(t *testing.T) { - var oldNode = UniqueID(121) - + t.Run("ToRelease-ReleaseFail-CleanUpAndDelete-Reassign-ToWatch-1-DN", func(t *testing.T) { metakv.RemoveWithPrefix("") ctx, cancel := context.WithCancel(context.TODO()) factory := dependency.NewDefaultFactory(true) @@ -255,24 +347,19 @@ func TestChannelManager_StateTransfer(t *testing.T) { err = chManager.Release(nodeID, channel1) assert.NoError(t, err) - chManager.AddNode(oldNode) - data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseFailure)) - require.NoError(t, err) - err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) - require.NoError(t, err) + key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1) + waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseFailure) + + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, channel1, collectionID) - waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1)) cancel() wg.Wait() - w, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10))) - assert.Error(t, err) - assert.Empty(t, w) - - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, channel1, collectionID) + _, loaded := chManager.stateTimer.runningTimers.Load(channel1) + assert.True(t, loaded) + chManager.stateTimer.removeTimers([]string{channel1}) }) - } func TestChannelManager(t *testing.T) { @@ -283,26 +370,6 @@ func TestChannelManager(t *testing.T) { }() prefix := Params.DataCoordCfg.ChannelWatchSubPath - waitAndStore := func(waitState, storeState datapb.ChannelWatchState, nodeID UniqueID, channelName string) { - for { - key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName) - v, err := metakv.Load(key) - if err == nil && len(v) > 0 { - watchInfo, err := parseWatchInfo(key, []byte(v)) - require.NoError(t, err) - require.Equal(t, waitState, watchInfo.GetState()) - - watchInfo.State = storeState - data, err := proto.Marshal(watchInfo) - require.NoError(t, err) - - metakv.Save(key, string(data)) - break - } - time.Sleep(100 * time.Millisecond) - } - } - t.Run("test AddNode with avalible node", func(t *testing.T) { // Note: this test is based on the default registerPolicy defer metakv.RemoveWithPrefix("") @@ -335,10 +402,10 @@ func TestChannelManager(t *testing.T) { err = chManager.Watch(&channel{"channel-3", collectionID}) assert.NoError(t, err) - chManager.stateTimer.removeTimers([]string{"channel-3"}) assert.True(t, chManager.Match(nodeToAdd, "channel-3")) - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeToAdd, "channel-3", collectionID) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeToAdd, "channel-3", collectionID) + chManager.stateTimer.removeTimers([]string{"channel-3"}) }) t.Run("test AddNode with no available node", func(t *testing.T) { @@ -365,17 +432,20 @@ func TestChannelManager(t *testing.T) { err = chManager.AddNode(nodeID) assert.NoError(t, err) - waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, nodeID, channel1) - waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, nodeID, channel2) + key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1) + waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) + + key = path.Join(prefix, strconv.FormatInt(nodeID, 10), channel2) + waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) assert.True(t, chManager.Match(nodeID, channel1)) assert.True(t, chManager.Match(nodeID, channel2)) err = chManager.Watch(&channel{"channel-3", collectionID}) assert.NoError(t, err) - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, "channel-3", collectionID) - chManager.stateTimer.stopIfExist(&ackEvent{watchSuccessAck, "channel-3", nodeID}) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, "channel-3", collectionID) + chManager.stateTimer.removeTimers([]string{"channel-3"}) }) t.Run("test Watch", func(t *testing.T) { @@ -392,14 +462,15 @@ func TestChannelManager(t *testing.T) { err = chManager.Watch(&channel{bufferCh, collectionID}) assert.NoError(t, err) - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, bufferID, bufferCh, collectionID) + + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, bufferID, bufferCh, collectionID) chManager.store.Add(nodeID) err = chManager.Watch(&channel{chanToAdd, collectionID}) assert.NoError(t, err) - chManager.stateTimer.removeTimers([]string{chanToAdd}) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, chanToAdd, collectionID) - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, chanToAdd, collectionID) + chManager.stateTimer.removeTimers([]string{chanToAdd}) }) t.Run("test Release", func(t *testing.T) { @@ -424,9 +495,9 @@ func TestChannelManager(t *testing.T) { err = chManager.Release(nodeID, channelName) assert.NoError(t, err) - chManager.stateTimer.stopIfExist(&ackEvent{releaseSuccessAck, channelName, nodeID}) + chManager.stateTimer.removeTimers([]string{channelName}) - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID) }) t.Run("test Reassign", func(t *testing.T) { @@ -476,7 +547,7 @@ func TestChannelManager(t *testing.T) { chManager.stateTimer.stopIfExist(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID}) // channel is added to remainTest because there's only one node left - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID) }) t.Run("test DeleteNode", func(t *testing.T) { @@ -557,7 +628,7 @@ func TestChannelManager(t *testing.T) { chManager.stateTimer.stopIfExist(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID}) // channel is added to remainTest because there's only one node left - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID) }) t.Run("test getChannelByNodeAndName", func(t *testing.T) { @@ -640,9 +711,9 @@ func TestChannelManager(t *testing.T) { opSet := getReleaseOp(nodeID, &channel{channelName, collectionID}) chManager.updateWithTimer(opSet, datapb.ChannelWatchState_ToWatch) - chManager.stateTimer.stopIfExist(&ackEvent{watchSuccessAck, channelName, nodeID}) + chManager.stateTimer.removeTimers([]string{channelName}) - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, channelName, collectionID) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, channelName, collectionID) }) } @@ -722,7 +793,7 @@ func TestChannelManager_Reload(t *testing.T) { err = chManager.checkOldNodes([]UniqueID{nodeID}) assert.NoError(t, err) - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID) chManager.stateTimer.removeTimers([]string{channelName}) }) @@ -744,12 +815,12 @@ func TestChannelManager_Reload(t *testing.T) { err = chManager.checkOldNodes([]UniqueID{nodeID}) assert.NoError(t, err) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, 111, channelName, collectionID) + chManager.stateTimer.removeTimers([]string{channelName}) + v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10))) assert.Error(t, err) assert.Empty(t, v) - - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, 111, channelName, collectionID) - chManager.stateTimer.stopIfExist(&ackEvent{watchSuccessAck, channelName, nodeID}) }) t.Run("ReleaseFail", func(t *testing.T) { @@ -770,13 +841,12 @@ func TestChannelManager_Reload(t *testing.T) { err = chManager.checkOldNodes([]UniqueID{nodeID}) assert.NoError(t, err) - waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(999, 10), channelName)) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, 999, channelName, collectionID) v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName)) assert.Error(t, err) assert.Empty(t, v) - checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, 999, channelName, collectionID) }) }) @@ -811,12 +881,12 @@ func TestChannelManager_Reload(t *testing.T) { assert.Nil(t, err) assert.Nil(t, cm2.Startup(ctx, []int64{3})) - waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(3, 10), "channel2")) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, 3, "channel1", 1) + waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, 3, "channel2", 1) assert.True(t, cm2.Match(3, "channel1")) assert.True(t, cm2.Match(3, "channel2")) - cm2.stateTimer.stopIfExist(&ackEvent{watchSuccessAck, "channel1", 3}) - cm2.stateTimer.stopIfExist(&ackEvent{watchSuccessAck, "channel2", 3}) + cm2.stateTimer.removeTimers([]string{"channel1", "channel2"}) }) } @@ -860,28 +930,31 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) { chManager.AddNode(2) channelBalanced = "channel-1" - waitAndStore := func(waitState, storeState datapb.ChannelWatchState, nodeID UniqueID, channelName string) { - for { - key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName) - v, err := metakv.Load(key) - if err == nil && len(v) > 0 { - watchInfo, err := parseWatchInfo(key, []byte(v)) - require.NoError(t, err) - require.Equal(t, waitState, watchInfo.GetState()) + // waitAndStore := func(waitState, storeState datapb.ChannelWatchState, nodeID UniqueID, channelName string) { + // for { + // key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName) + // v, err := metakv.Load(key) + // if err == nil && len(v) > 0 { + // watchInfo, err := parseWatchInfo(key, []byte(v)) + // require.NoError(t, err) + // require.Equal(t, waitState, watchInfo.GetState()) + // + // watchInfo.State = storeState + // data, err := proto.Marshal(watchInfo) + // require.NoError(t, err) + // + // metakv.Save(key, string(data)) + // break + // } + // time.Sleep(100 * time.Millisecond) + // } + // } - watchInfo.State = storeState - data, err := proto.Marshal(watchInfo) - require.NoError(t, err) + key := path.Join(prefix, "1", channelBalanced) + waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess) - metakv.Save(key, string(data)) - break - } - time.Sleep(100 * time.Millisecond) - } - } - - waitAndStore(datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess, 1, channelBalanced) - waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 2, channelBalanced) + key = path.Join(prefix, "2", channelBalanced) + waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) assert.True(t, chManager.Match(1, "channel-2")) assert.True(t, chManager.Match(1, "channel-3")) @@ -890,7 +963,8 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) { chManager.AddNode(3) chManager.Watch(&channel{"channel-4", collectionID}) - waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 3, "channel-4") + key = path.Join(prefix, "3", "channel-4") + waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) assert.True(t, chManager.Match(1, "channel-2")) assert.True(t, chManager.Match(1, "channel-3")) @@ -898,7 +972,8 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) { assert.True(t, chManager.Match(3, "channel-4")) chManager.DeleteNode(3) - waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 2, "channel-4") + key = path.Join(prefix, "2", "channel-4") + waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) assert.True(t, chManager.Match(1, "channel-2")) assert.True(t, chManager.Match(1, "channel-3")) @@ -906,8 +981,10 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) { assert.True(t, chManager.Match(2, "channel-4")) chManager.DeleteNode(2) - waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 1, "channel-4") - waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 1, "channel-1") + key = path.Join(prefix, "1", "channel-4") + waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) + key = path.Join(prefix, "1", "channel-1") + waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess) assert.True(t, chManager.Match(1, "channel-2")) assert.True(t, chManager.Match(1, "channel-3"))