Refactor tso and global id allocator

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
zhenshan.cao 2021-02-24 17:12:06 +08:00 committed by yefu.chen
parent aada3aa5db
commit 0679954922
16 changed files with 64 additions and 752 deletions

View File

@ -1,18 +1,25 @@
package masterservice
package allocator
import (
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/tso"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type GIDAllocator interface {
Alloc(count uint32) (UniqueID, UniqueID, error)
AllocOne() (UniqueID, error)
UpdateID() error
}
// GlobalTSOAllocator is the global single point TSO allocator.
type GlobalIDAllocator struct {
allocator Allocator
allocator tso.Allocator
}
func NewGlobalIDAllocator(key string, base kv.TxnBase) *GlobalIDAllocator {
return &GlobalIDAllocator{
allocator: NewGlobalTSOAllocator(key, base),
allocator: tso.NewGlobalTSOAllocator(key, base),
}
}

View File

@ -1,52 +0,0 @@
package indexservice
import (
"github.com/zilliztech/milvus-distributed/internal/kv"
)
type IDAllocator interface {
Alloc(count uint32) (UniqueID, UniqueID, error)
AllocOne() (UniqueID, error)
UpdateID() error
}
// GlobalTSOAllocator is the global single point TSO allocator.
type GlobalIDAllocator struct {
allocator Allocator
}
func NewGlobalIDAllocator(key string, base kv.TxnBase) *GlobalIDAllocator {
return &GlobalIDAllocator{
allocator: NewGlobalTSOAllocator(key, base),
}
}
// Initialize will initialize the created global TSO allocator.
func (gia *GlobalIDAllocator) Initialize() error {
return gia.allocator.Initialize()
}
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (gia *GlobalIDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) {
timestamp, err := gia.allocator.GenerateTSO(count)
if err != nil {
return 0, 0, err
}
idStart := UniqueID(timestamp)
idEnd := idStart + int64(count)
return idStart, idEnd, nil
}
func (gia *GlobalIDAllocator) AllocOne() (UniqueID, error) {
timestamp, err := gia.allocator.GenerateTSO(1)
if err != nil {
return 0, err
}
idStart := UniqueID(timestamp)
return idStart, nil
}
func (gia *GlobalIDAllocator) UpdateID() error {
return gia.allocator.UpdateTSO()
}

View File

@ -7,6 +7,9 @@ import (
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/tso"
"go.etcd.io/etcd/clientv3"
"github.com/zilliztech/milvus-distributed/internal/errors"
@ -38,7 +41,7 @@ type ServiceImpl struct {
sched *TaskScheduler
idAllocator *GlobalIDAllocator
idAllocator *allocator.GlobalIDAllocator
kv kv.Base
@ -88,7 +91,7 @@ func (i *ServiceImpl) Init() error {
//init idAllocator
kvRootPath := Params.KvRootPath
i.idAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, kvRootPath, "index_gid"))
i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, kvRootPath, "index_gid"))
if err := i.idAllocator.Initialize(); err != nil {
return err
}
@ -316,7 +319,7 @@ func (i *ServiceImpl) NotifyBuildIndex(nty *indexpb.BuildIndexNotification) (*co
}
func (i *ServiceImpl) tsLoop() {
tsoTicker := time.NewTicker(UpdateTimestampStep)
tsoTicker := time.NewTicker(tso.UpdateTimestampStep)
defer tsoTicker.Stop()
ctx, cancel := context.WithCancel(i.loopCtx)
defer cancel()

View File

@ -4,6 +4,8 @@ import (
"context"
"log"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
@ -61,7 +63,7 @@ type IndexAddTask struct {
BaseTask
req *indexpb.BuildIndexRequest
indexBuildID UniqueID
idAllocator *GlobalIDAllocator
idAllocator *allocator.GlobalIDAllocator
buildQueue TaskQueue
kv kv.Base
builderClient typeutil.IndexNodeInterface

View File

@ -7,6 +7,8 @@ import (
"log"
"sync"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/zilliztech/milvus-distributed/internal/kv"
@ -172,7 +174,7 @@ func NewIndexAddTaskQueue(sched *TaskScheduler) *IndexAddTaskQueue {
type TaskScheduler struct {
IndexAddQueue TaskQueue
idAllocator *GlobalIDAllocator
idAllocator *allocator.GlobalIDAllocator
metaTable *metaTable
kv kv.Base
@ -182,7 +184,7 @@ type TaskScheduler struct {
}
func NewTaskScheduler(ctx context.Context,
idAllocator *GlobalIDAllocator,
idAllocator *allocator.GlobalIDAllocator,
kv kv.Base,
table *metaTable) (*TaskScheduler, error) {
ctx1, cancel := context.WithCancel(ctx)

View File

@ -1,202 +0,0 @@
// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package indexservice
import (
"log"
"sync/atomic"
"time"
"unsafe"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
const (
// UpdateTimestampStep is used to update timestamp.
UpdateTimestampStep = 50 * time.Millisecond
// updateTimestampGuard is the min timestamp interval.
updateTimestampGuard = time.Millisecond
// maxLogical is the max upper limit for logical time.
// When a TSO's logical time reaches this limit,
// the physical time will be forced to increase.
maxLogical = int64(1 << 18)
)
// atomicObject is used to store the current TSO in memory.
type atomicObject struct {
physical time.Time
logical int64
}
// timestampOracle is used to maintain the logic of tso.
type timestampOracle struct {
key string
kvBase kv.TxnBase
// TODO: remove saveInterval
saveInterval time.Duration
maxResetTSGap func() time.Duration
// For tso, set after the PD becomes a leader.
TSO unsafe.Pointer
lastSavedTime atomic.Value
}
func (t *timestampOracle) loadTimestamp() (time.Time, error) {
strData, err := t.kvBase.Load(t.key)
var binData []byte = []byte(strData)
if err != nil {
return typeutil.ZeroTime, err
}
if len(binData) == 0 {
return typeutil.ZeroTime, nil
}
return typeutil.ParseTimestamp(binData)
}
// save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it,
// otherwise, update it.
func (t *timestampOracle) saveTimestamp(ts time.Time) error {
data := typeutil.Uint64ToBytes(uint64(ts.UnixNano()))
err := t.kvBase.Save(t.key, string(data))
if err != nil {
return errors.WithStack(err)
}
t.lastSavedTime.Store(ts)
return nil
}
func (t *timestampOracle) InitTimestamp() error {
//last, err := t.loadTimestamp()
//if err != nil {
// return err
//}
next := time.Now()
// If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`,
// the timestamp allocation will start from the saved etcd timestamp temporarily.
//if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard {
// next = last.Add(updateTimestampGuard)
//}
save := next.Add(t.saveInterval)
if err := t.saveTimestamp(save); err != nil {
return err
}
//log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next))
current := &atomicObject{
physical: next,
}
atomic.StorePointer(&t.TSO, unsafe.Pointer(current))
return nil
}
// ResetUserTimestamp update the physical part with specified tso.
func (t *timestampOracle) ResetUserTimestamp(tso uint64) error {
physical, _ := tsoutil.ParseTS(tso)
next := physical.Add(time.Millisecond)
prev := (*atomicObject)(atomic.LoadPointer(&t.TSO))
// do not update
if typeutil.SubTimeByWallClock(next, prev.physical) <= 3*updateTimestampGuard {
return errors.New("the specified ts too small than now")
}
if typeutil.SubTimeByWallClock(next, prev.physical) >= t.maxResetTSGap() {
return errors.New("the specified ts too large than now")
}
save := next.Add(t.saveInterval)
if err := t.saveTimestamp(save); err != nil {
return err
}
update := &atomicObject{
physical: next,
}
atomic.CompareAndSwapPointer(&t.TSO, unsafe.Pointer(prev), unsafe.Pointer(update))
return nil
}
// UpdateTimestamp is used to update the timestamp.
// This function will do two things:
// 1. When the logical time is going to be used up, increase the current physical time.
// 2. When the time window is not big enough, which means the saved etcd time minus the next physical time
// will be less than or equal to `updateTimestampGuard`, then the time window needs to be updated and
// we also need to save the next physical time plus `TsoSaveInterval` into etcd.
//
// Here is some constraints that this function must satisfy:
// 1. The saved time is monotonically increasing.
// 2. The physical time is monotonically increasing.
// 3. The physical time is always less than the saved timestamp.
func (t *timestampOracle) UpdateTimestamp() error {
prev := (*atomicObject)(atomic.LoadPointer(&t.TSO))
now := time.Now()
jetLag := typeutil.SubTimeByWallClock(now, prev.physical)
if jetLag > 3*UpdateTimestampStep {
log.Print("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now))
}
var next time.Time
prevLogical := atomic.LoadInt64(&prev.logical)
// If the system time is greater, it will be synchronized with the system time.
if jetLag > updateTimestampGuard {
next = now
} else if prevLogical > maxLogical/2 {
// The reason choosing maxLogical/2 here is that it's big enough for common cases.
// Because there is enough timestamp can be allocated before next update.
log.Print("the logical time may be not enough", zap.Int64("prev-logical", prevLogical))
next = prev.physical.Add(time.Millisecond)
} else {
// It will still use the previous physical time to alloc the timestamp.
return nil
}
// It is not safe to increase the physical time to `next`.
// The time window needs to be updated and saved to etcd.
if typeutil.SubTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= updateTimestampGuard {
save := next.Add(t.saveInterval)
if err := t.saveTimestamp(save); err != nil {
return err
}
}
current := &atomicObject{
physical: next,
logical: 0,
}
atomic.StorePointer(&t.TSO, unsafe.Pointer(current))
return nil
}
// ResetTimestamp is used to reset the timestamp.
func (t *timestampOracle) ResetTimestamp() {
zero := &atomicObject{
physical: time.Now(),
}
atomic.StorePointer(&t.TSO, unsafe.Pointer(zero))
}

View File

@ -1,117 +0,0 @@
package masterservice
import (
"sync/atomic"
"time"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.uber.org/zap"
)
// Allocator is a Timestamp Oracle allocator.
type Allocator interface {
// Initialize is used to initialize a TSO allocator.
// It will synchronize TSO with etcd and initialize the
// memory for later allocation work.
Initialize() error
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
UpdateTSO() error
// SetTSO sets the physical part with given tso. It's mainly used for BR restore
// and can not forcibly set the TSO smaller than now.
SetTSO(tso uint64) error
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
GenerateTSO(count uint32) (uint64, error)
// Reset is used to reset the TSO allocator.
Reset()
}
// GlobalTSOAllocator is the global single point TSO allocator.
type GlobalTSOAllocator struct {
tso *timestampOracle
}
// NewGlobalTSOAllocator creates a new global TSO allocator.
func NewGlobalTSOAllocator(key string, kvBase kv.TxnBase) *GlobalTSOAllocator {
var saveInterval = 3 * time.Second
return &GlobalTSOAllocator{
tso: &timestampOracle{
kvBase: kvBase,
saveInterval: saveInterval,
maxResetTSGap: func() time.Duration { return 3 * time.Second },
key: key,
},
}
}
// Initialize will initialize the created global TSO allocator.
func (gta *GlobalTSOAllocator) Initialize() error {
return gta.tso.InitTimestamp()
}
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
func (gta *GlobalTSOAllocator) UpdateTSO() error {
return gta.tso.UpdateTimestamp()
}
// SetTSO sets the physical part with given tso.
func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error {
return gta.tso.ResetUserTimestamp(tso)
}
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) {
var physical, logical int64
if count == 0 {
return 0, errors.New("tso count should be positive")
}
maxRetryCount := 10
for i := 0; i < maxRetryCount; i++ {
current := (*atomicObject)(atomic.LoadPointer(&gta.tso.TSO))
if current == nil || current.physical.Equal(typeutil.ZeroTime) {
// If it's leader, maybe SyncTimestamp hasn't completed yet
log.Debug("sync hasn't completed yet, wait for a while")
time.Sleep(200 * time.Millisecond)
continue
}
physical = current.physical.UnixNano() / int64(time.Millisecond)
logical = atomic.AddInt64(&current.logical, int64(count))
if logical >= maxLogical {
log.Debug("logical part outside of max logical interval, please check ntp time", zap.Int("retry-count", i))
time.Sleep(UpdateTimestampStep)
continue
}
return tsoutil.ComposeTS(physical, logical), nil
}
return 0, errors.New("can not get timestamp")
}
func (gta *GlobalTSOAllocator) Alloc(count uint32) (typeutil.Timestamp, error) {
//return gta.tso.SyncTimestamp()
start, err := gta.GenerateTSO(count)
if err != nil {
return typeutil.ZeroTimestamp, err
}
//ret := make([]typeutil.Timestamp, count)
//for i:=uint32(0); i < count; i++{
// ret[i] = start + uint64(i)
//}
return start, err
}
func (gta *GlobalTSOAllocator) AllocOne() (typeutil.Timestamp, error) {
return gta.GenerateTSO(1)
}
// Reset is used to reset the TSO allocator.
func (gta *GlobalTSOAllocator) Reset() {
gta.tso.ResetTimestamp()
}

View File

@ -8,6 +8,9 @@ import (
"sync/atomic"
"time"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/tso"
"github.com/zilliztech/milvus-distributed/internal/errors"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/log"
@ -114,9 +117,9 @@ type Core struct {
MetaTable *metaTable
//id allocator
idAllocator *GlobalIDAllocator
idAllocator *allocator.GlobalIDAllocator
//tso allocator
tsoAllocator *GlobalTSOAllocator
tsoAllocator *tso.GlobalTSOAllocator
//inner members
ctx context.Context
@ -405,7 +408,7 @@ func (c *Core) startSegmentFlushCompletedLoop() {
}
func (c *Core) tsLoop() {
tsoTicker := time.NewTicker(UpdateTimestampStep)
tsoTicker := time.NewTicker(tso.UpdateTimestampStep)
defer tsoTicker.Stop()
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
@ -775,11 +778,11 @@ func (c *Core) Init() error {
c.kvBase = etcdkv.NewEtcdKV(c.etcdCli, Params.KvRootPath)
c.idAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "gid"))
c.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "gid"))
if initError = c.idAllocator.Initialize(); initError != nil {
return
}
c.tsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "tso"))
c.tsoAllocator = tso.NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "tso"))
if initError = c.tsoAllocator.Initialize(); initError != nil {
return
}

View File

@ -1,188 +0,0 @@
package masterservice
import (
"sync/atomic"
"time"
"unsafe"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.uber.org/zap"
)
const (
// UpdateTimestampStep is used to update timestamp.
UpdateTimestampStep = 50 * time.Millisecond
// updateTimestampGuard is the min timestamp interval.
updateTimestampGuard = time.Millisecond
// maxLogical is the max upper limit for logical time.
// When a TSO's logical time reaches this limit,
// the physical time will be forced to increase.
maxLogical = int64(1 << 18)
)
// atomicObject is used to store the current TSO in memory.
type atomicObject struct {
physical time.Time
logical int64
}
// timestampOracle is used to maintain the logic of tso.
type timestampOracle struct {
key string
kvBase kv.TxnBase
// TODO: remove saveInterval
saveInterval time.Duration
maxResetTSGap func() time.Duration
// For tso, set after the PD becomes a leader.
TSO unsafe.Pointer
lastSavedTime atomic.Value
}
func (t *timestampOracle) loadTimestamp() (time.Time, error) {
strData, err := t.kvBase.Load(t.key)
var binData []byte = []byte(strData)
if err != nil {
return typeutil.ZeroTime, err
}
if len(binData) == 0 {
return typeutil.ZeroTime, nil
}
return typeutil.ParseTimestamp(binData)
}
// save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it,
// otherwise, update it.
func (t *timestampOracle) saveTimestamp(ts time.Time) error {
data := typeutil.Uint64ToBytes(uint64(ts.UnixNano()))
err := t.kvBase.Save(t.key, string(data))
if err != nil {
return errors.WithStack(err)
}
t.lastSavedTime.Store(ts)
return nil
}
func (t *timestampOracle) InitTimestamp() error {
//last, err := t.loadTimestamp()
//if err != nil {
// return err
//}
next := time.Now()
// If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`,
// the timestamp allocation will start from the saved etcd timestamp temporarily.
//if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard {
// next = last.Add(updateTimestampGuard)
//}
save := next.Add(t.saveInterval)
if err := t.saveTimestamp(save); err != nil {
return err
}
//log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next))
current := &atomicObject{
physical: next,
}
atomic.StorePointer(&t.TSO, unsafe.Pointer(current))
return nil
}
// ResetUserTimestamp update the physical part with specified tso.
func (t *timestampOracle) ResetUserTimestamp(tso uint64) error {
physical, _ := tsoutil.ParseTS(tso)
next := physical.Add(time.Millisecond)
prev := (*atomicObject)(atomic.LoadPointer(&t.TSO))
// do not update
if typeutil.SubTimeByWallClock(next, prev.physical) <= 3*updateTimestampGuard {
return errors.New("the specified ts too small than now")
}
if typeutil.SubTimeByWallClock(next, prev.physical) >= t.maxResetTSGap() {
return errors.New("the specified ts too large than now")
}
save := next.Add(t.saveInterval)
if err := t.saveTimestamp(save); err != nil {
return err
}
update := &atomicObject{
physical: next,
}
atomic.CompareAndSwapPointer(&t.TSO, unsafe.Pointer(prev), unsafe.Pointer(update))
return nil
}
// UpdateTimestamp is used to update the timestamp.
// This function will do two things:
// 1. When the logical time is going to be used up, increase the current physical time.
// 2. When the time window is not big enough, which means the saved etcd time minus the next physical time
// will be less than or equal to `updateTimestampGuard`, then the time window needs to be updated and
// we also need to save the next physical time plus `TsoSaveInterval` into etcd.
//
// Here is some constraints that this function must satisfy:
// 1. The saved time is monotonically increasing.
// 2. The physical time is monotonically increasing.
// 3. The physical time is always less than the saved timestamp.
func (t *timestampOracle) UpdateTimestamp() error {
prev := (*atomicObject)(atomic.LoadPointer(&t.TSO))
now := time.Now()
jetLag := typeutil.SubTimeByWallClock(now, prev.physical)
if jetLag > 3*UpdateTimestampStep {
log.Debug("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now))
}
var next time.Time
prevLogical := atomic.LoadInt64(&prev.logical)
// If the system time is greater, it will be synchronized with the system time.
if jetLag > updateTimestampGuard {
next = now
} else if prevLogical > maxLogical/2 {
// The reason choosing maxLogical/2 here is that it's big enough for common cases.
// Because there is enough timestamp can be allocated before next update.
log.Debug("the logical time may be not enough", zap.Int64("prev-logical", prevLogical))
next = prev.physical.Add(time.Millisecond)
} else {
// It will still use the previous physical time to alloc the timestamp.
return nil
}
// It is not safe to increase the physical time to `next`.
// The time window needs to be updated and saved to etcd.
if typeutil.SubTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= updateTimestampGuard {
save := next.Add(t.saveInterval)
if err := t.saveTimestamp(save); err != nil {
return err
}
}
current := &atomicObject{
physical: next,
logical: 0,
}
atomic.StorePointer(&t.TSO, unsafe.Pointer(current))
return nil
}
// ResetTimestamp is used to reset the timestamp.
func (t *timestampOracle) ResetTimestamp() {
zero := &atomicObject{
physical: time.Now(),
}
atomic.StorePointer(&t.TSO, unsafe.Pointer(zero))
}

View File

@ -7,6 +7,8 @@ import (
"os"
"testing"
"github.com/zilliztech/milvus-distributed/internal/allocator"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/util/rocksmq"
"go.etcd.io/etcd/clientv3"
@ -171,7 +173,7 @@ func initRmq(name string) *etcdkv.EtcdKV {
log.Fatalf("New clientv3 error = %v", err)
}
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
idAllocator := rocksmq.NewGlobalIDAllocator("dummy", etcdKV)
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
err = rocksmq.InitRmq(name, idAllocator)

View File

@ -1,4 +1,17 @@
package indexservice
// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package tso
import (
"log"

View File

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package rocksmq
package tso
import (
"log"

View File

@ -1,167 +0,0 @@
package rocksmq
import (
"errors"
"log"
"sync/atomic"
"time"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.uber.org/zap"
)
// Allocator is a Timestamp Oracle allocator.
type Allocator interface {
// Initialize is used to initialize a TSO allocator.
// It will synchronize TSO with etcd and initialize the
// memory for later allocation work.
Initialize() error
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
UpdateTSO() error
// SetTSO sets the physical part with given tso. It's mainly used for BR restore
// and can not forcibly set the TSO smaller than now.
SetTSO(tso uint64) error
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
GenerateTSO(count uint32) (uint64, error)
// Reset is used to reset the TSO allocator.
Reset()
}
// GlobalTSOAllocator is the global single point TSO allocator.
type GlobalTSOAllocator struct {
tso *timestampOracle
}
// NewGlobalTSOAllocator creates a new global TSO allocator.
func NewGlobalTSOAllocator(key string, kvBase kv.TxnBase) *GlobalTSOAllocator {
var saveInterval = 3 * time.Second
return &GlobalTSOAllocator{
tso: &timestampOracle{
kvBase: kvBase,
saveInterval: saveInterval,
maxResetTSGap: func() time.Duration { return 3 * time.Second },
key: key,
},
}
}
// Initialize will initialize the created global TSO allocator.
func (gta *GlobalTSOAllocator) Initialize() error {
return gta.tso.InitTimestamp()
}
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
func (gta *GlobalTSOAllocator) UpdateTSO() error {
return gta.tso.UpdateTimestamp()
}
// SetTSO sets the physical part with given tso.
func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error {
return gta.tso.ResetUserTimestamp(tso)
}
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) {
var physical, logical int64
if count == 0 {
return 0, errors.New("tso count should be positive")
}
maxRetryCount := 10
for i := 0; i < maxRetryCount; i++ {
current := (*atomicObject)(atomic.LoadPointer(&gta.tso.TSO))
if current == nil || current.physical.Equal(typeutil.ZeroTime) {
// If it's leader, maybe SyncTimestamp hasn't completed yet
log.Println("sync hasn't completed yet, wait for a while")
time.Sleep(200 * time.Millisecond)
continue
}
physical = current.physical.UnixNano() / int64(time.Millisecond)
logical = atomic.AddInt64(&current.logical, int64(count))
if logical >= maxLogical {
log.Println("logical part outside of max logical interval, please check ntp time",
zap.Int("retry-count", i))
time.Sleep(UpdateTimestampStep)
continue
}
return tsoutil.ComposeTS(physical, logical), nil
}
return 0, errors.New("can not get timestamp")
}
func (gta *GlobalTSOAllocator) Alloc(count uint32) (typeutil.Timestamp, error) {
//return gta.tso.SyncTimestamp()
start, err := gta.GenerateTSO(count)
if err != nil {
return typeutil.ZeroTimestamp, err
}
//ret := make([]typeutil.Timestamp, count)
//for i:=uint32(0); i < count; i++{
// ret[i] = start + uint64(i)
//}
return start, err
}
func (gta *GlobalTSOAllocator) AllocOne() (typeutil.Timestamp, error) {
return gta.GenerateTSO(1)
}
// Reset is used to reset the TSO allocator.
func (gta *GlobalTSOAllocator) Reset() {
gta.tso.ResetTimestamp()
}
///////////////////////////////////////////////////////////////////////
type IDAllocator interface {
Alloc(count uint32) (UniqueID, UniqueID, error)
AllocOne() (UniqueID, error)
UpdateID() error
}
// GlobalTSOAllocator is the global single point TSO allocator.
type GlobalIDAllocator struct {
allocator Allocator
}
func NewGlobalIDAllocator(key string, base kv.TxnBase) *GlobalIDAllocator {
return &GlobalIDAllocator{
allocator: NewGlobalTSOAllocator(key, base),
}
}
// Initialize will initialize the created global TSO allocator.
func (gia *GlobalIDAllocator) Initialize() error {
return gia.allocator.Initialize()
}
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (gia *GlobalIDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) {
timestamp, err := gia.allocator.GenerateTSO(count)
if err != nil {
return 0, 0, err
}
idStart := UniqueID(timestamp)
idEnd := idStart + int64(count)
return idStart, idEnd, nil
}
func (gia *GlobalIDAllocator) AllocOne() (UniqueID, error) {
timestamp, err := gia.allocator.GenerateTSO(1)
if err != nil {
return 0, err
}
idStart := UniqueID(timestamp)
return idStart, nil
}
func (gia *GlobalIDAllocator) UpdateID() error {
return gia.allocator.UpdateTSO()
}

View File

@ -4,6 +4,8 @@ import (
"os"
"sync"
"github.com/zilliztech/milvus-distributed/internal/allocator"
rocksdbkv "github.com/zilliztech/milvus-distributed/internal/kv/rocksdb"
)
@ -16,7 +18,7 @@ type Consumer struct {
MsgNum chan int
}
func InitRmq(rocksdbName string, idAllocator IDAllocator) error {
func InitRmq(rocksdbName string, idAllocator allocator.GIDAllocator) error {
var err error
Rmq, err = NewRocksMQ(rocksdbName, idAllocator)
return err
@ -33,7 +35,7 @@ func InitRocksMQ(rocksdbName string) error {
if err != nil {
panic(err)
}
idAllocator := NewGlobalIDAllocator("rmq_id", rocksdbKV)
idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV)
_ = idAllocator.Initialize()
if _, err := os.Stat(rocksdbName); !os.IsNotExist(err) {

View File

@ -4,6 +4,8 @@ import (
"strconv"
"sync"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/tecbot/gorocksdb"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
@ -72,7 +74,7 @@ type RocksMQ struct {
kv kv.Base
channels map[string]*Channel
cgCtxs map[string]ConsumerGroupContext
idAllocator IDAllocator
idAllocator allocator.GIDAllocator
produceMu sync.Mutex
consumeMu sync.Mutex
@ -86,7 +88,7 @@ type RocksMQ struct {
//tsoTicker *time.Ticker
}
func NewRocksMQ(name string, idAllocator IDAllocator) (*RocksMQ, error) {
func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*RocksMQ, error) {
bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
bbto.SetBlockCache(gorocksdb.NewLRUCache(RocksDBLRUCacheCapacity))
opts := gorocksdb.NewDefaultOptions()

View File

@ -8,6 +8,8 @@ import (
"testing"
"time"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/stretchr/testify/assert"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"go.etcd.io/etcd/clientv3"
@ -29,7 +31,7 @@ func TestRocksMQ(t *testing.T) {
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
defer etcdKV.Close()
idAllocator := NewGlobalIDAllocator("dummy", etcdKV)
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq"
@ -86,7 +88,7 @@ func TestRocksMQ_Loop(t *testing.T) {
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
defer etcdKV.Close()
idAllocator := NewGlobalIDAllocator("dummy", etcdKV)
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq_1"
@ -154,7 +156,7 @@ func TestRocksMQ_Goroutines(t *testing.T) {
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
defer etcdKV.Close()
idAllocator := NewGlobalIDAllocator("dummy", etcdKV)
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq_2"
@ -225,7 +227,7 @@ func TestRocksMQ_Throughout(t *testing.T) {
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
defer etcdKV.Close()
idAllocator := NewGlobalIDAllocator("dummy", etcdKV)
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq_3"
@ -279,7 +281,7 @@ func TestRocksMQ_MultiChan(t *testing.T) {
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
defer etcdKV.Close()
idAllocator := NewGlobalIDAllocator("dummy", etcdKV)
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq_multichan"