diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 5695276472..490b6ce08b 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -312,54 +312,54 @@ func (s *Server) initMeta() error {
 func (s *Server) startServerLoop() {
 	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
 	s.serverLoopWg.Add(4)
-	go s.startStatsChannel(s.serverLoopCtx)
-	go s.startDataNodeTtLoop(s.serverLoopCtx)
-	go s.startWatchService(s.serverLoopCtx)
-	go s.startFlushLoop(s.serverLoopCtx)
+	s.startStatsChannel(s.serverLoopCtx)
+	s.startDataNodeTtLoop(s.serverLoopCtx)
+	s.startWatchService(s.serverLoopCtx)
+	s.startFlushLoop(s.serverLoopCtx)
 	go s.session.LivenessCheck(s.serverLoopCtx, func() {
 		log.Fatal("Data Coord disconnected from etcd, process will exit", zap.Int64("Server Id", s.session.ServerID))
 	})
 }
 
 func (s *Server) startStatsChannel(ctx context.Context) {
-	defer logutil.LogPanic()
-	defer s.serverLoopWg.Done()
 	statsStream, _ := s.msFactory.NewMsgStream(ctx)
 	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
 	log.Debug("dataCoord create stats channel consumer",
 		zap.String("channelName", Params.StatisticsChannelName),
 		zap.String("descriptionName", Params.DataCoordSubscriptionName))
 	statsStream.Start()
-	defer statsStream.Close()
-	for {
-		select {
-		case <-ctx.Done():
-			log.Debug("stats channel shutdown")
-			return
-		default:
-		}
-		msgPack := statsStream.Consume()
-		if msgPack == nil {
-			log.Debug("receive nil stats msg, shutdown stats channel")
-			return
-		}
-		for _, msg := range msgPack.Msgs {
-			if msg.Type() != commonpb.MsgType_SegmentStatistics {
-				log.Warn("receive unknown msg from segment statistics channel",
-					zap.Stringer("msgType", msg.Type()))
-				continue
+	go func() {
+		defer logutil.LogPanic()
+		defer s.serverLoopWg.Done()
+		defer statsStream.Close()
+		for {
+			select {
+			case <-ctx.Done():
+				log.Debug("stats channel shutdown")
+				return
+			default:
 			}
-			ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
-			for _, stat := range ssMsg.SegStats {
-				s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
+			msgPack := statsStream.Consume()
+			if msgPack == nil {
+				log.Debug("receive nil stats msg, shutdown stats channel")
+				return
+			}
+			for _, msg := range msgPack.Msgs {
+				if msg.Type() != commonpb.MsgType_SegmentStatistics {
+					log.Warn("receive unknown msg from segment statistics channel",
+						zap.Stringer("msgType", msg.Type()))
+					continue
+				}
+				ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
+				for _, stat := range ssMsg.SegStats {
+					s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
+				}
 			}
 		}
-	}
+	}()
 }
 
 func (s *Server) startDataNodeTtLoop(ctx context.Context) {
-	defer logutil.LogPanic()
-	defer s.serverLoopWg.Done()
 	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
 	if err != nil {
 		log.Error("new msg stream failed", zap.Error(err))
@@ -371,97 +371,103 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
 		zap.String("timeTickChannelName", Params.TimeTickChannelName),
 		zap.String("subscriptionName", Params.DataCoordSubscriptionName))
 	ttMsgStream.Start()
-	defer ttMsgStream.Close()
-	var checker *LongTermChecker
-	if enableTtChecker {
-		checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
-		checker.Start()
-		defer checker.Stop()
-	}
-	for {
-		select {
-		case <-ctx.Done():
-			log.Debug("data node tt loop shutdown")
-			return
-		default:
+	go func() {
+		var checker *LongTermChecker
+		if enableTtChecker {
+			checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
+			checker.Start()
+			defer checker.Stop()
 		}
-		msgPack := ttMsgStream.Consume()
-		if msgPack == nil {
-			log.Debug("receive nil tt msg, shutdown tt channel")
-			return
-		}
-		for _, msg := range msgPack.Msgs {
-			if msg.Type() != commonpb.MsgType_DataNodeTt {
-				log.Warn("receive unexpected msg type from tt channel",
-					zap.Stringer("msgType", msg.Type()))
-				continue
+		defer logutil.LogPanic()
+		defer s.serverLoopWg.Done()
+		defer ttMsgStream.Close()
+		for {
+			select {
+			case <-ctx.Done():
+				log.Debug("data node tt loop shutdown")
+				return
+			default:
 			}
-			ttMsg := msg.(*msgstream.DataNodeTtMsg)
-			if enableTtChecker {
-				checker.Check()
+			msgPack := ttMsgStream.Consume()
+			if msgPack == nil {
+				log.Debug("receive nil tt msg, shutdown tt channel")
+				return
 			}
-
-			ch := ttMsg.ChannelName
-			ts := ttMsg.Timestamp
-			if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
-				log.Warn("failed to expire allocations", zap.Error(err))
-				continue
-			}
-			physical, _ := tsoutil.ParseTS(ts)
-			if time.Since(physical).Minutes() > 1 {
-				// if lag behind, log every 1 mins about
-				log.RatedWarn(60.0, "Time tick lag behind for more than 1 minutes", zap.String("channel", ch), zap.Time("tt", physical))
-			}
-			segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
-			if err != nil {
-				log.Warn("get flushable segments failed", zap.Error(err))
-				continue
-			}
-
-			staleSegments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
-				return !info.lastFlushTime.IsZero() && time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
-			})
-
-			if len(segments)+len(staleSegments) == 0 {
-				continue
-			}
-			log.Debug("flush segments", zap.Int64s("segmentIDs", segments), zap.Int("markSegments count", len(staleSegments)))
-			segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
-			for _, id := range segments {
-				sInfo := s.meta.GetSegment(id)
-				if sInfo == nil {
-					log.Error("get segment from meta error", zap.Int64("id", id),
-						zap.Error(err))
+			for _, msg := range msgPack.Msgs {
+				if msg.Type() != commonpb.MsgType_DataNodeTt {
+					log.Warn("receive unexpected msg type from tt channel",
+						zap.Stringer("msgType", msg.Type()))
+					continue
 				}
-				segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
-				s.meta.SetLastFlushTime(id, time.Now())
-			}
-			markSegments := make([]*datapb.SegmentInfo, 0, len(staleSegments))
-			for _, segment := range staleSegments {
-				for _, fSeg := range segmentInfos {
-					// check segment needs flush first
-					if segment.GetID() == fSeg.GetID() {
+				ttMsg := msg.(*msgstream.DataNodeTtMsg)
+				if enableTtChecker {
+					checker.Check()
+				}
+
+				ch := ttMsg.ChannelName
+				ts := ttMsg.Timestamp
+				if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
+					log.Warn("failed to expire allocations", zap.Error(err))
+					continue
+				}
+				physical, _ := tsoutil.ParseTS(ts)
+				if time.Since(physical).Minutes() > 1 {
+					// if lag behind, log every 1 mins about
+					log.RatedWarn(60.0, "Time tick lag behind for more than 1 minutes", zap.String("channel", ch), zap.Time("tt", physical))
+				}
+				segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
+				if err != nil {
+					log.Warn("get flushable segments failed", zap.Error(err))
+					continue
+				}
+
+				staleSegments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
+					return !info.lastFlushTime.IsZero() && time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
+				})
+
+				if len(segments)+len(staleSegments) == 0 {
+					continue
+				}
+				log.Debug("flush segments", zap.Int64s("segmentIDs", segments), zap.Int("markSegments count", len(staleSegments)))
+				segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
+				for _, id := range segments {
+					sInfo := s.meta.GetSegment(id)
+					if sInfo == nil {
+						log.Error("get segment from meta error", zap.Int64("id", id),
+							zap.Error(err))
 						continue
 					}
+					segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
+					s.meta.SetLastFlushTime(id, time.Now())
+				}
+				markSegments := make([]*datapb.SegmentInfo, 0, len(staleSegments))
+				for _, segment := range staleSegments {
+					for _, fSeg := range segmentInfos {
+						// check segment needs flush first
+						if segment.GetID() == fSeg.GetID() {
+							continue
+						}
+					}
+					markSegments = append(markSegments, segment.SegmentInfo)
+					s.meta.SetLastFlushTime(segment.GetID(), time.Now())
+				}
+				if len(segmentInfos)+len(markSegments) > 0 {
+					s.cluster.Flush(s.ctx, segmentInfos, markSegments)
 				}
-				markSegments = append(markSegments, segment.SegmentInfo)
-				s.meta.SetLastFlushTime(segment.GetID(), time.Now())
-			}
-			if len(segmentInfos)+len(markSegments) > 0 {
-				s.cluster.Flush(s.ctx, segmentInfos, markSegments)
 			}
+			s.helper.eventAfterHandleDataNodeTt()
 		}
-		s.helper.eventAfterHandleDataNodeTt()
-	}
+	}()
 }
 
-//go:norace
-// fix datarace in unittest
-// startWatchService will only be invoked at start procedure
-// otherwise, remove the annotation and add atomic protection
+// start a goroutine to watch services
 func (s *Server) startWatchService(ctx context.Context) {
+	go s.watchService(ctx)
+}
+
+// watchService watches services
+func (s *Server) watchService(ctx context.Context) {
 	defer logutil.LogPanic()
 	defer s.serverLoopWg.Done()
 	for {
@@ -484,6 +490,7 @@ func (s *Server) startWatchService(ctx context.Context) {
 			}
 		}
 	}
+
 }
 
 // handles session events - DataNodes Add/Del
@@ -527,22 +534,24 @@ func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.Sess
 }
 
 func (s *Server) startFlushLoop(ctx context.Context) {
-	defer logutil.LogPanic()
-	defer s.serverLoopWg.Done()
-	ctx2, cancel := context.WithCancel(ctx)
-	defer cancel()
-	// send `Flushing` segments
-	go s.handleFlushingSegments(ctx2)
-	for {
-		select {
-		case <-ctx.Done():
-			log.Debug("flush loop shutdown")
-			return
-		case segmentID := <-s.flushCh:
-			//Ignore return error
-			_ = s.postFlush(ctx, segmentID)
+	go func() {
+		defer logutil.LogPanic()
+		defer s.serverLoopWg.Done()
+		ctx2, cancel := context.WithCancel(ctx)
+		defer cancel()
+		// send `Flushing` segments
+		go s.handleFlushingSegments(ctx2)
+		for {
+			select {
+			case <-ctx.Done():
+				log.Debug("flush loop shutdown")
+				return
+			case segmentID := <-s.flushCh:
+				//Ignore return error
+				_ = s.postFlush(ctx, segmentID)
+			}
 		}
-	}
+	}()
 }
 
 // post function after flush is done
diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go
index 9cae21e78f..8f29c5f305 100644
--- a/internal/datacoord/server_test.go
+++ b/internal/datacoord/server_test.go
@@ -592,7 +592,7 @@ func TestService_WatchServices(t *testing.T) {
 	signal := make(chan struct{}, 1)
 
 	go func() {
-		svr.startWatchService(context.Background())
+		svr.watchService(context.Background())
 		flag = true
 		signal <- struct{}{}
 	}()
@@ -609,7 +609,7 @@ func TestService_WatchServices(t *testing.T) {
 	svr.serverLoopWg.Add(1)
 
 	go func() {
-		svr.startWatchService(ctx)
+		svr.watchService(ctx)
 		flag = true
 		signal <- struct{}{}
 	}()
diff --git a/internal/util/mqclient/pulsar_client.go b/internal/util/mqclient/pulsar_client.go
index e5ae08a61f..2c5d67044f 100644
--- a/internal/util/mqclient/pulsar_client.go
+++ b/internal/util/mqclient/pulsar_client.go
@@ -67,9 +67,12 @@ func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) {
 	if err != nil {
 		return nil, err
 	}
-	//consumer.Seek(pulsar.EarliestMessageID())
-	//consumer.SeekByTime(time.Unix(0, 0))
+	pConsumer := &pulsarConsumer{c: consumer, closeCh: make(chan struct{})}
+	// prevent the seek-to-earliest patch from being applied when using the latest position option
+	if options.SubscriptionInitialPosition == SubscriptionPositionLatest {
+		pConsumer.hasSeek = true
+	}
 	return pConsumer, nil
 }
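
For reference, the server.go hunks above apply one pattern to every start* helper: the blocking setup (creating and subscribing the message stream) stays synchronous, and only the consume loop is moved into a goroutine spawned by the helper itself, so startServerLoop no longer prefixes the calls with `go` and the subscription exists before the helper returns. The following is a minimal, self-contained sketch of that pattern, not the Milvus code itself; the names (server, startTickerLoop, and the ticker standing in for the msgstream consumer) are illustrative only.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type server struct {
	wg sync.WaitGroup
}

// startTickerLoop mirrors the refactored start* helpers: setup runs synchronously,
// only the loop runs in a goroutine spawned here rather than at the call site.
func (s *server) startTickerLoop(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Millisecond) // stands in for NewMsgStream + AsConsumer + Start

	go func() {
		defer s.wg.Done()
		defer ticker.Stop() // mirrors `defer statsStream.Close()` moving inside the goroutine
		for {
			select {
			case <-ctx.Done():
				fmt.Println("loop shutdown")
				return
			case <-ticker.C:
				fmt.Println("tick")
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := &server{}

	s.wg.Add(1)            // counted up front, like serverLoopWg.Add(4) in startServerLoop
	s.startTickerLoop(ctx) // note: no `go` at the call site any more

	time.Sleep(50 * time.Millisecond)
	cancel()
	s.wg.Wait()
}

The visible effect of this ordering in the patch is that the consumer setup completes before each start helper returns; only the blocking Consume loop runs asynchronously.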
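The pulsar_client.go hunk replaces the commented-out seek calls with a hasSeek flag that is pre-set when the subscription is created at the latest position. The hunk does not show where pulsarConsumer reads that flag, so the sketch below only illustrates the general guard-flag idea under that assumption; every name in it (position, consumer, receive) is hypothetical and is not the Milvus mqclient API.

package main

import "fmt"

type position int

const (
	positionEarliest position = iota
	positionLatest
)

type consumer struct {
	pos     position
	hasSeek bool // true once a seek has happened or is known to be unnecessary
}

func newConsumer(pos position) *consumer {
	c := &consumer{pos: pos}
	// When the subscription starts at the latest position there is nothing to rewind to,
	// so mark the seek as already done (this mirrors the `pConsumer.hasSeek = true` branch).
	if pos == positionLatest {
		c.hasSeek = true
	}
	return c
}

// receive performs a lazy one-time seek before delivering messages.
func (c *consumer) receive() {
	if !c.hasSeek {
		fmt.Println("seek to earliest before the first receive")
		c.hasSeek = true
	}
	fmt.Println("receive messages, position:", c.pos)
}

func main() {
	newConsumer(positionLatest).receive()   // seek skipped
	newConsumer(positionEarliest).receive() // seek performed exactly once
}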