From 21fc5f5d46409097b587bcc09c55e57c00119d8e Mon Sep 17 00:00:00 2001 From: jaime Date: Fri, 5 Jul 2024 15:48:09 +0800 Subject: [PATCH] enhance: Remove datanode reporting TT based on MQ implementation (#34421) issue: #34420 Signed-off-by: jaime --- configs/milvus.yaml | 1 - internal/datacoord/channel_manager.go | 70 ++-- internal/datacoord/channel_manager_test.go | 4 +- internal/datacoord/channel_store.go | 1 - internal/datacoord/server.go | 102 ------ internal/datacoord/services.go | 15 - internal/datacoord/services_test.go | 305 ------------------ .../compaction/clustering_compactor.go | 2 +- internal/datanode/data_node.go | 8 +- .../datanode/pipeline/data_sync_service.go | 22 +- .../pipeline/flow_graph_time_ticker.go | 146 --------- .../pipeline/flow_graph_write_node.go | 4 +- internal/datanode/pipeline/stats_updater.go | 100 ------ .../datanode/pipeline/stats_updater_test.go | 64 ---- internal/datanode/util/timetick_sender.go | 4 + internal/proxy/search_util.go | 6 +- pkg/util/paramtable/component_param.go | 10 - 17 files changed, 51 insertions(+), 813 deletions(-) delete mode 100644 internal/datanode/pipeline/flow_graph_time_ticker.go delete mode 100644 internal/datanode/pipeline/stats_updater.go delete mode 100644 internal/datanode/pipeline/stats_updater_test.go diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 0a56071745..3f78c38d94 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -535,7 +535,6 @@ dataNode: checkInterval: 3000 # the interal to check datanode memory usage, in milliseconds forceSyncWatermark: 0.5 # memory watermark for standalone, upon reaching this watermark, segments will be synced. timetick: - byRPC: true interval: 500 channel: # specify the size of global work pool of all channels diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 51317355c8..208324d591 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -59,7 +59,7 @@ type SubCluster interface { CheckChannelOperationProgress(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error) } -type ChannelManagerImplV2 struct { +type ChannelManagerImpl struct { cancel context.CancelFunc mu lock.RWMutex wg sync.WaitGroup @@ -82,15 +82,15 @@ type ChannelManagerImplV2 struct { // ChannelBGChecker are goroutining running background type ChannelBGChecker func(ctx context.Context) -// ChannelmanagerOptV2 is to set optional parameters in channel manager. -type ChannelmanagerOptV2 func(c *ChannelManagerImplV2) +// ChannelmanagerOpt is to set optional parameters in channel manager. +type ChannelmanagerOpt func(c *ChannelManagerImpl) -func withFactoryV2(f ChannelPolicyFactory) ChannelmanagerOptV2 { - return func(c *ChannelManagerImplV2) { c.factory = f } +func withFactoryV2(f ChannelPolicyFactory) ChannelmanagerOpt { + return func(c *ChannelManagerImpl) { c.factory = f } } -func withCheckerV2() ChannelmanagerOptV2 { - return func(c *ChannelManagerImplV2) { c.balanceCheckLoop = c.CheckLoop } +func withCheckerV2() ChannelmanagerOpt { + return func(c *ChannelManagerImpl) { c.balanceCheckLoop = c.CheckLoop } } func NewChannelManagerV2( @@ -98,9 +98,9 @@ func NewChannelManagerV2( h Handler, subCluster SubCluster, // sessionManager alloc allocator, - options ...ChannelmanagerOptV2, -) (*ChannelManagerImplV2, error) { - m := &ChannelManagerImplV2{ + options ...ChannelmanagerOpt, +) (*ChannelManagerImpl, error) { + m := &ChannelManagerImpl{ h: h, factory: NewChannelPolicyFactoryV1(), store: NewChannelStoreV2(kv), @@ -121,7 +121,7 @@ func NewChannelManagerV2( return m, nil } -func (m *ChannelManagerImplV2) Startup(ctx context.Context, legacyNodes, allNodes []int64) error { +func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes []int64) error { ctx, m.cancel = context.WithCancel(ctx) m.legacyNodes = typeutil.NewUniqueSet(legacyNodes...) @@ -175,14 +175,14 @@ func (m *ChannelManagerImplV2) Startup(ctx context.Context, legacyNodes, allNode return nil } -func (m *ChannelManagerImplV2) Close() { +func (m *ChannelManagerImpl) Close() { if m.cancel != nil { m.cancel() m.wg.Wait() } } -func (m *ChannelManagerImplV2) AddNode(nodeID UniqueID) error { +func (m *ChannelManagerImpl) AddNode(nodeID UniqueID) error { m.mu.Lock() defer m.mu.Unlock() @@ -204,7 +204,7 @@ func (m *ChannelManagerImplV2) AddNode(nodeID UniqueID) error { } // Release writes ToRelease channel watch states for a channel -func (m *ChannelManagerImplV2) Release(nodeID UniqueID, channelName string) error { +func (m *ChannelManagerImpl) Release(nodeID UniqueID, channelName string) error { log := log.With( zap.Int64("nodeID", nodeID), zap.String("channel", channelName), @@ -227,7 +227,7 @@ func (m *ChannelManagerImplV2) Release(nodeID UniqueID, channelName string) erro return m.execute(updates) } -func (m *ChannelManagerImplV2) Watch(ctx context.Context, ch RWChannel) error { +func (m *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error { log := log.Ctx(ctx).With(zap.String("channel", ch.GetName())) m.mu.Lock() defer m.mu.Unlock() @@ -256,7 +256,7 @@ func (m *ChannelManagerImplV2) Watch(ctx context.Context, ch RWChannel) error { return nil } -func (m *ChannelManagerImplV2) DeleteNode(nodeID UniqueID) error { +func (m *ChannelManagerImpl) DeleteNode(nodeID UniqueID) error { m.mu.Lock() defer m.mu.Unlock() @@ -288,7 +288,7 @@ func (m *ChannelManagerImplV2) DeleteNode(nodeID UniqueID) error { } // reassign reassigns a channel to another DataNode. -func (m *ChannelManagerImplV2) reassign(original *NodeChannelInfo) error { +func (m *ChannelManagerImpl) reassign(original *NodeChannelInfo) error { m.mu.Lock() defer m.mu.Unlock() @@ -309,7 +309,7 @@ func (m *ChannelManagerImplV2) reassign(original *NodeChannelInfo) error { return nil } -func (m *ChannelManagerImplV2) Balance() { +func (m *ChannelManagerImpl) Balance() { m.mu.Lock() defer m.mu.Unlock() @@ -325,7 +325,7 @@ func (m *ChannelManagerImplV2) Balance() { } } -func (m *ChannelManagerImplV2) Match(nodeID UniqueID, channel string) bool { +func (m *ChannelManagerImpl) Match(nodeID UniqueID, channel string) bool { m.mu.RLock() defer m.mu.RUnlock() @@ -338,7 +338,7 @@ func (m *ChannelManagerImplV2) Match(nodeID UniqueID, channel string) bool { return ok } -func (m *ChannelManagerImplV2) GetChannel(nodeID int64, channelName string) (RWChannel, bool) { +func (m *ChannelManagerImpl) GetChannel(nodeID int64, channelName string) (RWChannel, bool) { m.mu.RLock() defer m.mu.RUnlock() @@ -350,13 +350,13 @@ func (m *ChannelManagerImplV2) GetChannel(nodeID int64, channelName string) (RWC return nil, false } -func (m *ChannelManagerImplV2) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string { +func (m *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string { m.mu.RLock() defer m.mu.RUnlock() return m.store.GetNodeChannelsByCollectionID(collectionID) } -func (m *ChannelManagerImplV2) GetChannelsByCollectionID(collectionID int64) []RWChannel { +func (m *ChannelManagerImpl) GetChannelsByCollectionID(collectionID int64) []RWChannel { m.mu.RLock() defer m.mu.RUnlock() channels := []RWChannel{} @@ -370,14 +370,14 @@ func (m *ChannelManagerImplV2) GetChannelsByCollectionID(collectionID int64) []R return channels } -func (m *ChannelManagerImplV2) GetChannelNamesByCollectionID(collectionID int64) []string { +func (m *ChannelManagerImpl) GetChannelNamesByCollectionID(collectionID int64) []string { channels := m.GetChannelsByCollectionID(collectionID) return lo.Map(channels, func(ch RWChannel, _ int) string { return ch.GetName() }) } -func (m *ChannelManagerImplV2) FindWatcher(channel string) (UniqueID, error) { +func (m *ChannelManagerImpl) FindWatcher(channel string) (UniqueID, error) { m.mu.RLock() defer m.mu.RUnlock() @@ -400,7 +400,7 @@ func (m *ChannelManagerImplV2) FindWatcher(channel string) (UniqueID, error) { } // unsafe innter func -func (m *ChannelManagerImplV2) removeChannel(nodeID int64, ch RWChannel) error { +func (m *ChannelManagerImpl) removeChannel(nodeID int64, ch RWChannel) error { op := NewChannelOpSet(NewChannelOp(nodeID, Delete, ch)) log.Info("remove channel assignment", zap.String("channel", ch.GetName()), @@ -409,7 +409,7 @@ func (m *ChannelManagerImplV2) removeChannel(nodeID int64, ch RWChannel) error { return m.store.Update(op) } -func (m *ChannelManagerImplV2) CheckLoop(ctx context.Context) { +func (m *ChannelManagerImpl) CheckLoop(ctx context.Context) { balanceTicker := time.NewTicker(Params.DataCoordCfg.ChannelBalanceInterval.GetAsDuration(time.Second)) defer balanceTicker.Stop() checkTicker := time.NewTicker(Params.DataCoordCfg.ChannelCheckInterval.GetAsDuration(time.Second)) @@ -430,7 +430,7 @@ func (m *ChannelManagerImplV2) CheckLoop(ctx context.Context) { } } -func (m *ChannelManagerImplV2) AdvanceChannelState(ctx context.Context) { +func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) { m.mu.RLock() standbys := m.store.GetNodeChannelsBy(WithAllNodes(), WithChannelStates(Standby)) toNotifies := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(ToWatch, ToRelease)) @@ -447,7 +447,7 @@ func (m *ChannelManagerImplV2) AdvanceChannelState(ctx context.Context) { } } -func (m *ChannelManagerImplV2) finishRemoveChannel(nodeID int64, channels ...RWChannel) { +func (m *ChannelManagerImpl) finishRemoveChannel(nodeID int64, channels ...RWChannel) { m.mu.Lock() defer m.mu.Unlock() for _, ch := range channels { @@ -463,7 +463,7 @@ func (m *ChannelManagerImplV2) finishRemoveChannel(nodeID int64, channels ...RWC } } -func (m *ChannelManagerImplV2) advanceStandbys(_ context.Context, standbys []*NodeChannelInfo) bool { +func (m *ChannelManagerImpl) advanceStandbys(_ context.Context, standbys []*NodeChannelInfo) bool { var advanced bool = false for _, nodeAssign := range standbys { validChannels := make(map[string]RWChannel) @@ -500,7 +500,7 @@ func (m *ChannelManagerImplV2) advanceStandbys(_ context.Context, standbys []*No return advanced } -func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies []*NodeChannelInfo) bool { +func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies []*NodeChannelInfo) bool { var advanced bool = false for _, nodeAssign := range toNotifies { channelCount := len(nodeAssign.Channels) @@ -563,7 +563,7 @@ type poolResult struct { ch RWChannel } -func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool { +func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool { var advanced bool = false for _, nodeAssign := range toChecks { if len(nodeAssign.Channels) == 0 { @@ -615,7 +615,7 @@ func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []* return advanced } -func (m *ChannelManagerImplV2) Notify(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) error { +func (m *ChannelManagerImpl) Notify(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) error { log := log.With( zap.String("channel", info.GetVchan().GetChannelName()), zap.Int64("assignment", nodeID), @@ -631,7 +631,7 @@ func (m *ChannelManagerImplV2) Notify(ctx context.Context, nodeID int64, info *d return nil } -func (m *ChannelManagerImplV2) Check(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (successful bool, got bool) { +func (m *ChannelManagerImpl) Check(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (successful bool, got bool) { log := log.With( zap.Int64("opID", info.GetOpID()), zap.Int64("nodeID", nodeID), @@ -674,7 +674,7 @@ func (m *ChannelManagerImplV2) Check(ctx context.Context, nodeID int64, info *da return false, false } -func (m *ChannelManagerImplV2) execute(updates *ChannelOpSet) error { +func (m *ChannelManagerImpl) execute(updates *ChannelOpSet) error { for _, op := range updates.ops { if op.Type != Delete { if err := m.fillChannelWatchInfo(op); err != nil { @@ -688,7 +688,7 @@ func (m *ChannelManagerImplV2) execute(updates *ChannelOpSet) error { } // fillChannelWatchInfoWithState updates the channel op by filling in channel watch info. -func (m *ChannelManagerImplV2) fillChannelWatchInfo(op *ChannelOp) error { +func (m *ChannelManagerImpl) fillChannelWatchInfo(op *ChannelOp) error { startTs := time.Now().Unix() for _, ch := range op.Channels { vcInfo := m.h.GetDataVChanPositions(ch, allPartitionID) diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index 4bce9cf93f..1ae3443a5e 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -66,7 +66,7 @@ func (s *ChannelManagerSuite) prepareMeta(chNodes map[string]int64, state datapb s.mockKv.EXPECT().LoadWithPrefix(mock.Anything).Return(keys, values, nil).Once() } -func (s *ChannelManagerSuite) checkAssignment(m *ChannelManagerImplV2, nodeID int64, channel string, state ChannelState) { +func (s *ChannelManagerSuite) checkAssignment(m *ChannelManagerImpl, nodeID int64, channel string, state ChannelState) { rwChannel, found := m.GetChannel(nodeID, channel) s.True(found) s.NotNil(rwChannel) @@ -84,7 +84,7 @@ func (s *ChannelManagerSuite) checkAssignment(m *ChannelManagerImplV2, nodeID in } } -func (s *ChannelManagerSuite) checkNoAssignment(m *ChannelManagerImplV2, nodeID int64, channel string) { +func (s *ChannelManagerSuite) checkNoAssignment(m *ChannelManagerImpl, nodeID int64, channel string) { rwChannel, found := m.GetChannel(nodeID, channel) s.False(found) s.Nil(rwChannel) diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index 3de73b8a03..c3b641ec0f 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -57,7 +57,6 @@ type ROChannelStore interface { // GetNodeChannels for given collection GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string - // GetNodeChannelsBy used by channel_store_v2 and channel_manager_v2 only GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 5e8febb5db..9e2a65d016 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -49,8 +49,6 @@ import ( "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" - "github.com/milvus-io/milvus/pkg/mq/common" - "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/expr" "github.com/milvus-io/milvus/pkg/util/logutil" @@ -58,7 +56,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" - "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -103,7 +100,6 @@ type Server struct { serverLoopWg sync.WaitGroup quitCh chan struct{} stateCode atomic.Value - helper ServerHelper etcdCli *clientv3.Client tikvCli *txnkv.Client @@ -166,17 +162,6 @@ type CollectionNameInfo struct { DBName string } -// ServerHelper datacoord server injection helper -type ServerHelper struct { - eventAfterHandleDataNodeTt func() -} - -func defaultServerHelper() ServerHelper { - return ServerHelper{ - eventAfterHandleDataNodeTt: func() {}, - } -} - // Option utility function signature to set DataCoord server attributes type Option func(svr *Server) @@ -187,13 +172,6 @@ func WithRootCoordCreator(creator rootCoordCreatorFunc) Option { } } -// WithServerHelper returns an `Option` setting ServerHelp with provided parameter -func WithServerHelper(helper ServerHelper) Option { - return func(svr *Server) { - svr.helper = helper - } -} - // WithCluster returns an `Option` setting Cluster with provided parameter func WithCluster(cluster Cluster) Option { return func(svr *Server) { @@ -228,7 +206,6 @@ func CreateServer(ctx context.Context, factory dependency.Factory, opts ...Optio dataNodeCreator: defaultDataNodeCreatorFunc, indexNodeCreator: defaultIndexNodeCreatorFunc, rootCoordClientCreator: defaultRootCoordCreatorFunc, - helper: defaultServerHelper(), metricsCacheManager: metricsinfo.NewMetricsCacheManager(), enableActiveStandBy: Params.DataCoordCfg.EnableActiveStandby.GetAsBool(), } @@ -697,11 +674,6 @@ func (s *Server) initIndexNodeManager() { } func (s *Server) startServerLoop() { - if !Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() { - s.serverLoopWg.Add(1) - s.startDataNodeTtLoop(s.serverLoopCtx) - } - s.serverLoopWg.Add(2) s.startWatchService(s.serverLoopCtx) s.startFlushLoop(s.serverLoopCtx) @@ -712,80 +684,6 @@ func (s *Server) startServerLoop() { s.syncSegmentsScheduler.Start() } -// startDataNodeTtLoop start a goroutine to recv data node tt msg from msgstream -// tt msg stands for the currently consumed timestamp for each channel -func (s *Server) startDataNodeTtLoop(ctx context.Context) { - ttMsgStream, err := s.factory.NewMsgStream(ctx) - if err != nil { - log.Error("DataCoord failed to create timetick channel", zap.Error(err)) - panic(err) - } - - timeTickChannel := Params.CommonCfg.DataCoordTimeTick.GetValue() - if Params.CommonCfg.PreCreatedTopicEnabled.GetAsBool() { - timeTickChannel = Params.CommonCfg.TimeTicker.GetValue() - } - subName := fmt.Sprintf("%s-%d-datanodeTl", Params.CommonCfg.DataCoordSubName.GetValue(), paramtable.GetNodeID()) - - ttMsgStream.AsConsumer(context.TODO(), []string{timeTickChannel}, subName, common.SubscriptionPositionLatest) - log.Info("DataCoord creates the timetick channel consumer", - zap.String("timeTickChannel", timeTickChannel), - zap.String("subscription", subName)) - - go s.handleDataNodeTimetickMsgstream(ctx, ttMsgStream) -} - -func (s *Server) handleDataNodeTimetickMsgstream(ctx context.Context, ttMsgStream msgstream.MsgStream) { - var checker *timerecord.LongTermChecker - if enableTtChecker { - checker = timerecord.NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg) - checker.Start() - defer checker.Stop() - } - - defer logutil.LogPanic() - defer s.serverLoopWg.Done() - defer func() { - // https://github.com/milvus-io/milvus/issues/15659 - // msgstream service closed before datacoord quits - defer func() { - if x := recover(); x != nil { - log.Error("Failed to close ttMessage", zap.Any("recovered", x)) - } - }() - ttMsgStream.Close() - }() - for { - select { - case <-ctx.Done(): - log.Info("DataNode timetick loop shutdown") - return - case msgPack, ok := <-ttMsgStream.Chan(): - if !ok || msgPack == nil || len(msgPack.Msgs) == 0 { - log.Info("receive nil timetick msg and shutdown timetick channel") - return - } - - for _, msg := range msgPack.Msgs { - ttMsg, ok := msg.(*msgstream.DataNodeTtMsg) - if !ok { - log.Warn("receive unexpected msg type from tt channel") - continue - } - if enableTtChecker { - checker.Check() - } - - if err := s.handleDataNodeTtMsg(ctx, &ttMsg.DataNodeTtMsg); err != nil { - log.Warn("failed to handle timetick message", zap.Error(err)) - continue - } - } - s.helper.eventAfterHandleDataNodeTt() - } - } -} - func (s *Server) updateSegmentStatistics(stats []*commonpb.SegmentStats) { for _, stat := range stats { segment := s.meta.GetSegment(stat.GetSegmentID()) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 584d6c0e3e..8437a8bfd9 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1468,21 +1468,6 @@ func (s *Server) handleDataNodeTtMsg(ctx context.Context, ttMsg *msgpb.DataNodeT return nil } -// getDiff returns the difference of base and remove. i.e. all items that are in `base` but not in `remove`. -func getDiff(base, remove []int64) []int64 { - mb := make(map[int64]struct{}, len(remove)) - for _, x := range remove { - mb[x] = struct{}{} - } - var diff []int64 - for _, x := range base { - if _, found := mb[x]; !found { - diff = append(diff, x) - } - } - return diff -} - // MarkSegmentsDropped marks the given segments as `Dropped`. // An error status will be returned and error will be logged, if we failed to mark *all* segments. // Deprecated, do not use it diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 60b753e48a..c71ba24e4c 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -2,7 +2,6 @@ package datacoord import ( "context" - "fmt" "testing" "time" @@ -32,7 +31,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/tsoutil" ) type ServerSuite struct { @@ -90,309 +88,6 @@ func genMsg(msgType commonpb.MsgType, ch string, t Timestamp, sourceID int64) *m } } -func (s *ServerSuite) TestHandleDataNodeTtMsg() { - var ( - chanName = "ch-1" - collID int64 = 100 - sourceID int64 = 1 - ) - s.testServer.meta.AddCollection(&collectionInfo{ - ID: collID, - Schema: newTestSchema(), - Partitions: []int64{10}, - }) - resp, err := s.testServer.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{ - NodeID: sourceID, - SegmentIDRequests: []*datapb.SegmentIDRequest{ - { - CollectionID: collID, - PartitionID: 10, - ChannelName: chanName, - Count: 100, - }, - }, - }) - s.Require().NoError(err) - s.Require().True(merr.Ok(resp.GetStatus())) - s.Equal(1, len(resp.GetSegIDAssignments())) - assign := resp.GetSegIDAssignments()[0] - - assignedSegmentID := resp.SegIDAssignments[0].SegID - segment := s.testServer.meta.GetHealthySegment(assignedSegmentID) - s.Require().NotNil(segment) - s.Equal(1, len(segment.allocations)) - - ts := tsoutil.AddPhysicalDurationOnTs(assign.ExpireTime, -3*time.Minute) - msg := genMsg(commonpb.MsgType_DataNodeTt, chanName, ts, sourceID) - msg.SegmentsStats = append(msg.SegmentsStats, &commonpb.SegmentStats{ - SegmentID: assign.GetSegID(), - NumRows: 1, - }) - mockCluster := NewMockCluster(s.T()) - mockCluster.EXPECT().Close().Once() - mockCluster.EXPECT().Flush(mock.Anything, sourceID, chanName, mock.Anything).RunAndReturn( - func(ctx context.Context, nodeID int64, channel string, segments []*datapb.SegmentInfo) error { - s.EqualValues(chanName, channel) - s.EqualValues(sourceID, nodeID) - s.Equal(1, len(segments)) - s.EqualValues(2, segments[0].GetID()) - - return fmt.Errorf("mock error") - }).Once() - s.testServer.cluster = mockCluster - s.mockChMgr.EXPECT().Match(sourceID, chanName).Return(true).Twice() - - err = s.testServer.handleDataNodeTtMsg(context.TODO(), &msg.DataNodeTtMsg) - s.NoError(err) - - tt := tsoutil.AddPhysicalDurationOnTs(assign.ExpireTime, 48*time.Hour) - msg = genMsg(commonpb.MsgType_DataNodeTt, chanName, tt, sourceID) - msg.SegmentsStats = append(msg.SegmentsStats, &commonpb.SegmentStats{ - SegmentID: assign.GetSegID(), - NumRows: 1, - }) - - err = s.testServer.handleDataNodeTtMsg(context.TODO(), &msg.DataNodeTtMsg) - s.Error(err) -} - -// restart the server for config DataNodeTimeTickByRPC=false -func (s *ServerSuite) initSuiteForTtChannel() { - s.testServer.serverLoopWg.Add(1) - s.testServer.startDataNodeTtLoop(s.testServer.serverLoopCtx) - - s.testServer.meta.AddCollection(&collectionInfo{ - ID: 1, - Schema: newTestSchema(), - Partitions: []int64{10}, - }) -} - -func (s *ServerSuite) TestDataNodeTtChannel_ExpireAfterTt() { - s.initSuiteForTtChannel() - - ctx := context.TODO() - ttMsgStream, err := s.testServer.factory.NewMsgStream(ctx) - s.Require().NoError(err) - - ttMsgStream.AsProducer([]string{paramtable.Get().CommonCfg.DataCoordTimeTick.GetValue()}) - defer ttMsgStream.Close() - - var ( - sourceID int64 = 9997 - chanName = "ch-1" - signal = make(chan struct{}) - collID int64 = 1 - ) - mockCluster := NewMockCluster(s.T()) - mockCluster.EXPECT().Close().Once() - mockCluster.EXPECT().Flush(mock.Anything, sourceID, chanName, mock.Anything).RunAndReturn( - func(ctx context.Context, nodeID int64, channel string, segments []*datapb.SegmentInfo) error { - s.EqualValues(chanName, channel) - s.EqualValues(sourceID, nodeID) - s.Equal(1, len(segments)) - s.EqualValues(2, segments[0].GetID()) - - signal <- struct{}{} - return nil - }).Once() - s.testServer.cluster = mockCluster - s.mockChMgr.EXPECT().Match(sourceID, chanName).Return(true).Once() - - resp, err := s.testServer.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{ - NodeID: sourceID, - SegmentIDRequests: []*datapb.SegmentIDRequest{ - { - CollectionID: collID, - PartitionID: 10, - ChannelName: chanName, - Count: 100, - }, - }, - }) - s.Require().NoError(err) - s.Require().True(merr.Ok(resp.GetStatus())) - s.Equal(1, len(resp.GetSegIDAssignments())) - - assignedSegmentID := resp.SegIDAssignments[0].SegID - segment := s.testServer.meta.GetHealthySegment(assignedSegmentID) - s.Require().NotNil(segment) - s.Equal(1, len(segment.allocations)) - - msgPack := msgstream.MsgPack{} - tt := tsoutil.AddPhysicalDurationOnTs(resp.SegIDAssignments[0].ExpireTime, 48*time.Hour) - msg := genMsg(commonpb.MsgType_DataNodeTt, "ch-1", tt, sourceID) - msgPack.Msgs = append(msgPack.Msgs, msg) - err = ttMsgStream.Produce(&msgPack) - s.Require().NoError(err) - - <-signal - segment = s.testServer.meta.GetHealthySegment(assignedSegmentID) - s.NotNil(segment) - s.Equal(0, len(segment.allocations)) -} - -func (s *ServerSuite) TestDataNodeTtChannel_FlushWithDiffChan() { - s.initSuiteForTtChannel() - - ctx := context.TODO() - ttMsgStream, err := s.testServer.factory.NewMsgStream(ctx) - s.Require().NoError(err) - - ttMsgStream.AsProducer([]string{paramtable.Get().CommonCfg.DataCoordTimeTick.GetValue()}) - defer ttMsgStream.Close() - - var ( - sourceID int64 = 9998 - chanName = "ch-1" - signal = make(chan struct{}) - collID int64 = 1 - ) - - mockCluster := NewMockCluster(s.T()) - mockCluster.EXPECT().Close().Once() - mockCluster.EXPECT().Flush(mock.Anything, sourceID, chanName, mock.Anything).RunAndReturn( - func(ctx context.Context, nodeID int64, channel string, segments []*datapb.SegmentInfo) error { - s.EqualValues(chanName, channel) - s.EqualValues(sourceID, nodeID) - s.Equal(1, len(segments)) - - signal <- struct{}{} - return nil - }).Once() - mockCluster.EXPECT().FlushChannels(mock.Anything, sourceID, mock.Anything, []string{chanName}).Return(nil).Once() - s.testServer.cluster = mockCluster - - s.mockChMgr.EXPECT().Match(sourceID, chanName).Return(true).Once() - s.mockChMgr.EXPECT().GetNodeChannelsByCollectionID(collID).Return(map[int64][]string{ - sourceID: {chanName}, - }) - - resp, err := s.testServer.AssignSegmentID(ctx, &datapb.AssignSegmentIDRequest{ - NodeID: sourceID, - SegmentIDRequests: []*datapb.SegmentIDRequest{ - { - CollectionID: collID, - PartitionID: 10, - ChannelName: chanName, - Count: 100, - }, - { - CollectionID: collID, - PartitionID: 10, - ChannelName: "ch-2", - Count: 100, - }, - }, - }) - s.Require().NoError(err) - s.Require().True(merr.Ok(resp.GetStatus())) - s.Equal(2, len(resp.GetSegIDAssignments())) - var assign *datapb.SegmentIDAssignment - for _, segment := range resp.SegIDAssignments { - if segment.GetChannelName() == chanName { - assign = segment - break - } - } - s.Require().NotNil(assign) - - resp2, err := s.testServer.Flush(ctx, &datapb.FlushRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Flush, - SourceID: sourceID, - }, - CollectionID: collID, - }) - s.Require().NoError(err) - s.Require().True(merr.Ok(resp2.GetStatus())) - - msgPack := msgstream.MsgPack{} - msg := genMsg(commonpb.MsgType_DataNodeTt, chanName, assign.ExpireTime, sourceID) - msg.SegmentsStats = append(msg.SegmentsStats, &commonpb.SegmentStats{ - SegmentID: assign.GetSegID(), - NumRows: 1, - }) - msgPack.Msgs = append(msgPack.Msgs, msg) - err = ttMsgStream.Produce(&msgPack) - s.NoError(err) - - <-signal -} - -func (s *ServerSuite) TestDataNodeTtChannel_SegmentFlushAfterTt() { - s.initSuiteForTtChannel() - - var ( - sourceID int64 = 9999 - chanName = "ch-1" - signal = make(chan struct{}) - collID int64 = 1 - ) - mockCluster := NewMockCluster(s.T()) - mockCluster.EXPECT().Close().Once() - mockCluster.EXPECT().Flush(mock.Anything, sourceID, chanName, mock.Anything).RunAndReturn( - func(ctx context.Context, nodeID int64, channel string, segments []*datapb.SegmentInfo) error { - s.EqualValues(chanName, channel) - s.EqualValues(sourceID, nodeID) - s.Equal(1, len(segments)) - - signal <- struct{}{} - return nil - }).Once() - mockCluster.EXPECT().FlushChannels(mock.Anything, sourceID, mock.Anything, []string{chanName}).Return(nil).Once() - s.testServer.cluster = mockCluster - - s.mockChMgr.EXPECT().Match(sourceID, chanName).Return(true).Once() - s.mockChMgr.EXPECT().GetNodeChannelsByCollectionID(collID).Return(map[int64][]string{ - sourceID: {chanName}, - }) - - ctx := context.TODO() - ttMsgStream, err := s.testServer.factory.NewMsgStream(ctx) - s.Require().NoError(err) - - ttMsgStream.AsProducer([]string{paramtable.Get().CommonCfg.DataCoordTimeTick.GetValue()}) - defer ttMsgStream.Close() - - resp, err := s.testServer.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{ - SegmentIDRequests: []*datapb.SegmentIDRequest{ - { - CollectionID: 1, - PartitionID: 10, - ChannelName: chanName, - Count: 100, - }, - }, - }) - s.Require().NoError(err) - s.Require().True(merr.Ok(resp.GetStatus())) - s.Require().Equal(1, len(resp.GetSegIDAssignments())) - - assign := resp.GetSegIDAssignments()[0] - - resp2, err := s.testServer.Flush(ctx, &datapb.FlushRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Flush, - }, - CollectionID: 1, - }) - s.Require().NoError(err) - s.Require().True(merr.Ok(resp2.GetStatus())) - - msgPack := msgstream.MsgPack{} - msg := genMsg(commonpb.MsgType_DataNodeTt, "ch-1", assign.ExpireTime, 9999) - msg.SegmentsStats = append(msg.SegmentsStats, &commonpb.SegmentStats{ - SegmentID: assign.GetSegID(), - NumRows: 1, - }) - msgPack.Msgs = append(msgPack.Msgs, msg) - err = ttMsgStream.Produce(&msgPack) - s.Require().NoError(err) - - <-signal -} - func (s *ServerSuite) TestGetFlushState_ByFlushTs() { s.mockChMgr.EXPECT().GetChannelsByCollectionID(int64(0)). Return([]RWChannel{&channelMeta{Name: "ch1", CollectionID: 0}}).Times(3) diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 2a1d7ae7fa..2d7126dd86 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -616,7 +616,7 @@ func (t *clusteringCompactionTask) mappingSegment( log.Warn("stop waiting for memory buffer release as task chan done") return nil default: - //currentSize := t.getCurrentBufferWrittenMemorySize() + // currentSize := t.getCurrentBufferWrittenMemorySize() currentSize := t.getBufferTotalUsedMemorySize() if currentSize < t.getMemoryBufferBlockFlushThreshold() { log.Debug("memory is already below the block watermark, continue writing", diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 4497476b6f..ff71bd27bc 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -326,11 +326,9 @@ func (node *DataNode) Start() error { go node.importScheduler.Start() - if Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() { - node.timeTickSender = util.NewTimeTickSender(node.broker, node.session.ServerID, - retry.Attempts(20), retry.Sleep(time.Millisecond*100)) - node.timeTickSender.Start() - } + node.timeTickSender = util.NewTimeTickSender(node.broker, node.session.ServerID, + retry.Attempts(20), retry.Sleep(time.Millisecond*100)) + node.timeTickSender.Start() go node.channelCheckpointUpdater.Start() diff --git a/internal/datanode/pipeline/data_sync_service.go b/internal/datanode/pipeline/data_sync_service.go index 3ab4b4923c..8098ddcc5e 100644 --- a/internal/datanode/pipeline/data_sync_service.go +++ b/internal/datanode/pipeline/data_sync_service.go @@ -18,7 +18,6 @@ package pipeline import ( "context" - "fmt" "sync" "go.uber.org/zap" @@ -260,26 +259,7 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams, return nil, err } - var updater statsUpdater - if paramtable.Get().DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() { - updater = ds.timetickSender - } else { - m, err := config.msFactory.NewMsgStream(ctx) - if err != nil { - return nil, err - } - - m.AsProducer([]string{paramtable.Get().CommonCfg.DataCoordTimeTick.GetValue()}) - metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(config.serverID)).Inc() - log.Info("datanode AsProducer", zap.String("TimeTickChannelName", paramtable.Get().CommonCfg.DataCoordTimeTick.GetValue())) - - m.EnableProduce(true) - - updater = newMqStatsUpdater(config, m) - } - - writeNode := newWriteNode(params.Ctx, params.WriteBufferManager, updater, config) - + writeNode := newWriteNode(params.Ctx, params.WriteBufferManager, ds.timetickSender, config) ttNode, err := newTTNode(config, params.WriteBufferManager, params.CheckpointUpdater) if err != nil { return nil, err diff --git a/internal/datanode/pipeline/flow_graph_time_ticker.go b/internal/datanode/pipeline/flow_graph_time_ticker.go deleted file mode 100644 index c793001305..0000000000 --- a/internal/datanode/pipeline/flow_graph_time_ticker.go +++ /dev/null @@ -1,146 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipeline - -import ( - "sync" - "time" - - "github.com/samber/lo" - "go.uber.org/zap" - "golang.org/x/exp/maps" - - "github.com/milvus-io/milvus/internal/datanode/util" - "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/paramtable" -) - -type sendTimeTick func(util.Timestamp, []int64) error - -// mergedTimeTickerSender reduces time ticker sending rate when datanode is doing `fast-forwarding` -// it makes sure time ticker send at most 10 times a second (1tick/100millisecond) -// and the last time tick is always sent -type mergedTimeTickerSender struct { - ts uint64 - segmentIDs map[int64]struct{} - lastSent time.Time - mu sync.Mutex - - cond *sync.Cond // condition to send timeticker - send sendTimeTick // actual sender logic - - wg sync.WaitGroup - closeCh chan struct{} - closeOnce sync.Once -} - -func newUniqueMergedTimeTickerSender(send sendTimeTick) *mergedTimeTickerSender { - return &mergedTimeTickerSender{ - ts: 0, // 0 for not tt send - segmentIDs: make(map[int64]struct{}), - cond: sync.NewCond(&sync.Mutex{}), - send: send, - closeCh: make(chan struct{}), - } -} - -func (mt *mergedTimeTickerSender) bufferTs(ts util.Timestamp, segmentIDs []int64) { - mt.mu.Lock() - defer mt.mu.Unlock() - mt.ts = ts - for _, sid := range segmentIDs { - mt.segmentIDs[sid] = struct{}{} - } -} - -func (mt *mergedTimeTickerSender) tick() { - defer mt.wg.Done() - // this duration might be configuable in the future - t := time.NewTicker(paramtable.Get().DataNodeCfg.DataNodeTimeTickInterval.GetAsDuration(time.Millisecond)) // 500 millisecond - defer t.Stop() - for { - select { - case <-t.C: - mt.cond.L.Lock() - mt.cond.Signal() - mt.cond.L.Unlock() - case <-mt.closeCh: - return - } - } -} - -func (mt *mergedTimeTickerSender) isClosed() bool { - select { - case <-mt.closeCh: - return true - default: - return false - } -} - -func (mt *mergedTimeTickerSender) work() { - defer mt.wg.Done() - lastTs := uint64(0) - for { - var ( - isDiffTs bool - sids []int64 - ) - mt.cond.L.Lock() - if mt.isClosed() { - mt.cond.L.Unlock() - return - } - mt.cond.Wait() - mt.cond.L.Unlock() - - mt.mu.Lock() - isDiffTs = mt.ts != lastTs - if isDiffTs { - for sid := range mt.segmentIDs { - sids = append(sids, sid) - } - // we will reset the timer but not the segmentIDs, since if we sent the timetick fail we may block forever due to flush stuck - lastTs = mt.ts - mt.lastSent = time.Now() - mt.segmentIDs = make(map[int64]struct{}) - } - mt.mu.Unlock() - - if isDiffTs { - if err := mt.send(lastTs, sids); err != nil { - log.Error("send hard time tick failed", zap.Error(err)) - mt.mu.Lock() - maps.Copy(mt.segmentIDs, lo.SliceToMap(sids, func(t int64) (int64, struct{}) { - return t, struct{}{} - })) - mt.mu.Unlock() - } - } - } -} - -func (mt *mergedTimeTickerSender) close() { - mt.closeOnce.Do(func() { - mt.cond.L.Lock() - close(mt.closeCh) - mt.cond.Broadcast() - mt.cond.L.Unlock() - mt.wg.Wait() - }) -} diff --git a/internal/datanode/pipeline/flow_graph_write_node.go b/internal/datanode/pipeline/flow_graph_write_node.go index 504d911ccd..3626df2b90 100644 --- a/internal/datanode/pipeline/flow_graph_write_node.go +++ b/internal/datanode/pipeline/flow_graph_write_node.go @@ -24,7 +24,7 @@ type writeNode struct { channelName string wbManager writebuffer.BufferManager - updater statsUpdater + updater util.StatsUpdater metacache metacache.MetaCache } @@ -122,7 +122,7 @@ func (wNode *writeNode) Operate(in []Msg) []Msg { func newWriteNode( _ context.Context, writeBufferManager writebuffer.BufferManager, - updater statsUpdater, + updater util.StatsUpdater, config *nodeConfig, ) *writeNode { baseNode := BaseNode{} diff --git a/internal/datanode/pipeline/stats_updater.go b/internal/datanode/pipeline/stats_updater.go deleted file mode 100644 index 264cfbcfa7..0000000000 --- a/internal/datanode/pipeline/stats_updater.go +++ /dev/null @@ -1,100 +0,0 @@ -package pipeline - -import ( - "fmt" - "sync" - - "github.com/samber/lo" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus/internal/datanode/util" - "github.com/milvus-io/milvus/pkg/metrics" - "github.com/milvus-io/milvus/pkg/mq/msgstream" - "github.com/milvus-io/milvus/pkg/util/commonpbutil" - "github.com/milvus-io/milvus/pkg/util/funcutil" - "github.com/milvus-io/milvus/pkg/util/tsoutil" -) - -type statsUpdater interface { - Update(channel string, ts util.Timestamp, stats []*commonpb.SegmentStats) -} - -// mqStatsUpdater is the wrapper of mergedTimeTickSender -type mqStatsUpdater struct { - sender *mergedTimeTickerSender - producer msgstream.MsgStream - config *nodeConfig - - mut sync.Mutex - stats map[int64]int64 // segment id => row nums -} - -func newMqStatsUpdater(config *nodeConfig, producer msgstream.MsgStream) statsUpdater { - updater := &mqStatsUpdater{ - stats: make(map[int64]int64), - producer: producer, - config: config, - } - sender := newUniqueMergedTimeTickerSender(updater.send) - updater.sender = sender - return updater -} - -func (u *mqStatsUpdater) send(ts util.Timestamp, segmentIDs []int64) error { - u.mut.Lock() - defer u.mut.Unlock() - stats := lo.Map(segmentIDs, func(id int64, _ int) *commonpb.SegmentStats { - rowNum := u.stats[id] - return &commonpb.SegmentStats{ - SegmentID: id, - NumRows: rowNum, - } - }) - - msgPack := msgstream.MsgPack{} - timeTickMsg := msgstream.DataNodeTtMsg{ - BaseMsg: msgstream.BaseMsg{ - BeginTimestamp: ts, - EndTimestamp: ts, - HashValues: []uint32{0}, - }, - DataNodeTtMsg: msgpb.DataNodeTtMsg{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_DataNodeTt), - commonpbutil.WithTimeStamp(ts), - commonpbutil.WithSourceID(u.config.serverID), - ), - ChannelName: u.config.vChannelName, - Timestamp: ts, - SegmentsStats: stats, - }, - } - msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg) - sub := tsoutil.SubByNow(ts) - pChan := funcutil.ToPhysicalChannel(u.config.vChannelName) - metrics.DataNodeProduceTimeTickLag. - WithLabelValues(fmt.Sprint(u.config.serverID), fmt.Sprint(u.config.collectionID), pChan). - Set(float64(sub)) - err := u.producer.Produce(&msgPack) - if err != nil { - return err - } - - for _, segmentID := range segmentIDs { - delete(u.stats, segmentID) - } - return nil -} - -func (u *mqStatsUpdater) Update(channel string, ts util.Timestamp, stats []*commonpb.SegmentStats) { - u.mut.Lock() - defer u.mut.Unlock() - segmentIDs := lo.Map(stats, func(stats *commonpb.SegmentStats, _ int) int64 { return stats.SegmentID }) - - lo.ForEach(stats, func(stats *commonpb.SegmentStats, _ int) { - u.stats[stats.SegmentID] = stats.NumRows - }) - - u.sender.bufferTs(ts, segmentIDs) -} diff --git a/internal/datanode/pipeline/stats_updater_test.go b/internal/datanode/pipeline/stats_updater_test.go deleted file mode 100644 index 952ac0c519..0000000000 --- a/internal/datanode/pipeline/stats_updater_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package pipeline - -import ( - "testing" - - "github.com/cockroachdb/errors" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" - - "github.com/milvus-io/milvus/pkg/mq/msgstream" - "github.com/milvus-io/milvus/pkg/util/tsoutil" -) - -type MqStatsUpdaterSuite struct { - suite.Suite - - producer *msgstream.MockMsgStream - updater *mqStatsUpdater -} - -func (s *MqStatsUpdaterSuite) SetupTest() { - s.producer = msgstream.NewMockMsgStream(s.T()) - s.updater = &mqStatsUpdater{ - stats: make(map[int64]int64), - producer: s.producer, - config: &nodeConfig{ - vChannelName: "by-dev-rootcoord-dml_0v0", - }, - } -} - -func (s *MqStatsUpdaterSuite) TestSend() { - s.Run("send_ok", func() { - s.producer.EXPECT().Produce(mock.Anything).Return(nil) - - s.updater.mut.Lock() - s.updater.stats[100] = 1024 - s.updater.mut.Unlock() - - err := s.updater.send(tsoutil.GetCurrentTime(), []int64{100}) - s.NoError(err) - - s.updater.mut.Lock() - _, has := s.updater.stats[100] - s.updater.mut.Unlock() - s.False(has) - }) - - s.Run("send_error", func() { - s.SetupTest() - s.producer.EXPECT().Produce(mock.Anything).Return(errors.New("mocked")) - - s.updater.mut.Lock() - s.updater.stats[100] = 1024 - s.updater.mut.Unlock() - - err := s.updater.send(tsoutil.GetCurrentTime(), []int64{100}) - s.Error(err) - }) -} - -func TestMqStatsUpdater(t *testing.T) { - suite.Run(t, new(MqStatsUpdaterSuite)) -} diff --git a/internal/datanode/util/timetick_sender.go b/internal/datanode/util/timetick_sender.go index d31e777c25..3bceb52457 100644 --- a/internal/datanode/util/timetick_sender.go +++ b/internal/datanode/util/timetick_sender.go @@ -33,6 +33,10 @@ import ( "github.com/milvus-io/milvus/pkg/util/retry" ) +type StatsUpdater interface { + Update(channel string, ts Timestamp, stats []*commonpb.SegmentStats) +} + // TimeTickSender is to merge channel states updated by flow graph node and send to datacoord periodically // TimeTickSender hold segmentStats cache for each channel, // after send succeeds will clean the cache earlier than last sent timestamp diff --git a/internal/proxy/search_util.go b/internal/proxy/search_util.go index d71d67b48d..382dad91c2 100644 --- a/internal/proxy/search_util.go +++ b/internal/proxy/search_util.go @@ -28,7 +28,7 @@ type rankParams struct { // parseSearchInfo returns QueryInfo and offset func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb.CollectionSchema, ignoreOffset bool) (*planpb.QueryInfo, int64, error) { - //0. parse iterator field + // 0. parse iterator field isIterator, _ := funcutil.GetAttrByKeyFromRepeatedKV(IteratorField, searchParamsPair) // 1. parse offset and real topk @@ -42,8 +42,8 @@ func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb } if err := validateLimit(topK); err != nil { if isIterator == "True" { - //1. if the request is from iterator, we set topK to QuotaLimit as the iterator can resolve too large topK problem - //2. GetAsInt64 has cached inside, no need to worry about cpu cost for parsing here + // 1. if the request is from iterator, we set topK to QuotaLimit as the iterator can resolve too large topK problem + // 2. GetAsInt64 has cached inside, no need to worry about cpu cost for parsing here topK = Params.QuotaConfig.TopKLimit.GetAsInt64() } else { return nil, 0, fmt.Errorf("%s [%d] is invalid, %w", TopKKey, topK, err) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 4033443fbe..fce8f78c8f 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3712,7 +3712,6 @@ type dataNodeConfig struct { MemoryCheckInterval ParamItem `refreshable:"true"` MemoryForceSyncWatermark ParamItem `refreshable:"true"` - DataNodeTimeTickByRPC ParamItem `refreshable:"false"` // DataNode send timetick interval per collection DataNodeTimeTickInterval ParamItem `refreshable:"false"` @@ -3920,15 +3919,6 @@ func (p *dataNodeConfig) init(base *BaseTable) { } p.FileReadConcurrency.Init(base.mgr) - p.DataNodeTimeTickByRPC = ParamItem{ - Key: "dataNode.timetick.byRPC", - Version: "2.2.9", - PanicIfEmpty: false, - DefaultValue: "true", - Export: true, - } - p.DataNodeTimeTickByRPC.Init(base.mgr) - p.DataNodeTimeTickInterval = ParamItem{ Key: "dataNode.timetick.interval", Version: "2.2.5",