milvus/internal/querycoordv2/observers/task_dispatcher_test.go
congqixia 81caf02554
fix: make qcv2 observer dispatcher execute exactly once (#28472)
See also #28466

In `taskDispatcher.schedule`, same task may be resubmitted if the
previous round did not finish
In this case, TaskObserver.check may set current target by mistake,
which may cause the random search/query failure

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
2023-11-16 10:24:19 +08:00

47 lines
892 B
Go

package observers
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type taskDispatcherSuite struct {
suite.Suite
}
func (s *taskDispatcherSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
}
func (s *taskDispatcherSuite) TestMultipleSubmit() {
var wg sync.WaitGroup
wg.Add(6)
set := typeutil.NewConcurrentSet[int64]()
dispatcher := newTaskDispatcher(func(ctx context.Context, key int64) {
defer wg.Done()
set.Insert(key)
time.Sleep(time.Second)
})
dispatcher.Start()
dispatcher.AddTask(1, 2, 3, 4, 5)
dispatcher.AddTask(2, 3, 4, 5, 6)
wg.Wait()
s.ElementsMatch([]int64{1, 2, 3, 4, 5, 6}, set.Collect())
dispatcher.Stop()
}
func TestTaskDispatcher(t *testing.T) {
suite.Run(t, new(taskDispatcherSuite))
}