diff --git a/internal/masterservice/id.go b/internal/allocator/global_id.go similarity index 81% rename from internal/masterservice/id.go rename to internal/allocator/global_id.go index 7ccb24d48a..e4a569fd95 100644 --- a/internal/masterservice/id.go +++ b/internal/allocator/global_id.go @@ -1,18 +1,25 @@ -package masterservice +package allocator import ( "github.com/zilliztech/milvus-distributed/internal/kv" + "github.com/zilliztech/milvus-distributed/internal/tso" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) +type GIDAllocator interface { + Alloc(count uint32) (UniqueID, UniqueID, error) + AllocOne() (UniqueID, error) + UpdateID() error +} + // GlobalTSOAllocator is the global single point TSO allocator. type GlobalIDAllocator struct { - allocator Allocator + allocator tso.Allocator } func NewGlobalIDAllocator(key string, base kv.TxnBase) *GlobalIDAllocator { return &GlobalIDAllocator{ - allocator: NewGlobalTSOAllocator(key, base), + allocator: tso.NewGlobalTSOAllocator(key, base), } } diff --git a/internal/indexservice/id.go b/internal/indexservice/id.go deleted file mode 100644 index 6e2ff3e85e..0000000000 --- a/internal/indexservice/id.go +++ /dev/null @@ -1,52 +0,0 @@ -package indexservice - -import ( - "github.com/zilliztech/milvus-distributed/internal/kv" -) - -type IDAllocator interface { - Alloc(count uint32) (UniqueID, UniqueID, error) - AllocOne() (UniqueID, error) - UpdateID() error -} - -// GlobalTSOAllocator is the global single point TSO allocator. -type GlobalIDAllocator struct { - allocator Allocator -} - -func NewGlobalIDAllocator(key string, base kv.TxnBase) *GlobalIDAllocator { - return &GlobalIDAllocator{ - allocator: NewGlobalTSOAllocator(key, base), - } -} - -// Initialize will initialize the created global TSO allocator. -func (gia *GlobalIDAllocator) Initialize() error { - return gia.allocator.Initialize() -} - -// GenerateTSO is used to generate a given number of TSOs. -// Make sure you have initialized the TSO allocator before calling. -func (gia *GlobalIDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) { - timestamp, err := gia.allocator.GenerateTSO(count) - if err != nil { - return 0, 0, err - } - idStart := UniqueID(timestamp) - idEnd := idStart + int64(count) - return idStart, idEnd, nil -} - -func (gia *GlobalIDAllocator) AllocOne() (UniqueID, error) { - timestamp, err := gia.allocator.GenerateTSO(1) - if err != nil { - return 0, err - } - idStart := UniqueID(timestamp) - return idStart, nil -} - -func (gia *GlobalIDAllocator) UpdateID() error { - return gia.allocator.UpdateTSO() -} diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index 4ab997a14c..e9d4bf714d 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -7,6 +7,9 @@ import ( "sync" "time" + "github.com/zilliztech/milvus-distributed/internal/allocator" + "github.com/zilliztech/milvus-distributed/internal/tso" + "go.etcd.io/etcd/clientv3" "github.com/zilliztech/milvus-distributed/internal/errors" @@ -38,7 +41,7 @@ type ServiceImpl struct { sched *TaskScheduler - idAllocator *GlobalIDAllocator + idAllocator *allocator.GlobalIDAllocator kv kv.Base @@ -88,7 +91,7 @@ func (i *ServiceImpl) Init() error { //init idAllocator kvRootPath := Params.KvRootPath - i.idAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, kvRootPath, "index_gid")) + i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, kvRootPath, "index_gid")) if err := i.idAllocator.Initialize(); err != nil { return err } @@ -316,7 +319,7 @@ func (i *ServiceImpl) NotifyBuildIndex(nty *indexpb.BuildIndexNotification) (*co } func (i *ServiceImpl) tsLoop() { - tsoTicker := time.NewTicker(UpdateTimestampStep) + tsoTicker := time.NewTicker(tso.UpdateTimestampStep) defer tsoTicker.Stop() ctx, cancel := context.WithCancel(i.loopCtx) defer cancel() diff --git a/internal/indexservice/task.go b/internal/indexservice/task.go index 8fa164c597..75a89cdde4 100644 --- a/internal/indexservice/task.go +++ b/internal/indexservice/task.go @@ -4,6 +4,8 @@ import ( "context" "log" + "github.com/zilliztech/milvus-distributed/internal/allocator" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" @@ -61,7 +63,7 @@ type IndexAddTask struct { BaseTask req *indexpb.BuildIndexRequest indexBuildID UniqueID - idAllocator *GlobalIDAllocator + idAllocator *allocator.GlobalIDAllocator buildQueue TaskQueue kv kv.Base builderClient typeutil.IndexNodeInterface diff --git a/internal/indexservice/task_scheduler.go b/internal/indexservice/task_scheduler.go index 7e6232ba70..f3578196be 100644 --- a/internal/indexservice/task_scheduler.go +++ b/internal/indexservice/task_scheduler.go @@ -7,6 +7,8 @@ import ( "log" "sync" + "github.com/zilliztech/milvus-distributed/internal/allocator" + "github.com/opentracing/opentracing-go" oplog "github.com/opentracing/opentracing-go/log" "github.com/zilliztech/milvus-distributed/internal/kv" @@ -172,7 +174,7 @@ func NewIndexAddTaskQueue(sched *TaskScheduler) *IndexAddTaskQueue { type TaskScheduler struct { IndexAddQueue TaskQueue - idAllocator *GlobalIDAllocator + idAllocator *allocator.GlobalIDAllocator metaTable *metaTable kv kv.Base @@ -182,7 +184,7 @@ type TaskScheduler struct { } func NewTaskScheduler(ctx context.Context, - idAllocator *GlobalIDAllocator, + idAllocator *allocator.GlobalIDAllocator, kv kv.Base, table *metaTable) (*TaskScheduler, error) { ctx1, cancel := context.WithCancel(ctx) diff --git a/internal/indexservice/tso.go b/internal/indexservice/tso.go deleted file mode 100644 index 0728ae9b2f..0000000000 --- a/internal/indexservice/tso.go +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright 2016 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package indexservice - -import ( - "log" - "sync/atomic" - "time" - "unsafe" - - "go.uber.org/zap" - - "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/kv" - "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" -) - -const ( - // UpdateTimestampStep is used to update timestamp. - UpdateTimestampStep = 50 * time.Millisecond - // updateTimestampGuard is the min timestamp interval. - updateTimestampGuard = time.Millisecond - // maxLogical is the max upper limit for logical time. - // When a TSO's logical time reaches this limit, - // the physical time will be forced to increase. - maxLogical = int64(1 << 18) -) - -// atomicObject is used to store the current TSO in memory. -type atomicObject struct { - physical time.Time - logical int64 -} - -// timestampOracle is used to maintain the logic of tso. -type timestampOracle struct { - key string - kvBase kv.TxnBase - - // TODO: remove saveInterval - saveInterval time.Duration - maxResetTSGap func() time.Duration - // For tso, set after the PD becomes a leader. - TSO unsafe.Pointer - lastSavedTime atomic.Value -} - -func (t *timestampOracle) loadTimestamp() (time.Time, error) { - strData, err := t.kvBase.Load(t.key) - - var binData []byte = []byte(strData) - - if err != nil { - return typeutil.ZeroTime, err - } - if len(binData) == 0 { - return typeutil.ZeroTime, nil - } - return typeutil.ParseTimestamp(binData) -} - -// save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it, -// otherwise, update it. -func (t *timestampOracle) saveTimestamp(ts time.Time) error { - data := typeutil.Uint64ToBytes(uint64(ts.UnixNano())) - err := t.kvBase.Save(t.key, string(data)) - if err != nil { - return errors.WithStack(err) - } - t.lastSavedTime.Store(ts) - return nil -} - -func (t *timestampOracle) InitTimestamp() error { - - //last, err := t.loadTimestamp() - //if err != nil { - // return err - //} - - next := time.Now() - - // If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`, - // the timestamp allocation will start from the saved etcd timestamp temporarily. - //if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard { - // next = last.Add(updateTimestampGuard) - //} - - save := next.Add(t.saveInterval) - if err := t.saveTimestamp(save); err != nil { - return err - } - - //log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next)) - - current := &atomicObject{ - physical: next, - } - atomic.StorePointer(&t.TSO, unsafe.Pointer(current)) - - return nil -} - -// ResetUserTimestamp update the physical part with specified tso. -func (t *timestampOracle) ResetUserTimestamp(tso uint64) error { - physical, _ := tsoutil.ParseTS(tso) - next := physical.Add(time.Millisecond) - prev := (*atomicObject)(atomic.LoadPointer(&t.TSO)) - - // do not update - if typeutil.SubTimeByWallClock(next, prev.physical) <= 3*updateTimestampGuard { - return errors.New("the specified ts too small than now") - } - - if typeutil.SubTimeByWallClock(next, prev.physical) >= t.maxResetTSGap() { - return errors.New("the specified ts too large than now") - } - - save := next.Add(t.saveInterval) - if err := t.saveTimestamp(save); err != nil { - return err - } - update := &atomicObject{ - physical: next, - } - atomic.CompareAndSwapPointer(&t.TSO, unsafe.Pointer(prev), unsafe.Pointer(update)) - return nil -} - -// UpdateTimestamp is used to update the timestamp. -// This function will do two things: -// 1. When the logical time is going to be used up, increase the current physical time. -// 2. When the time window is not big enough, which means the saved etcd time minus the next physical time -// will be less than or equal to `updateTimestampGuard`, then the time window needs to be updated and -// we also need to save the next physical time plus `TsoSaveInterval` into etcd. -// -// Here is some constraints that this function must satisfy: -// 1. The saved time is monotonically increasing. -// 2. The physical time is monotonically increasing. -// 3. The physical time is always less than the saved timestamp. -func (t *timestampOracle) UpdateTimestamp() error { - prev := (*atomicObject)(atomic.LoadPointer(&t.TSO)) - now := time.Now() - - jetLag := typeutil.SubTimeByWallClock(now, prev.physical) - if jetLag > 3*UpdateTimestampStep { - log.Print("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now)) - } - - var next time.Time - prevLogical := atomic.LoadInt64(&prev.logical) - // If the system time is greater, it will be synchronized with the system time. - if jetLag > updateTimestampGuard { - next = now - } else if prevLogical > maxLogical/2 { - // The reason choosing maxLogical/2 here is that it's big enough for common cases. - // Because there is enough timestamp can be allocated before next update. - log.Print("the logical time may be not enough", zap.Int64("prev-logical", prevLogical)) - next = prev.physical.Add(time.Millisecond) - } else { - // It will still use the previous physical time to alloc the timestamp. - return nil - } - - // It is not safe to increase the physical time to `next`. - // The time window needs to be updated and saved to etcd. - if typeutil.SubTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= updateTimestampGuard { - save := next.Add(t.saveInterval) - if err := t.saveTimestamp(save); err != nil { - return err - } - } - - current := &atomicObject{ - physical: next, - logical: 0, - } - - atomic.StorePointer(&t.TSO, unsafe.Pointer(current)) - - return nil -} - -// ResetTimestamp is used to reset the timestamp. -func (t *timestampOracle) ResetTimestamp() { - zero := &atomicObject{ - physical: time.Now(), - } - atomic.StorePointer(&t.TSO, unsafe.Pointer(zero)) -} diff --git a/internal/masterservice/global_allocator.go b/internal/masterservice/global_allocator.go deleted file mode 100644 index b757e2bbd8..0000000000 --- a/internal/masterservice/global_allocator.go +++ /dev/null @@ -1,117 +0,0 @@ -package masterservice - -import ( - "sync/atomic" - "time" - - "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/kv" - "github.com/zilliztech/milvus-distributed/internal/log" - "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "go.uber.org/zap" -) - -// Allocator is a Timestamp Oracle allocator. -type Allocator interface { - // Initialize is used to initialize a TSO allocator. - // It will synchronize TSO with etcd and initialize the - // memory for later allocation work. - Initialize() error - // UpdateTSO is used to update the TSO in memory and the time window in etcd. - UpdateTSO() error - // SetTSO sets the physical part with given tso. It's mainly used for BR restore - // and can not forcibly set the TSO smaller than now. - SetTSO(tso uint64) error - // GenerateTSO is used to generate a given number of TSOs. - // Make sure you have initialized the TSO allocator before calling. - GenerateTSO(count uint32) (uint64, error) - // Reset is used to reset the TSO allocator. - Reset() -} - -// GlobalTSOAllocator is the global single point TSO allocator. -type GlobalTSOAllocator struct { - tso *timestampOracle -} - -// NewGlobalTSOAllocator creates a new global TSO allocator. -func NewGlobalTSOAllocator(key string, kvBase kv.TxnBase) *GlobalTSOAllocator { - var saveInterval = 3 * time.Second - return &GlobalTSOAllocator{ - tso: ×tampOracle{ - kvBase: kvBase, - saveInterval: saveInterval, - maxResetTSGap: func() time.Duration { return 3 * time.Second }, - key: key, - }, - } -} - -// Initialize will initialize the created global TSO allocator. -func (gta *GlobalTSOAllocator) Initialize() error { - return gta.tso.InitTimestamp() -} - -// UpdateTSO is used to update the TSO in memory and the time window in etcd. -func (gta *GlobalTSOAllocator) UpdateTSO() error { - return gta.tso.UpdateTimestamp() -} - -// SetTSO sets the physical part with given tso. -func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error { - return gta.tso.ResetUserTimestamp(tso) -} - -// GenerateTSO is used to generate a given number of TSOs. -// Make sure you have initialized the TSO allocator before calling. -func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) { - var physical, logical int64 - if count == 0 { - return 0, errors.New("tso count should be positive") - } - - maxRetryCount := 10 - - for i := 0; i < maxRetryCount; i++ { - current := (*atomicObject)(atomic.LoadPointer(>a.tso.TSO)) - if current == nil || current.physical.Equal(typeutil.ZeroTime) { - // If it's leader, maybe SyncTimestamp hasn't completed yet - log.Debug("sync hasn't completed yet, wait for a while") - time.Sleep(200 * time.Millisecond) - continue - } - - physical = current.physical.UnixNano() / int64(time.Millisecond) - logical = atomic.AddInt64(¤t.logical, int64(count)) - if logical >= maxLogical { - log.Debug("logical part outside of max logical interval, please check ntp time", zap.Int("retry-count", i)) - time.Sleep(UpdateTimestampStep) - continue - } - return tsoutil.ComposeTS(physical, logical), nil - } - return 0, errors.New("can not get timestamp") -} - -func (gta *GlobalTSOAllocator) Alloc(count uint32) (typeutil.Timestamp, error) { - //return gta.tso.SyncTimestamp() - start, err := gta.GenerateTSO(count) - if err != nil { - return typeutil.ZeroTimestamp, err - } - //ret := make([]typeutil.Timestamp, count) - //for i:=uint32(0); i < count; i++{ - // ret[i] = start + uint64(i) - //} - return start, err -} - -func (gta *GlobalTSOAllocator) AllocOne() (typeutil.Timestamp, error) { - return gta.GenerateTSO(1) -} - -// Reset is used to reset the TSO allocator. -func (gta *GlobalTSOAllocator) Reset() { - gta.tso.ResetTimestamp() -} diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 90ea84c3d3..8474675cb4 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -8,6 +8,9 @@ import ( "sync/atomic" "time" + "github.com/zilliztech/milvus-distributed/internal/allocator" + "github.com/zilliztech/milvus-distributed/internal/tso" + "github.com/zilliztech/milvus-distributed/internal/errors" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" "github.com/zilliztech/milvus-distributed/internal/log" @@ -114,9 +117,9 @@ type Core struct { MetaTable *metaTable //id allocator - idAllocator *GlobalIDAllocator + idAllocator *allocator.GlobalIDAllocator //tso allocator - tsoAllocator *GlobalTSOAllocator + tsoAllocator *tso.GlobalTSOAllocator //inner members ctx context.Context @@ -405,7 +408,7 @@ func (c *Core) startSegmentFlushCompletedLoop() { } func (c *Core) tsLoop() { - tsoTicker := time.NewTicker(UpdateTimestampStep) + tsoTicker := time.NewTicker(tso.UpdateTimestampStep) defer tsoTicker.Stop() ctx, cancel := context.WithCancel(c.ctx) defer cancel() @@ -775,11 +778,11 @@ func (c *Core) Init() error { c.kvBase = etcdkv.NewEtcdKV(c.etcdCli, Params.KvRootPath) - c.idAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "gid")) + c.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "gid")) if initError = c.idAllocator.Initialize(); initError != nil { return } - c.tsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "tso")) + c.tsoAllocator = tso.NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "tso")) if initError = c.tsoAllocator.Initialize(); initError != nil { return } diff --git a/internal/masterservice/tso.go b/internal/masterservice/tso.go deleted file mode 100644 index e164a48386..0000000000 --- a/internal/masterservice/tso.go +++ /dev/null @@ -1,188 +0,0 @@ -package masterservice - -import ( - "sync/atomic" - "time" - "unsafe" - - "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/kv" - "github.com/zilliztech/milvus-distributed/internal/log" - "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "go.uber.org/zap" -) - -const ( - // UpdateTimestampStep is used to update timestamp. - UpdateTimestampStep = 50 * time.Millisecond - // updateTimestampGuard is the min timestamp interval. - updateTimestampGuard = time.Millisecond - // maxLogical is the max upper limit for logical time. - // When a TSO's logical time reaches this limit, - // the physical time will be forced to increase. - maxLogical = int64(1 << 18) -) - -// atomicObject is used to store the current TSO in memory. -type atomicObject struct { - physical time.Time - logical int64 -} - -// timestampOracle is used to maintain the logic of tso. -type timestampOracle struct { - key string - kvBase kv.TxnBase - - // TODO: remove saveInterval - saveInterval time.Duration - maxResetTSGap func() time.Duration - // For tso, set after the PD becomes a leader. - TSO unsafe.Pointer - lastSavedTime atomic.Value -} - -func (t *timestampOracle) loadTimestamp() (time.Time, error) { - strData, err := t.kvBase.Load(t.key) - - var binData []byte = []byte(strData) - - if err != nil { - return typeutil.ZeroTime, err - } - if len(binData) == 0 { - return typeutil.ZeroTime, nil - } - return typeutil.ParseTimestamp(binData) -} - -// save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it, -// otherwise, update it. -func (t *timestampOracle) saveTimestamp(ts time.Time) error { - data := typeutil.Uint64ToBytes(uint64(ts.UnixNano())) - err := t.kvBase.Save(t.key, string(data)) - if err != nil { - return errors.WithStack(err) - } - t.lastSavedTime.Store(ts) - return nil -} - -func (t *timestampOracle) InitTimestamp() error { - - //last, err := t.loadTimestamp() - //if err != nil { - // return err - //} - - next := time.Now() - - // If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`, - // the timestamp allocation will start from the saved etcd timestamp temporarily. - //if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard { - // next = last.Add(updateTimestampGuard) - //} - - save := next.Add(t.saveInterval) - if err := t.saveTimestamp(save); err != nil { - return err - } - - //log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next)) - - current := &atomicObject{ - physical: next, - } - atomic.StorePointer(&t.TSO, unsafe.Pointer(current)) - - return nil -} - -// ResetUserTimestamp update the physical part with specified tso. -func (t *timestampOracle) ResetUserTimestamp(tso uint64) error { - physical, _ := tsoutil.ParseTS(tso) - next := physical.Add(time.Millisecond) - prev := (*atomicObject)(atomic.LoadPointer(&t.TSO)) - - // do not update - if typeutil.SubTimeByWallClock(next, prev.physical) <= 3*updateTimestampGuard { - return errors.New("the specified ts too small than now") - } - - if typeutil.SubTimeByWallClock(next, prev.physical) >= t.maxResetTSGap() { - return errors.New("the specified ts too large than now") - } - - save := next.Add(t.saveInterval) - if err := t.saveTimestamp(save); err != nil { - return err - } - update := &atomicObject{ - physical: next, - } - atomic.CompareAndSwapPointer(&t.TSO, unsafe.Pointer(prev), unsafe.Pointer(update)) - return nil -} - -// UpdateTimestamp is used to update the timestamp. -// This function will do two things: -// 1. When the logical time is going to be used up, increase the current physical time. -// 2. When the time window is not big enough, which means the saved etcd time minus the next physical time -// will be less than or equal to `updateTimestampGuard`, then the time window needs to be updated and -// we also need to save the next physical time plus `TsoSaveInterval` into etcd. -// -// Here is some constraints that this function must satisfy: -// 1. The saved time is monotonically increasing. -// 2. The physical time is monotonically increasing. -// 3. The physical time is always less than the saved timestamp. -func (t *timestampOracle) UpdateTimestamp() error { - prev := (*atomicObject)(atomic.LoadPointer(&t.TSO)) - now := time.Now() - - jetLag := typeutil.SubTimeByWallClock(now, prev.physical) - if jetLag > 3*UpdateTimestampStep { - log.Debug("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now)) - } - - var next time.Time - prevLogical := atomic.LoadInt64(&prev.logical) - // If the system time is greater, it will be synchronized with the system time. - if jetLag > updateTimestampGuard { - next = now - } else if prevLogical > maxLogical/2 { - // The reason choosing maxLogical/2 here is that it's big enough for common cases. - // Because there is enough timestamp can be allocated before next update. - log.Debug("the logical time may be not enough", zap.Int64("prev-logical", prevLogical)) - next = prev.physical.Add(time.Millisecond) - } else { - // It will still use the previous physical time to alloc the timestamp. - return nil - } - - // It is not safe to increase the physical time to `next`. - // The time window needs to be updated and saved to etcd. - if typeutil.SubTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= updateTimestampGuard { - save := next.Add(t.saveInterval) - if err := t.saveTimestamp(save); err != nil { - return err - } - } - - current := &atomicObject{ - physical: next, - logical: 0, - } - - atomic.StorePointer(&t.TSO, unsafe.Pointer(current)) - - return nil -} - -// ResetTimestamp is used to reset the timestamp. -func (t *timestampOracle) ResetTimestamp() { - zero := &atomicObject{ - physical: time.Now(), - } - atomic.StorePointer(&t.TSO, unsafe.Pointer(zero)) -} diff --git a/internal/msgstream/rmqms/rmq_msgstream_test.go b/internal/msgstream/rmqms/rmq_msgstream_test.go index fa47c69540..3e19f3d7b3 100644 --- a/internal/msgstream/rmqms/rmq_msgstream_test.go +++ b/internal/msgstream/rmqms/rmq_msgstream_test.go @@ -7,6 +7,8 @@ import ( "os" "testing" + "github.com/zilliztech/milvus-distributed/internal/allocator" + etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" "github.com/zilliztech/milvus-distributed/internal/util/rocksmq" "go.etcd.io/etcd/clientv3" @@ -171,7 +173,7 @@ func initRmq(name string) *etcdkv.EtcdKV { log.Fatalf("New clientv3 error = %v", err) } etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") - idAllocator := rocksmq.NewGlobalIDAllocator("dummy", etcdKV) + idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) _ = idAllocator.Initialize() err = rocksmq.InitRmq(name, idAllocator) diff --git a/internal/indexservice/global_allocator.go b/internal/tso/global_allocator.go similarity index 87% rename from internal/indexservice/global_allocator.go rename to internal/tso/global_allocator.go index ab82ee696a..4a0bae73b9 100644 --- a/internal/indexservice/global_allocator.go +++ b/internal/tso/global_allocator.go @@ -1,4 +1,17 @@ -package indexservice +// Copyright 2016 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tso import ( "log" diff --git a/internal/util/rocksmq/tso.go b/internal/tso/tso.go similarity index 99% rename from internal/util/rocksmq/tso.go rename to internal/tso/tso.go index 0db2901b83..14ef341596 100644 --- a/internal/util/rocksmq/tso.go +++ b/internal/tso/tso.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rocksmq +package tso import ( "log" diff --git a/internal/util/rocksmq/global_allocator.go b/internal/util/rocksmq/global_allocator.go deleted file mode 100644 index d105003df0..0000000000 --- a/internal/util/rocksmq/global_allocator.go +++ /dev/null @@ -1,167 +0,0 @@ -package rocksmq - -import ( - "errors" - "log" - "sync/atomic" - "time" - - "github.com/zilliztech/milvus-distributed/internal/kv" - "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "go.uber.org/zap" -) - -// Allocator is a Timestamp Oracle allocator. -type Allocator interface { - // Initialize is used to initialize a TSO allocator. - // It will synchronize TSO with etcd and initialize the - // memory for later allocation work. - Initialize() error - // UpdateTSO is used to update the TSO in memory and the time window in etcd. - UpdateTSO() error - // SetTSO sets the physical part with given tso. It's mainly used for BR restore - // and can not forcibly set the TSO smaller than now. - SetTSO(tso uint64) error - // GenerateTSO is used to generate a given number of TSOs. - // Make sure you have initialized the TSO allocator before calling. - GenerateTSO(count uint32) (uint64, error) - // Reset is used to reset the TSO allocator. - Reset() -} - -// GlobalTSOAllocator is the global single point TSO allocator. -type GlobalTSOAllocator struct { - tso *timestampOracle -} - -// NewGlobalTSOAllocator creates a new global TSO allocator. -func NewGlobalTSOAllocator(key string, kvBase kv.TxnBase) *GlobalTSOAllocator { - var saveInterval = 3 * time.Second - return &GlobalTSOAllocator{ - tso: ×tampOracle{ - kvBase: kvBase, - saveInterval: saveInterval, - maxResetTSGap: func() time.Duration { return 3 * time.Second }, - key: key, - }, - } -} - -// Initialize will initialize the created global TSO allocator. -func (gta *GlobalTSOAllocator) Initialize() error { - return gta.tso.InitTimestamp() -} - -// UpdateTSO is used to update the TSO in memory and the time window in etcd. -func (gta *GlobalTSOAllocator) UpdateTSO() error { - return gta.tso.UpdateTimestamp() -} - -// SetTSO sets the physical part with given tso. -func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error { - return gta.tso.ResetUserTimestamp(tso) -} - -// GenerateTSO is used to generate a given number of TSOs. -// Make sure you have initialized the TSO allocator before calling. -func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) { - var physical, logical int64 - if count == 0 { - return 0, errors.New("tso count should be positive") - } - - maxRetryCount := 10 - - for i := 0; i < maxRetryCount; i++ { - current := (*atomicObject)(atomic.LoadPointer(>a.tso.TSO)) - if current == nil || current.physical.Equal(typeutil.ZeroTime) { - // If it's leader, maybe SyncTimestamp hasn't completed yet - log.Println("sync hasn't completed yet, wait for a while") - time.Sleep(200 * time.Millisecond) - continue - } - - physical = current.physical.UnixNano() / int64(time.Millisecond) - logical = atomic.AddInt64(¤t.logical, int64(count)) - if logical >= maxLogical { - log.Println("logical part outside of max logical interval, please check ntp time", - zap.Int("retry-count", i)) - time.Sleep(UpdateTimestampStep) - continue - } - return tsoutil.ComposeTS(physical, logical), nil - } - return 0, errors.New("can not get timestamp") -} - -func (gta *GlobalTSOAllocator) Alloc(count uint32) (typeutil.Timestamp, error) { - //return gta.tso.SyncTimestamp() - start, err := gta.GenerateTSO(count) - if err != nil { - return typeutil.ZeroTimestamp, err - } - //ret := make([]typeutil.Timestamp, count) - //for i:=uint32(0); i < count; i++{ - // ret[i] = start + uint64(i) - //} - return start, err -} - -func (gta *GlobalTSOAllocator) AllocOne() (typeutil.Timestamp, error) { - return gta.GenerateTSO(1) -} - -// Reset is used to reset the TSO allocator. -func (gta *GlobalTSOAllocator) Reset() { - gta.tso.ResetTimestamp() -} - -/////////////////////////////////////////////////////////////////////// - -type IDAllocator interface { - Alloc(count uint32) (UniqueID, UniqueID, error) - AllocOne() (UniqueID, error) - UpdateID() error -} - -// GlobalTSOAllocator is the global single point TSO allocator. -type GlobalIDAllocator struct { - allocator Allocator -} - -func NewGlobalIDAllocator(key string, base kv.TxnBase) *GlobalIDAllocator { - return &GlobalIDAllocator{ - allocator: NewGlobalTSOAllocator(key, base), - } -} - -// Initialize will initialize the created global TSO allocator. -func (gia *GlobalIDAllocator) Initialize() error { - return gia.allocator.Initialize() -} - -// GenerateTSO is used to generate a given number of TSOs. -// Make sure you have initialized the TSO allocator before calling. -func (gia *GlobalIDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) { - timestamp, err := gia.allocator.GenerateTSO(count) - if err != nil { - return 0, 0, err - } - idStart := UniqueID(timestamp) - idEnd := idStart + int64(count) - return idStart, idEnd, nil -} - -func (gia *GlobalIDAllocator) AllocOne() (UniqueID, error) { - timestamp, err := gia.allocator.GenerateTSO(1) - if err != nil { - return 0, err - } - idStart := UniqueID(timestamp) - return idStart, nil -} - -func (gia *GlobalIDAllocator) UpdateID() error { - return gia.allocator.UpdateTSO() -} diff --git a/internal/util/rocksmq/global_rmq.go b/internal/util/rocksmq/global_rmq.go index fc11ac8025..519de496df 100644 --- a/internal/util/rocksmq/global_rmq.go +++ b/internal/util/rocksmq/global_rmq.go @@ -4,6 +4,8 @@ import ( "os" "sync" + "github.com/zilliztech/milvus-distributed/internal/allocator" + rocksdbkv "github.com/zilliztech/milvus-distributed/internal/kv/rocksdb" ) @@ -16,7 +18,7 @@ type Consumer struct { MsgNum chan int } -func InitRmq(rocksdbName string, idAllocator IDAllocator) error { +func InitRmq(rocksdbName string, idAllocator allocator.GIDAllocator) error { var err error Rmq, err = NewRocksMQ(rocksdbName, idAllocator) return err @@ -33,7 +35,7 @@ func InitRocksMQ(rocksdbName string) error { if err != nil { panic(err) } - idAllocator := NewGlobalIDAllocator("rmq_id", rocksdbKV) + idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV) _ = idAllocator.Initialize() if _, err := os.Stat(rocksdbName); !os.IsNotExist(err) { diff --git a/internal/util/rocksmq/rocksmq.go b/internal/util/rocksmq/rocksmq.go index 5cb53be5e9..83f0de46c5 100644 --- a/internal/util/rocksmq/rocksmq.go +++ b/internal/util/rocksmq/rocksmq.go @@ -4,6 +4,8 @@ import ( "strconv" "sync" + "github.com/zilliztech/milvus-distributed/internal/allocator" + "github.com/tecbot/gorocksdb" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/kv" @@ -72,7 +74,7 @@ type RocksMQ struct { kv kv.Base channels map[string]*Channel cgCtxs map[string]ConsumerGroupContext - idAllocator IDAllocator + idAllocator allocator.GIDAllocator produceMu sync.Mutex consumeMu sync.Mutex @@ -86,7 +88,7 @@ type RocksMQ struct { //tsoTicker *time.Ticker } -func NewRocksMQ(name string, idAllocator IDAllocator) (*RocksMQ, error) { +func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*RocksMQ, error) { bbto := gorocksdb.NewDefaultBlockBasedTableOptions() bbto.SetBlockCache(gorocksdb.NewLRUCache(RocksDBLRUCacheCapacity)) opts := gorocksdb.NewDefaultOptions() diff --git a/internal/util/rocksmq/rocksmq_test.go b/internal/util/rocksmq/rocksmq_test.go index d0ccb93034..d69df08bd9 100644 --- a/internal/util/rocksmq/rocksmq_test.go +++ b/internal/util/rocksmq/rocksmq_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/zilliztech/milvus-distributed/internal/allocator" + "github.com/stretchr/testify/assert" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" "go.etcd.io/etcd/clientv3" @@ -29,7 +31,7 @@ func TestRocksMQ(t *testing.T) { assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() - idAllocator := NewGlobalIDAllocator("dummy", etcdKV) + idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) _ = idAllocator.Initialize() name := "/tmp/rocksmq" @@ -86,7 +88,7 @@ func TestRocksMQ_Loop(t *testing.T) { assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() - idAllocator := NewGlobalIDAllocator("dummy", etcdKV) + idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) _ = idAllocator.Initialize() name := "/tmp/rocksmq_1" @@ -154,7 +156,7 @@ func TestRocksMQ_Goroutines(t *testing.T) { assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() - idAllocator := NewGlobalIDAllocator("dummy", etcdKV) + idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) _ = idAllocator.Initialize() name := "/tmp/rocksmq_2" @@ -225,7 +227,7 @@ func TestRocksMQ_Throughout(t *testing.T) { assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() - idAllocator := NewGlobalIDAllocator("dummy", etcdKV) + idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) _ = idAllocator.Initialize() name := "/tmp/rocksmq_3" @@ -279,7 +281,7 @@ func TestRocksMQ_MultiChan(t *testing.T) { assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() - idAllocator := NewGlobalIDAllocator("dummy", etcdKV) + idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) _ = idAllocator.Initialize() name := "/tmp/rocksmq_multichan"