From fa2c1c1f405013d79ee72f3c4f26cce20d6c720b Mon Sep 17 00:00:00 2001 From: yah01 Date: Mon, 13 Mar 2023 18:01:53 +0800 Subject: [PATCH] Fix RootCoord double updates TSO (#22715) Signed-off-by: yah01 --- internal/proxy/timestamp.go | 2 +- internal/rootcoord/root_coord.go | 7 ++----- internal/tso/global_allocator.go | 8 ++++---- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/internal/proxy/timestamp.go b/internal/proxy/timestamp.go index a917278da7..4eb2d04e6b 100644 --- a/internal/proxy/timestamp.go +++ b/internal/proxy/timestamp.go @@ -47,7 +47,7 @@ func newTimestampAllocator(tso timestampAllocatorInterface, peerID UniqueID) (*t func (ta *timestampAllocator) alloc(ctx context.Context, count uint32) ([]Timestamp, error) { tr := timerecord.NewTimeRecorder("applyTimestamp") - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() req := &rootcoordpb.AllocTimestampRequest{ Base: commonpbutil.NewMsgBase( diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 4c3551c289..30823b39f7 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -238,15 +238,12 @@ func (c *Core) tsLoop() { select { case <-tsoTicker.C: if err := c.tsoAllocator.UpdateTSO(); err != nil { - log.Warn("failed to update timestamp: ", zap.Error(err)) + log.Warn("failed to update tso", zap.Error(err)) continue } ts := c.tsoAllocator.GetLastSavedTime() metrics.RootCoordTimestampSaved.Set(float64(ts.Unix())) - if err := c.tsoAllocator.UpdateTSO(); err != nil { - log.Warn("failed to update id: ", zap.Error(err)) - continue - } + case <-ctx.Done(): return } diff --git a/internal/tso/global_allocator.go b/internal/tso/global_allocator.go index 694cd8488c..66a8c79be8 100644 --- a/internal/tso/global_allocator.go +++ b/internal/tso/global_allocator.go @@ -30,13 +30,13 @@ package tso import ( - "log" "sync/atomic" "time" "github.com/cockroachdb/errors" "github.com/milvus-io/milvus/internal/kv" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" @@ -119,15 +119,15 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) { current := (*atomicObject)(atomic.LoadPointer(>a.tso.TSO)) if current == nil || current.physical.Equal(typeutil.ZeroTime) { // If it's leader, maybe SyncTimestamp hasn't completed yet - log.Println("sync hasn't completed yet, wait for a while") + log.Info("sync hasn't completed yet, wait for a while") time.Sleep(200 * time.Millisecond) continue } - physical = current.physical.UnixNano() / int64(time.Millisecond) + physical = current.physical.UnixMilli() logical = atomic.AddInt64(¤t.logical, int64(count)) if logical >= maxLogical && gta.LimitMaxLogic { - log.Println("logical part outside of max logical interval, please check ntp time", + log.Info("logical part outside of max logical interval, please check ntp time", zap.Int("retry-count", i)) time.Sleep(UpdateTimestampStep) continue