Enhance channel manager unit tests (#17682)

Resolves: #17676

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2022-06-23 18:08:14 +08:00 committed by GitHub
parent 294f7e4408
commit 21b52bfcf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -35,29 +35,43 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func waitForEctdDataReady(metakv kv.MetaKv, key string) { // waitAndStore simulates DataNode's action
func waitAndStore(t *testing.T, metakv kv.MetaKv, key string, waitState, storeState datapb.ChannelWatchState) {
for { for {
// make sure etcd has finished the operation v, err := metakv.Load(key)
_, err := metakv.Load(key) if err == nil && len(v) > 0 {
if err == nil { watchInfo, err := parseWatchInfo(key, []byte(v))
require.NoError(t, err)
require.Equal(t, waitState, watchInfo.GetState())
watchInfo.State = storeState
data, err := proto.Marshal(watchInfo)
require.NoError(t, err)
metakv.Save(key, string(data))
break break
} }
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
} }
} }
func checkWatchInfoWithState(t *testing.T, kv kv.MetaKv, state datapb.ChannelWatchState, nodeID UniqueID, channelName string, collectionID UniqueID) { // waitAndCheckState checks if the DataCoord writes expected state into Etcd
prefix := Params.DataCoordCfg.ChannelWatchSubPath func waitAndCheckState(t *testing.T, kv kv.MetaKv, expectedState datapb.ChannelWatchState, nodeID UniqueID, channelName string, collectionID UniqueID) {
for {
prefix := Params.DataCoordCfg.ChannelWatchSubPath
v, err := kv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName))
if err == nil && len(v) > 0 {
watchInfo, err := parseWatchInfo("fake", []byte(v))
require.NoError(t, err)
info, err := kv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName)) if watchInfo.GetState() == expectedState {
assert.NoError(t, err) assert.Equal(t, watchInfo.Vchan.GetChannelName(), channelName)
assert.NotNil(t, info) assert.Equal(t, watchInfo.Vchan.GetCollectionID(), collectionID)
break
watchInfo, err := parseWatchInfo("fakeKey", []byte(info)) }
assert.NoError(t, err) }
assert.Equal(t, watchInfo.GetState(), state) time.Sleep(100 * time.Millisecond)
assert.Equal(t, watchInfo.Vchan.GetChannelName(), channelName) }
assert.Equal(t, watchInfo.Vchan.GetCollectionID(), collectionID)
} }
func getOpsWithWatchInfo(nodeID UniqueID, ch *channel) ChannelOpSet { func getOpsWithWatchInfo(nodeID UniqueID, ch *channel) ChannelOpSet {
@ -88,17 +102,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
channel1 = "channel1" channel1 = "channel1"
) )
getWatchInfoWithState := func(state datapb.ChannelWatchState) *datapb.ChannelWatchInfo { t.Run("ToWatch-WatchSuccess", func(t *testing.T) {
return &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: channel1,
},
State: state,
}
}
t.Run("toWatch-WatchSuccess", func(t *testing.T) {
metakv.RemoveWithPrefix("") metakv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(context.TODO())
chManager, err := NewChannelManager(metakv, newMockHandler()) chManager, err := NewChannelManager(metakv, newMockHandler())
@ -113,16 +117,16 @@ func TestChannelManager_StateTransfer(t *testing.T) {
chManager.AddNode(nodeID) chManager.AddNode(nodeID)
chManager.Watch(&channel{channel1, collectionID}) chManager.Watch(&channel{channel1, collectionID})
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchSuccess))
require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data))
require.NoError(t, err)
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)) key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_WatchSuccess, nodeID, channel1, collectionID)
cancel() cancel()
wg.Wait() wg.Wait()
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_WatchSuccess, nodeID, channel1, collectionID) _, loaded := chManager.stateTimer.runningTimers.Load(channel1)
assert.False(t, loaded)
}) })
t.Run("ToWatch-WatchFail-ToRelease", func(t *testing.T) { t.Run("ToWatch-WatchFail-ToRelease", func(t *testing.T) {
@ -140,15 +144,17 @@ func TestChannelManager_StateTransfer(t *testing.T) {
chManager.AddNode(nodeID) chManager.AddNode(nodeID)
chManager.Watch(&channel{channel1, collectionID}) chManager.Watch(&channel{channel1, collectionID})
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchFailure))
require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data))
require.NoError(t, err)
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)) key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchFailure)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID)
cancel() cancel()
wg.Wait() wg.Wait()
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID)
_, loaded := chManager.stateTimer.runningTimers.Load(channel1)
assert.True(t, loaded)
chManager.stateTimer.removeTimers([]string{channel1})
}) })
t.Run("ToWatch-Timeout", func(t *testing.T) { t.Run("ToWatch-Timeout", func(t *testing.T) {
@ -176,13 +182,16 @@ func TestChannelManager_StateTransfer(t *testing.T) {
chManager.stateTimer.notifyTimeoutWatcher(e) chManager.stateTimer.notifyTimeoutWatcher(e)
chManager.stateTimer.stopIfExist(e) chManager.stateTimer.stopIfExist(e)
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)) waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID)
cancel() cancel()
wg.Wait() wg.Wait()
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID)
_, loaded := chManager.stateTimer.runningTimers.Load(channel1)
assert.True(t, loaded)
chManager.stateTimer.removeTimers([]string{channel1})
}) })
t.Run("toRelease-ReleaseSuccess-Delete-reassign-ToWatch", func(t *testing.T) { t.Run("ToRelease-ReleaseSuccess-Reassign-ToWatch-2-DN", func(t *testing.T) {
var oldNode = UniqueID(120) var oldNode = UniqueID(120)
metakv.RemoveWithPrefix("") metakv.RemoveWithPrefix("")
@ -197,6 +206,48 @@ func TestChannelManager_StateTransfer(t *testing.T) {
wg.Done() wg.Done()
}() }()
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{channel1, collectionID},
}},
oldNode: {oldNode, []*channel{}},
},
}
err = chManager.Release(nodeID, channel1)
assert.NoError(t, err)
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, channel1, collectionID)
cancel()
wg.Wait()
w, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10)))
assert.Error(t, err)
assert.Empty(t, w)
_, loaded := chManager.stateTimer.runningTimers.Load(channel1)
assert.True(t, loaded)
chManager.stateTimer.removeTimers([]string{channel1})
})
t.Run("ToRelease-ReleaseSuccess-Reassign-ToWatch-1-DN", func(t *testing.T) {
metakv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
chManager.watchChannelStatesLoop(ctx)
wg.Done()
}()
chManager.store = &ChannelStore{ chManager.store = &ChannelStore{
store: metakv, store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{ channelsInfo: map[int64]*NodeChannelInfo{
@ -208,14 +259,55 @@ func TestChannelManager_StateTransfer(t *testing.T) {
err = chManager.Release(nodeID, channel1) err = chManager.Release(nodeID, channel1)
assert.NoError(t, err) assert.NoError(t, err)
chManager.AddNode(oldNode)
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseSuccess)) key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, channel1, collectionID)
cancel()
wg.Wait()
_, loaded := chManager.stateTimer.runningTimers.Load(channel1)
assert.True(t, loaded)
chManager.stateTimer.removeTimers([]string{channel1})
})
t.Run("ToRelease-ReleaseFail-CleanUpAndDelete-Reassign-ToWatch-2-DN", func(t *testing.T) {
var oldNode = UniqueID(121)
metakv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
factory := dependency.NewDefaultFactory(true)
_, err := factory.NewMsgStream(context.TODO())
require.NoError(t, err) require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data)) chManager, err := NewChannelManager(metakv, newMockHandler(), withMsgstreamFactory(factory))
require.NoError(t, err) require.NoError(t, err)
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1)) wg := sync.WaitGroup{}
wg.Add(1)
go func() {
chManager.watchChannelStatesLoop(ctx)
wg.Done()
}()
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{channel1, collectionID},
}},
oldNode: {oldNode, []*channel{}},
},
}
err = chManager.Release(nodeID, channel1)
assert.NoError(t, err)
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseFailure)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, channel1, collectionID)
cancel() cancel()
wg.Wait() wg.Wait()
@ -223,12 +315,12 @@ func TestChannelManager_StateTransfer(t *testing.T) {
assert.Error(t, err) assert.Error(t, err)
assert.Empty(t, w) assert.Empty(t, w)
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, channel1, collectionID) _, loaded := chManager.stateTimer.runningTimers.Load(channel1)
assert.True(t, loaded)
chManager.stateTimer.removeTimers([]string{channel1})
}) })
t.Run("toRelease-ReleaseFail-CleanUpAndDelete-Reassign-ToWatch", func(t *testing.T) { t.Run("ToRelease-ReleaseFail-CleanUpAndDelete-Reassign-ToWatch-1-DN", func(t *testing.T) {
var oldNode = UniqueID(121)
metakv.RemoveWithPrefix("") metakv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(context.TODO())
factory := dependency.NewDefaultFactory(true) factory := dependency.NewDefaultFactory(true)
@ -255,24 +347,19 @@ func TestChannelManager_StateTransfer(t *testing.T) {
err = chManager.Release(nodeID, channel1) err = chManager.Release(nodeID, channel1)
assert.NoError(t, err) assert.NoError(t, err)
chManager.AddNode(oldNode)
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseFailure)) key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)
require.NoError(t, err) waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseFailure)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data))
require.NoError(t, err) waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, channel1, collectionID)
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(oldNode, 10), channel1))
cancel() cancel()
wg.Wait() wg.Wait()
w, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10))) _, loaded := chManager.stateTimer.runningTimers.Load(channel1)
assert.Error(t, err) assert.True(t, loaded)
assert.Empty(t, w) chManager.stateTimer.removeTimers([]string{channel1})
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, channel1, collectionID)
}) })
} }
func TestChannelManager(t *testing.T) { func TestChannelManager(t *testing.T) {
@ -283,26 +370,6 @@ func TestChannelManager(t *testing.T) {
}() }()
prefix := Params.DataCoordCfg.ChannelWatchSubPath prefix := Params.DataCoordCfg.ChannelWatchSubPath
waitAndStore := func(waitState, storeState datapb.ChannelWatchState, nodeID UniqueID, channelName string) {
for {
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName)
v, err := metakv.Load(key)
if err == nil && len(v) > 0 {
watchInfo, err := parseWatchInfo(key, []byte(v))
require.NoError(t, err)
require.Equal(t, waitState, watchInfo.GetState())
watchInfo.State = storeState
data, err := proto.Marshal(watchInfo)
require.NoError(t, err)
metakv.Save(key, string(data))
break
}
time.Sleep(100 * time.Millisecond)
}
}
t.Run("test AddNode with avalible node", func(t *testing.T) { t.Run("test AddNode with avalible node", func(t *testing.T) {
// Note: this test is based on the default registerPolicy // Note: this test is based on the default registerPolicy
defer metakv.RemoveWithPrefix("") defer metakv.RemoveWithPrefix("")
@ -335,10 +402,10 @@ func TestChannelManager(t *testing.T) {
err = chManager.Watch(&channel{"channel-3", collectionID}) err = chManager.Watch(&channel{"channel-3", collectionID})
assert.NoError(t, err) assert.NoError(t, err)
chManager.stateTimer.removeTimers([]string{"channel-3"})
assert.True(t, chManager.Match(nodeToAdd, "channel-3")) assert.True(t, chManager.Match(nodeToAdd, "channel-3"))
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeToAdd, "channel-3", collectionID) waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeToAdd, "channel-3", collectionID)
chManager.stateTimer.removeTimers([]string{"channel-3"})
}) })
t.Run("test AddNode with no available node", func(t *testing.T) { t.Run("test AddNode with no available node", func(t *testing.T) {
@ -365,17 +432,20 @@ func TestChannelManager(t *testing.T) {
err = chManager.AddNode(nodeID) err = chManager.AddNode(nodeID)
assert.NoError(t, err) assert.NoError(t, err)
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, nodeID, channel1) key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1)
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, nodeID, channel2) waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
key = path.Join(prefix, strconv.FormatInt(nodeID, 10), channel2)
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
assert.True(t, chManager.Match(nodeID, channel1)) assert.True(t, chManager.Match(nodeID, channel1))
assert.True(t, chManager.Match(nodeID, channel2)) assert.True(t, chManager.Match(nodeID, channel2))
err = chManager.Watch(&channel{"channel-3", collectionID}) err = chManager.Watch(&channel{"channel-3", collectionID})
assert.NoError(t, err) assert.NoError(t, err)
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, "channel-3", collectionID)
chManager.stateTimer.stopIfExist(&ackEvent{watchSuccessAck, "channel-3", nodeID})
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, "channel-3", collectionID)
chManager.stateTimer.removeTimers([]string{"channel-3"})
}) })
t.Run("test Watch", func(t *testing.T) { t.Run("test Watch", func(t *testing.T) {
@ -392,14 +462,15 @@ func TestChannelManager(t *testing.T) {
err = chManager.Watch(&channel{bufferCh, collectionID}) err = chManager.Watch(&channel{bufferCh, collectionID})
assert.NoError(t, err) assert.NoError(t, err)
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, bufferID, bufferCh, collectionID)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, bufferID, bufferCh, collectionID)
chManager.store.Add(nodeID) chManager.store.Add(nodeID)
err = chManager.Watch(&channel{chanToAdd, collectionID}) err = chManager.Watch(&channel{chanToAdd, collectionID})
assert.NoError(t, err) assert.NoError(t, err)
chManager.stateTimer.removeTimers([]string{chanToAdd}) waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, chanToAdd, collectionID)
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, chanToAdd, collectionID) chManager.stateTimer.removeTimers([]string{chanToAdd})
}) })
t.Run("test Release", func(t *testing.T) { t.Run("test Release", func(t *testing.T) {
@ -424,9 +495,9 @@ func TestChannelManager(t *testing.T) {
err = chManager.Release(nodeID, channelName) err = chManager.Release(nodeID, channelName)
assert.NoError(t, err) assert.NoError(t, err)
chManager.stateTimer.stopIfExist(&ackEvent{releaseSuccessAck, channelName, nodeID}) chManager.stateTimer.removeTimers([]string{channelName})
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID) waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID)
}) })
t.Run("test Reassign", func(t *testing.T) { t.Run("test Reassign", func(t *testing.T) {
@ -476,7 +547,7 @@ func TestChannelManager(t *testing.T) {
chManager.stateTimer.stopIfExist(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID}) chManager.stateTimer.stopIfExist(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
// channel is added to remainTest because there's only one node left // channel is added to remainTest because there's only one node left
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID) waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID)
}) })
t.Run("test DeleteNode", func(t *testing.T) { t.Run("test DeleteNode", func(t *testing.T) {
@ -557,7 +628,7 @@ func TestChannelManager(t *testing.T) {
chManager.stateTimer.stopIfExist(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID}) chManager.stateTimer.stopIfExist(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
// channel is added to remainTest because there's only one node left // channel is added to remainTest because there's only one node left
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID) waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, remainTest.nodeID, remainTest.chName, collectionID)
}) })
t.Run("test getChannelByNodeAndName", func(t *testing.T) { t.Run("test getChannelByNodeAndName", func(t *testing.T) {
@ -640,9 +711,9 @@ func TestChannelManager(t *testing.T) {
opSet := getReleaseOp(nodeID, &channel{channelName, collectionID}) opSet := getReleaseOp(nodeID, &channel{channelName, collectionID})
chManager.updateWithTimer(opSet, datapb.ChannelWatchState_ToWatch) chManager.updateWithTimer(opSet, datapb.ChannelWatchState_ToWatch)
chManager.stateTimer.stopIfExist(&ackEvent{watchSuccessAck, channelName, nodeID}) chManager.stateTimer.removeTimers([]string{channelName})
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, channelName, collectionID) waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, channelName, collectionID)
}) })
} }
@ -722,7 +793,7 @@ func TestChannelManager_Reload(t *testing.T) {
err = chManager.checkOldNodes([]UniqueID{nodeID}) err = chManager.checkOldNodes([]UniqueID{nodeID})
assert.NoError(t, err) assert.NoError(t, err)
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID) waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID)
chManager.stateTimer.removeTimers([]string{channelName}) chManager.stateTimer.removeTimers([]string{channelName})
}) })
@ -744,12 +815,12 @@ func TestChannelManager_Reload(t *testing.T) {
err = chManager.checkOldNodes([]UniqueID{nodeID}) err = chManager.checkOldNodes([]UniqueID{nodeID})
assert.NoError(t, err) assert.NoError(t, err)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, 111, channelName, collectionID)
chManager.stateTimer.removeTimers([]string{channelName})
v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10))) v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10)))
assert.Error(t, err) assert.Error(t, err)
assert.Empty(t, v) assert.Empty(t, v)
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, 111, channelName, collectionID)
chManager.stateTimer.stopIfExist(&ackEvent{watchSuccessAck, channelName, nodeID})
}) })
t.Run("ReleaseFail", func(t *testing.T) { t.Run("ReleaseFail", func(t *testing.T) {
@ -770,13 +841,12 @@ func TestChannelManager_Reload(t *testing.T) {
err = chManager.checkOldNodes([]UniqueID{nodeID}) err = chManager.checkOldNodes([]UniqueID{nodeID})
assert.NoError(t, err) assert.NoError(t, err)
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(999, 10), channelName)) waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, 999, channelName, collectionID)
v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName)) v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName))
assert.Error(t, err) assert.Error(t, err)
assert.Empty(t, v) assert.Empty(t, v)
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, 999, channelName, collectionID)
}) })
}) })
@ -811,12 +881,12 @@ func TestChannelManager_Reload(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
assert.Nil(t, cm2.Startup(ctx, []int64{3})) assert.Nil(t, cm2.Startup(ctx, []int64{3}))
waitForEctdDataReady(metakv, path.Join(prefix, strconv.FormatInt(3, 10), "channel2")) waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, 3, "channel1", 1)
waitAndCheckState(t, metakv, datapb.ChannelWatchState_ToWatch, 3, "channel2", 1)
assert.True(t, cm2.Match(3, "channel1")) assert.True(t, cm2.Match(3, "channel1"))
assert.True(t, cm2.Match(3, "channel2")) assert.True(t, cm2.Match(3, "channel2"))
cm2.stateTimer.stopIfExist(&ackEvent{watchSuccessAck, "channel1", 3}) cm2.stateTimer.removeTimers([]string{"channel1", "channel2"})
cm2.stateTimer.stopIfExist(&ackEvent{watchSuccessAck, "channel2", 3})
}) })
} }
@ -860,28 +930,31 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
chManager.AddNode(2) chManager.AddNode(2)
channelBalanced = "channel-1" channelBalanced = "channel-1"
waitAndStore := func(waitState, storeState datapb.ChannelWatchState, nodeID UniqueID, channelName string) { // waitAndStore := func(waitState, storeState datapb.ChannelWatchState, nodeID UniqueID, channelName string) {
for { // for {
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName) // key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName)
v, err := metakv.Load(key) // v, err := metakv.Load(key)
if err == nil && len(v) > 0 { // if err == nil && len(v) > 0 {
watchInfo, err := parseWatchInfo(key, []byte(v)) // watchInfo, err := parseWatchInfo(key, []byte(v))
require.NoError(t, err) // require.NoError(t, err)
require.Equal(t, waitState, watchInfo.GetState()) // require.Equal(t, waitState, watchInfo.GetState())
//
// watchInfo.State = storeState
// data, err := proto.Marshal(watchInfo)
// require.NoError(t, err)
//
// metakv.Save(key, string(data))
// break
// }
// time.Sleep(100 * time.Millisecond)
// }
// }
watchInfo.State = storeState key := path.Join(prefix, "1", channelBalanced)
data, err := proto.Marshal(watchInfo) waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess)
require.NoError(t, err)
metakv.Save(key, string(data)) key = path.Join(prefix, "2", channelBalanced)
break waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
}
time.Sleep(100 * time.Millisecond)
}
}
waitAndStore(datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess, 1, channelBalanced)
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 2, channelBalanced)
assert.True(t, chManager.Match(1, "channel-2")) assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3")) assert.True(t, chManager.Match(1, "channel-3"))
@ -890,7 +963,8 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
chManager.AddNode(3) chManager.AddNode(3)
chManager.Watch(&channel{"channel-4", collectionID}) chManager.Watch(&channel{"channel-4", collectionID})
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 3, "channel-4") key = path.Join(prefix, "3", "channel-4")
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
assert.True(t, chManager.Match(1, "channel-2")) assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3")) assert.True(t, chManager.Match(1, "channel-3"))
@ -898,7 +972,8 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
assert.True(t, chManager.Match(3, "channel-4")) assert.True(t, chManager.Match(3, "channel-4"))
chManager.DeleteNode(3) chManager.DeleteNode(3)
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 2, "channel-4") key = path.Join(prefix, "2", "channel-4")
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
assert.True(t, chManager.Match(1, "channel-2")) assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3")) assert.True(t, chManager.Match(1, "channel-3"))
@ -906,8 +981,10 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
assert.True(t, chManager.Match(2, "channel-4")) assert.True(t, chManager.Match(2, "channel-4"))
chManager.DeleteNode(2) chManager.DeleteNode(2)
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 1, "channel-4") key = path.Join(prefix, "1", "channel-4")
waitAndStore(datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess, 1, "channel-1") waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
key = path.Join(prefix, "1", "channel-1")
waitAndStore(t, metakv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
assert.True(t, chManager.Match(1, "channel-2")) assert.True(t, chManager.Match(1, "channel-2"))
assert.True(t, chManager.Match(1, "channel-3")) assert.True(t, chManager.Match(1, "channel-3"))