Fix LatestPosition option conflict with earliest patch (#10907)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2021-10-29 21:30:49 +08:00 committed by GitHub
parent b4182587eb
commit 390fad4eb7
3 changed files with 138 additions and 126 deletions


@@ -312,54 +312,54 @@ func (s *Server) initMeta() error {
func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
s.serverLoopWg.Add(4)
go s.startStatsChannel(s.serverLoopCtx)
go s.startDataNodeTtLoop(s.serverLoopCtx)
go s.startWatchService(s.serverLoopCtx)
go s.startFlushLoop(s.serverLoopCtx)
s.startStatsChannel(s.serverLoopCtx)
s.startDataNodeTtLoop(s.serverLoopCtx)
s.startWatchService(s.serverLoopCtx)
s.startFlushLoop(s.serverLoopCtx)
go s.session.LivenessCheck(s.serverLoopCtx, func() {
log.Fatal("Data Coord disconnected from etcd, process will exit", zap.Int64("Server Id", s.session.ServerID))
})
}
func (s *Server) startStatsChannel(ctx context.Context) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
statsStream, _ := s.msFactory.NewMsgStream(ctx)
statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
log.Debug("dataCoord create stats channel consumer",
zap.String("channelName", Params.StatisticsChannelName),
zap.String("descriptionName", Params.DataCoordSubscriptionName))
statsStream.Start()
defer statsStream.Close()
for {
select {
case <-ctx.Done():
log.Debug("stats channel shutdown")
return
default:
}
msgPack := statsStream.Consume()
if msgPack == nil {
log.Debug("receive nil stats msg, shutdown stats channel")
return
}
for _, msg := range msgPack.Msgs {
if msg.Type() != commonpb.MsgType_SegmentStatistics {
log.Warn("receive unknown msg from segment statistics channel",
zap.Stringer("msgType", msg.Type()))
continue
go func() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
defer statsStream.Close()
for {
select {
case <-ctx.Done():
log.Debug("stats channel shutdown")
return
default:
}
ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
for _, stat := range ssMsg.SegStats {
s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
msgPack := statsStream.Consume()
if msgPack == nil {
log.Debug("receive nil stats msg, shutdown stats channel")
return
}
for _, msg := range msgPack.Msgs {
if msg.Type() != commonpb.MsgType_SegmentStatistics {
log.Warn("receive unknown msg from segment statistics channel",
zap.Stringer("msgType", msg.Type()))
continue
}
ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
for _, stat := range ssMsg.SegStats {
s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
}
}
}
}
}()
}
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
if err != nil {
log.Error("new msg stream failed", zap.Error(err))
@@ -371,97 +371,103 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
zap.String("timeTickChannelName", Params.TimeTickChannelName),
zap.String("subscriptionName", Params.DataCoordSubscriptionName))
ttMsgStream.Start()
defer ttMsgStream.Close()
var checker *LongTermChecker
if enableTtChecker {
checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
checker.Start()
defer checker.Stop()
}
for {
select {
case <-ctx.Done():
log.Debug("data node tt loop shutdown")
return
default:
go func() {
var checker *LongTermChecker
if enableTtChecker {
checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
checker.Start()
defer checker.Stop()
}
msgPack := ttMsgStream.Consume()
if msgPack == nil {
log.Debug("receive nil tt msg, shutdown tt channel")
return
}
for _, msg := range msgPack.Msgs {
if msg.Type() != commonpb.MsgType_DataNodeTt {
log.Warn("receive unexpected msg type from tt channel",
zap.Stringer("msgType", msg.Type()))
continue
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
defer ttMsgStream.Close()
for {
select {
case <-ctx.Done():
log.Debug("data node tt loop shutdown")
return
default:
}
ttMsg := msg.(*msgstream.DataNodeTtMsg)
if enableTtChecker {
checker.Check()
msgPack := ttMsgStream.Consume()
if msgPack == nil {
log.Debug("receive nil tt msg, shutdown tt channel")
return
}
ch := ttMsg.ChannelName
ts := ttMsg.Timestamp
if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
log.Warn("failed to expire allocations", zap.Error(err))
continue
}
physical, _ := tsoutil.ParseTS(ts)
if time.Since(physical).Minutes() > 1 {
// if lag behind, log every 1 mins about
log.RatedWarn(60.0, "Time tick lag behind for more than 1 minutes", zap.String("channel", ch), zap.Time("tt", physical))
}
segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
if err != nil {
log.Warn("get flushable segments failed", zap.Error(err))
continue
}
staleSegments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
return !info.lastFlushTime.IsZero() && time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
})
if len(segments)+len(staleSegments) == 0 {
continue
}
log.Debug("flush segments", zap.Int64s("segmentIDs", segments), zap.Int("markSegments count", len(staleSegments)))
segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
for _, id := range segments {
sInfo := s.meta.GetSegment(id)
if sInfo == nil {
log.Error("get segment from meta error", zap.Int64("id", id),
zap.Error(err))
for _, msg := range msgPack.Msgs {
if msg.Type() != commonpb.MsgType_DataNodeTt {
log.Warn("receive unexpected msg type from tt channel",
zap.Stringer("msgType", msg.Type()))
continue
}
segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
s.meta.SetLastFlushTime(id, time.Now())
}
markSegments := make([]*datapb.SegmentInfo, 0, len(staleSegments))
for _, segment := range staleSegments {
for _, fSeg := range segmentInfos {
// check segment needs flush first
if segment.GetID() == fSeg.GetID() {
ttMsg := msg.(*msgstream.DataNodeTtMsg)
if enableTtChecker {
checker.Check()
}
ch := ttMsg.ChannelName
ts := ttMsg.Timestamp
if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
log.Warn("failed to expire allocations", zap.Error(err))
continue
}
physical, _ := tsoutil.ParseTS(ts)
if time.Since(physical).Minutes() > 1 {
// if lag behind, log every 1 mins about
log.RatedWarn(60.0, "Time tick lag behind for more than 1 minutes", zap.String("channel", ch), zap.Time("tt", physical))
}
segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
if err != nil {
log.Warn("get flushable segments failed", zap.Error(err))
continue
}
staleSegments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
return !info.lastFlushTime.IsZero() && time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
})
if len(segments)+len(staleSegments) == 0 {
continue
}
log.Debug("flush segments", zap.Int64s("segmentIDs", segments), zap.Int("markSegments count", len(staleSegments)))
segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
for _, id := range segments {
sInfo := s.meta.GetSegment(id)
if sInfo == nil {
log.Error("get segment from meta error", zap.Int64("id", id),
zap.Error(err))
continue
}
segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
s.meta.SetLastFlushTime(id, time.Now())
}
markSegments := make([]*datapb.SegmentInfo, 0, len(staleSegments))
for _, segment := range staleSegments {
for _, fSeg := range segmentInfos {
// check segment needs flush first
if segment.GetID() == fSeg.GetID() {
continue
}
}
markSegments = append(markSegments, segment.SegmentInfo)
s.meta.SetLastFlushTime(segment.GetID(), time.Now())
}
if len(segmentInfos)+len(markSegments) > 0 {
s.cluster.Flush(s.ctx, segmentInfos, markSegments)
}
markSegments = append(markSegments, segment.SegmentInfo)
s.meta.SetLastFlushTime(segment.GetID(), time.Now())
}
if len(segmentInfos)+len(markSegments) > 0 {
s.cluster.Flush(s.ctx, segmentInfos, markSegments)
}
s.helper.eventAfterHandleDataNodeTt()
}
s.helper.eventAfterHandleDataNodeTt()
}
}()
}
//go:norace
// fix datarace in unittest
// startWatchService will only be invoked at start procedure
// otherwise, remove the annotation and add atomic protection
// start a goroutine to watch services
func (s *Server) startWatchService(ctx context.Context) {
go s.watchService(ctx)
}
// watchService watches services
func (s *Server) watchService(ctx context.Context) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
for {
@@ -484,6 +490,7 @@ func (s *Server) startWatchService(ctx context.Context) {
}
}
}
}
// handles session events - DataNodes Add/Del
@@ -527,22 +534,24 @@ func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.Sess
}
func (s *Server) startFlushLoop(ctx context.Context) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
ctx2, cancel := context.WithCancel(ctx)
defer cancel()
// send `Flushing` segments
go s.handleFlushingSegments(ctx2)
for {
select {
case <-ctx.Done():
log.Debug("flush loop shutdown")
return
case segmentID := <-s.flushCh:
//Ignore return error
_ = s.postFlush(ctx, segmentID)
go func() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
ctx2, cancel := context.WithCancel(ctx)
defer cancel()
// send `Flushing` segments
go s.handleFlushingSegments(ctx2)
for {
select {
case <-ctx.Done():
log.Debug("flush loop shutdown")
return
case segmentID := <-s.flushCh:
//Ignore return error
_ = s.postFlush(ctx, segmentID)
}
}
}
}()
}
// post function after flush is done
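The server.go changes above all follow one pattern: startStatsChannel, startDataNodeTtLoop, startWatchService and startFlushLoop are no longer launched with go from startServerLoop; each of them now creates and subscribes its msgstream (or hands off to a watcher) synchronously and only runs the blocking consume loop in a goroutine of its own, so every subscription, together with its initial-position option, is in place by the time startServerLoop returns. Below is a minimal, runnable sketch of that pattern; fakeStream, server and startLoop are made-up stand-ins, not the real msgstream or datacoord types.

// Sketch only: fakeStream, server and startLoop are simplified stand-ins,
// not the actual Milvus msgstream / datacoord types.
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type fakeStream struct{ msgs chan string }

func (f *fakeStream) AsConsumer(channel, sub string) { fmt.Println("subscribed", channel, "as", sub) }
func (f *fakeStream) Start()                         {}
func (f *fakeStream) Close()                         {}

type server struct{ loopWg sync.WaitGroup }

// startLoop subscribes synchronously and runs only the blocking consume
// loop inside the goroutine, so the subscription (with whatever
// initial-position option it carries) exists before the caller moves on.
func (s *server) startLoop(ctx context.Context, st *fakeStream) {
    st.AsConsumer("stats-channel", "datacoord-sub") // done before the goroutine starts
    st.Start()

    s.loopWg.Add(1)
    go func() {
        defer s.loopWg.Done()
        defer st.Close()
        for {
            select {
            case <-ctx.Done():
                fmt.Println("loop shutdown")
                return
            case m, ok := <-st.msgs:
                if !ok {
                    return
                }
                fmt.Println("handle:", m)
            }
        }
    }()
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    s := &server{}
    st := &fakeStream{msgs: make(chan string, 1)}
    st.msgs <- "segment statistics"
    s.startLoop(ctx, st)

    time.Sleep(20 * time.Millisecond)
    cancel()
    s.loopWg.Wait()
}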


@@ -592,7 +592,7 @@ func TestService_WatchServices(t *testing.T) {
signal := make(chan struct{}, 1)
go func() {
svr.startWatchService(context.Background())
svr.watchService(context.Background())
flag = true
signal <- struct{}{}
}()
@@ -609,7 +609,7 @@ func TestService_WatchServices(t *testing.T) {
svr.serverLoopWg.Add(1)
go func() {
svr.startWatchService(ctx)
svr.watchService(ctx)
flag = true
signal <- struct{}{}
}()
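These test changes call watchService, the loop body, directly instead of startWatchService: after the refactor, startWatchService only spawns a goroutine and returns at once, so the flag/signal checks would run against a loop that had not even started. A self-contained sketch of that flag-and-signal pattern; loopBody below is a made-up placeholder, not the real watchService:

// Sketch only: loopBody is a placeholder for the real watchService loop.
package main

import (
    "context"
    "fmt"
)

func loopBody(ctx context.Context) {
    <-ctx.Done() // stand-in for the real watch loop, which exits on cancellation
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel() // make the loop return immediately, as closing the event channel does in the test

    flag := false
    signal := make(chan struct{}, 1)
    go func() {
        loopBody(ctx) // must be the loop body itself, not a starter that returns right away
        flag = true
        signal <- struct{}{}
    }()

    <-signal // the channel send/receive orders the flag write before this read
    fmt.Println("loop exited, flag =", flag)
}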


@@ -67,9 +67,12 @@ func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) {
if err != nil {
return nil, err
}
//consumer.Seek(pulsar.EarliestMessageID())
//consumer.SeekByTime(time.Unix(0, 0))
pConsumer := &pulsarConsumer{c: consumer, closeCh: make(chan struct{})}
// prevent seek to earliest patch applied when using latest position options
if options.SubscriptionInitialPosition == SubscriptionPositionLatest {
pConsumer.hasSeek = true
}
return pConsumer, nil
}
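In the last hunk, a consumer created with SubscriptionPositionLatest is marked hasSeek = true, so the existing seek-to-earliest workaround is skipped for it instead of dragging a latest-position subscription back to the start of the topic. A minimal sketch of that guard; the sketchConsumer type and maybeSeekEarliest helper are illustrative assumptions, not code from this commit:

// Sketch only: sketchConsumer and maybeSeekEarliest are assumptions for
// illustration; the commit itself only sets the hasSeek flag shown above.
package main

import "fmt"

type sketchConsumer struct {
    hasSeek bool // true when the consumer was created with the latest-position option
}

// maybeSeekEarliest applies the seek-to-earliest workaround only when the
// consumer has not already been positioned, so latest-position consumers
// keep the position the broker assigned to them.
func (c *sketchConsumer) maybeSeekEarliest(seek func() error) error {
    if c.hasSeek {
        return nil // skip the patch for latest-position consumers
    }
    return seek() // e.g. a Seek to the earliest message ID in the real consumer
}

func main() {
    seek := func() error {
        fmt.Println("seeking to earliest message")
        return nil
    }

    latest := &sketchConsumer{hasSeek: true}
    earliest := &sketchConsumer{}

    fmt.Println("latest:", latest.maybeSeekEarliest(seek))     // no seek performed
    fmt.Println("earliest:", earliest.maybeSeekEarliest(seek)) // seeks first
}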