mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 10:59:32 +08:00
enhance: Refine channelCpUpdater field & test (#33083)
Avoid passing datanode around preparing datanode code directory refactory. Also refine unit test code for same component. The `Await` shall return first before checking the counter number since when lock cost is heavy (using deadlock.RWMutex See PR #33069.) case may fail due to long running time submitting tasks. Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
b45798107a
commit
892fe66b57
@ -25,6 +25,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
"github.com/milvus-io/milvus/internal/datanode/broker"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
@ -41,7 +42,7 @@ type channelCPUpdateTask struct {
|
||||
}
|
||||
|
||||
type channelCheckpointUpdater struct {
|
||||
dn *DataNode
|
||||
broker broker.Broker
|
||||
|
||||
mu sync.RWMutex
|
||||
tasks map[string]*channelCPUpdateTask
|
||||
@ -51,9 +52,9 @@ type channelCheckpointUpdater struct {
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
func newChannelCheckpointUpdater(dn *DataNode) *channelCheckpointUpdater {
|
||||
func newChannelCheckpointUpdater(broker broker.Broker) *channelCheckpointUpdater {
|
||||
return &channelCheckpointUpdater{
|
||||
dn: dn,
|
||||
broker: broker,
|
||||
tasks: make(map[string]*channelCPUpdateTask),
|
||||
closeCh: make(chan struct{}),
|
||||
notifyChan: make(chan struct{}, 1),
|
||||
@ -124,7 +125,7 @@ func (ccu *channelCheckpointUpdater) updateCheckpoints(tasks []*channelCPUpdateT
|
||||
channelCPs := lo.Map(tasks, func(t *channelCPUpdateTask, _ int) *msgpb.MsgPosition {
|
||||
return t.pos
|
||||
})
|
||||
err := ccu.dn.broker.UpdateChannelCheckpoint(ctx, channelCPs)
|
||||
err := ccu.broker.UpdateChannelCheckpoint(ctx, channelCPs)
|
||||
if err != nil {
|
||||
log.Warn("update channel checkpoint failed", zap.Error(err))
|
||||
return
|
||||
|
@ -35,23 +35,23 @@ import (
|
||||
type ChannelCPUpdaterSuite struct {
|
||||
suite.Suite
|
||||
|
||||
broker *broker.MockBroker
|
||||
updater *channelCheckpointUpdater
|
||||
}
|
||||
|
||||
func (s *ChannelCPUpdaterSuite) SetupTest() {
|
||||
s.updater = newChannelCheckpointUpdater(&DataNode{})
|
||||
s.broker = broker.NewMockBroker(s.T())
|
||||
s.updater = newChannelCheckpointUpdater(s.broker)
|
||||
}
|
||||
|
||||
func (s *ChannelCPUpdaterSuite) TestUpdate() {
|
||||
paramtable.Get().Save(paramtable.Get().DataNodeCfg.ChannelCheckpointUpdateTickInSeconds.Key, "0.01")
|
||||
defer paramtable.Get().Save(paramtable.Get().DataNodeCfg.ChannelCheckpointUpdateTickInSeconds.Key, "10")
|
||||
|
||||
b := broker.NewMockBroker(s.T())
|
||||
b.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, positions []*msgpb.MsgPosition) error {
|
||||
s.broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, positions []*msgpb.MsgPosition) error {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
return nil
|
||||
})
|
||||
s.updater.dn.broker = b
|
||||
|
||||
go s.updater.start()
|
||||
defer s.updater.close()
|
||||
@ -75,10 +75,10 @@ func (s *ChannelCPUpdaterSuite) TestUpdate() {
|
||||
}
|
||||
}
|
||||
}()
|
||||
wg.Wait()
|
||||
s.Eventually(func() bool {
|
||||
return counter.Load() == int64(tasksNum)
|
||||
}, time.Second*10, time.Millisecond*100)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestChannelCPUpdater(t *testing.T) {
|
||||
|
@ -289,7 +289,7 @@ func (node *DataNode) Init() error {
|
||||
|
||||
node.importTaskMgr = importv2.NewTaskManager()
|
||||
node.importScheduler = importv2.NewScheduler(node.importTaskMgr, node.syncMgr, node.chunkManager)
|
||||
node.channelCheckpointUpdater = newChannelCheckpointUpdater(node)
|
||||
node.channelCheckpointUpdater = newChannelCheckpointUpdater(node.broker)
|
||||
node.flowgraphManager = newFlowgraphManager()
|
||||
|
||||
if paramtable.Get().DataCoordCfg.EnableBalanceChannelWithRPC.GetAsBool() {
|
||||
|
@ -397,7 +397,7 @@ func (s *DataSyncServiceSuite) SetupTest() {
|
||||
},
|
||||
}
|
||||
s.node.ctx = context.Background()
|
||||
s.node.channelCheckpointUpdater = newChannelCheckpointUpdater(s.node)
|
||||
s.node.channelCheckpointUpdater = newChannelCheckpointUpdater(s.node.broker)
|
||||
paramtable.Get().Save(paramtable.Get().DataNodeCfg.ChannelCheckpointUpdateTickInSeconds.Key, "0.01")
|
||||
defer paramtable.Get().Save(paramtable.Get().DataNodeCfg.ChannelCheckpointUpdateTickInSeconds.Key, "10")
|
||||
go s.node.channelCheckpointUpdater.start()
|
||||
|
Loading…
Reference in New Issue
Block a user