fix: Fix unstable ut of msgdispatcher (#34485) (#34486)

issue: https://github.com/milvus-io/milvus/issues/34484

pr: https://github.com/milvus-io/milvus/pull/34485

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2024-07-09 10:06:24 +08:00 committed by GitHub
parent 7034260721
commit 42b38fba00
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -234,11 +234,9 @@ func (suite *SimulationSuite) produceMsg(wg *sync.WaitGroup) {
func (suite *SimulationSuite) consumeMsg(ctx context.Context, wg *sync.WaitGroup, vchannel string) {
defer wg.Done()
var lastTs typeutil.Timestamp
timeoutCtx, cancel := context.WithTimeout(ctx, 5000*time.Millisecond)
defer cancel()
for {
select {
case <-timeoutCtx.Done():
case <-ctx.Done():
return
case pack := <-suite.vchannels[vchannel].output:
assert.Greater(suite.T(), pack.EndTs, lastTs)
@ -262,7 +260,7 @@ func (suite *SimulationSuite) consumeMsg(ctx context.Context, wg *sync.WaitGroup
func (suite *SimulationSuite) produceTimeTickOnly(ctx context.Context) {
tt := 1
ticker := time.NewTicker(10 * time.Millisecond)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
@ -280,6 +278,9 @@ func (suite *SimulationSuite) produceTimeTickOnly(ctx context.Context) {
}
func (suite *SimulationSuite) TestDispatchToVchannels() {
ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
defer cancel()
const vchannelNum = 10
suite.vchannels = make(map[string]*vchannelHelper, vchannelNum)
for i := 0; i < vchannelNum; i++ {
@ -295,7 +296,7 @@ func (suite *SimulationSuite) TestDispatchToVchannels() {
wg.Wait()
for vchannel := range suite.vchannels {
wg.Add(1)
go suite.consumeMsg(context.Background(), wg, vchannel)
go suite.consumeMsg(ctx, wg, vchannel)
}
wg.Wait()
for _, helper := range suite.vchannels {
@ -332,7 +333,7 @@ func (suite *SimulationSuite) TestMerge() {
suite.Eventually(func() bool {
suite.T().Logf("dispatcherManager.dispatcherNum = %d", suite.manager.Num())
return suite.manager.Num() == 1 // expected all merged, only mainDispatcher exist
}, 10*time.Second, 100*time.Millisecond)
}, 15*time.Second, 100*time.Millisecond)
cancel()
wg.Wait()