From f0cc9c8ad3b172b44b346b283cb831cef235c5fa Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Thu, 29 Oct 2020 09:31:08 +0800 Subject: [PATCH] Add tso for master and add timestamp Signed-off-by: zhenshan.cao --- cmd/master/main.go | 76 ++++ cmd/master/master.go | 38 -- docker-compose.yml | 2 +- internal/master/controller/segment.go | 17 +- internal/master/grpc_service.go | 74 ++-- internal/master/informer/pulsar.go | 4 +- internal/master/kv/kv.go | 1 + internal/master/master.go | 281 ++++++++++++++ internal/master/segment/stats.go | 49 --- internal/master/server.go | 46 --- internal/master/tso/global_allocator.go | 115 ++++++ internal/master/tso/tso.go | 217 +++++++++++ internal/proto/internal_msg.proto | 31 +- internal/proto/internalpb/internal_msg.pb.go | 367 ++++++++++++++----- internal/proto/master.proto | 5 +- internal/proto/masterpb/master.pb.go | 123 +++++-- internal/util/tsoutil/tso.go | 41 +++ internal/util/typeutil/convension.go | 35 ++ internal/util/typeutil/time.go | 34 ++ 19 files changed, 1264 insertions(+), 292 deletions(-) create mode 100644 cmd/master/main.go delete mode 100644 cmd/master/master.go create mode 100644 internal/master/master.go delete mode 100644 internal/master/segment/stats.go delete mode 100644 internal/master/server.go create mode 100644 internal/master/tso/global_allocator.go create mode 100644 internal/master/tso/tso.go create mode 100644 internal/util/tsoutil/tso.go create mode 100644 internal/util/typeutil/convension.go create mode 100644 internal/util/typeutil/time.go diff --git a/cmd/master/main.go b/cmd/master/main.go new file mode 100644 index 0000000000..0bf8d9761a --- /dev/null +++ b/cmd/master/main.go @@ -0,0 +1,76 @@ +// Copyright 2016 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "github.com/zilliztech/milvus-distributed/internal/conf" + "github.com/zilliztech/milvus-distributed/internal/master" + "go.uber.org/zap" + "log" + "os" + "os/signal" + "syscall" +) + + +func main() { + + var yamlFile string + flag.StringVar(&yamlFile, "yaml", "", "yaml file") + flag.Parse() + // flag.Usage() + log.Println("yaml file: ", yamlFile) + conf.LoadConfig(yamlFile) + + // Creates server. + ctx, cancel := context.WithCancel(context.Background()) + svr, err := master.CreateServer(ctx) + if err != nil { + log.Print("create server failed", zap.Error(err)) + } + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + var sig os.Signal + go func() { + sig = <-sc + cancel() + }() + + if err := svr.Run(); err != nil { + log.Fatal("run server failed", zap.Error(err)) + } + + <-ctx.Done() + log.Print("Got signal to exit", zap.String("signal", sig.String())) + + svr.Close() + switch sig { + case syscall.SIGTERM: + exit(0) + default: + exit(1) + } +} + +func exit(code int) { + os.Exit(code) +} diff --git a/cmd/master/master.go b/cmd/master/master.go deleted file mode 100644 index 032767998a..0000000000 --- a/cmd/master/master.go +++ /dev/null @@ -1,38 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "github.com/zilliztech/milvus-distributed/internal/conf" - "github.com/zilliztech/milvus-distributed/internal/master" -) - -// func main() { -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// cfg := config.NewConfig() -// s, err := server.CreateServer(ctx, cfg) -// if err != nil { -// panic(err) -// } -// err = s.Run() -// if err != nil { -// fmt.Println(err) -// } -// } - -func init() { - // go mock.FakePulsarProducer() -} -func main() { - var yamlFile string - flag.StringVar(&yamlFile, "yaml", "", "yaml file") - flag.Parse() - // flag.Usage() - fmt.Println("yaml file: ", yamlFile) - conf.LoadConfig(yamlFile) - - master.Run() - //master.SegmentStatsController() - //master.CollectionController() -} diff --git a/docker-compose.yml b/docker-compose.yml index ad59f18c54..010afc26e1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,5 +27,5 @@ services: /milvus-distributed/scripts/core_build.sh -u && \ go build -o /milvus-distributed/cmd/writer/writer /milvus-distributed/cmd/writer/writer.go && \ go build -o /milvus-distributed/cmd/reader/reader /milvus-distributed/cmd/reader/reader.go && \ - go build -o /milvus-distributed/cmd/master/master /milvus-distributed/cmd/master/master.go && \ + go build -o /milvus-distributed/cmd/master/master /milvus-distributed/cmd/master/main.go && \ go build -o /milvus-distributed/cmd/proxy/proxy /milvus-distributed/cmd/proxy/proxy.go" diff --git a/internal/master/controller/segment.go b/internal/master/controller/segment.go index 590162cc64..65fcab0fdd 100644 --- a/internal/master/controller/segment.go +++ b/internal/master/controller/segment.go @@ -9,26 +9,11 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/master/collection" "github.com/zilliztech/milvus-distributed/internal/master/id" - "github.com/zilliztech/milvus-distributed/internal/master/informer" + //"github.com/zilliztech/milvus-distributed/internal/master/informer" "github.com/zilliztech/milvus-distributed/internal/master/kv" "github.com/zilliztech/milvus-distributed/internal/master/segment" ) -func SegmentStatsController(kvbase kv.Base, errch chan error) { - ssChan := make(chan internalpb.SegmentStatistics, 10) - ssClient := informer.NewPulsarClient() - go segment.Listener(ssChan, ssClient) - for { - select { - case ss := <-ssChan: - errch <- ComputeCloseTime(ss, kvbase) - case <-time.After(5 * time.Second): - fmt.Println("wait for new request") - return - } - } - -} func ComputeCloseTime(ss internalpb.SegmentStatistics, kvbase kv.Base) error { if int(ss.MemorySize) > int(conf.Config.Master.SegmentThreshole*0.8) { diff --git a/internal/master/grpc_service.go b/internal/master/grpc_service.go index 8e23b161a6..99bbeb44e0 100644 --- a/internal/master/grpc_service.go +++ b/internal/master/grpc_service.go @@ -2,43 +2,23 @@ package master import ( "context" - "github.com/zilliztech/milvus-distributed/internal/conf" "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/master/kv" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" - "google.golang.org/grpc" - "net" - "strconv" + "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "io" + "log" + "time" ) -func Server(ch chan *schemapb.CollectionSchema, errch chan error, kvbase kv.Base) { - defaultGRPCPort := ":" - defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10) - lis, err := net.Listen("tcp", defaultGRPCPort) - if err != nil { - // log.Fatal("failed to listen: %v", err) - errch <- err - return - } - s := grpc.NewServer() - masterpb.RegisterMasterServer(s, Master{CreateRequest: ch, kvBase: kvbase}) - if err := s.Serve(lis); err != nil { - // log.Fatalf("failed to serve: %v", err) - errch <- err - return - } -} -type Master struct { - CreateRequest chan *schemapb.CollectionSchema - kvBase kv.Base - scheduler *ddRequestScheduler - mt metaTable -} +const slowThreshold = 5 * time.Millisecond + + func (ms Master) CreateCollection(ctx context.Context, in *internalpb.CreateCollectionRequest) (*commonpb.Status, error) { var t task = &createCollectionTask{ @@ -147,3 +127,39 @@ func (ms Master) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitio }, }, nil } + + +//----------------------------------------Internal GRPC Service-------------------------------- + +// Tso implements gRPC PDServer. +func (ms *Master) Tso(stream masterpb.Master_TsoServer) error { + for { + request, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return errors.WithStack(err) + } + start := time.Now() + + count := request.GetCount() + ts, err := ms.tsoAllocator.GenerateTSO(count) + if err != nil { + return status.Errorf(codes.Unknown, err.Error()) + } + + elapsed := time.Since(start) + if elapsed > slowThreshold { + log.Println("get timestamp too slow", zap.Duration("cost", elapsed)) + } + response := &internalpb.TsoResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, + Timestamp: &ts, + Count: count, + } + if err := stream.Send(response); err != nil { + return errors.WithStack(err) + } + } +} diff --git a/internal/master/informer/pulsar.go b/internal/master/informer/pulsar.go index d51f4c835e..04aba9f34c 100644 --- a/internal/master/informer/pulsar.go +++ b/internal/master/informer/pulsar.go @@ -10,7 +10,7 @@ import ( "github.com/apache/pulsar-client-go/pulsar" ) -func NewPulsarClient() PulsarClient { +func NewPulsarClient() *PulsarClient { pulsarAddr := "pulsar://" pulsarAddr += conf.Config.Pulsar.Address pulsarAddr += ":" @@ -24,7 +24,7 @@ func NewPulsarClient() PulsarClient { log.Fatalf("Could not instantiate Pulsar client: %v", err) } - return PulsarClient{ + return &PulsarClient{ Client: client, } } diff --git a/internal/master/kv/kv.go b/internal/master/kv/kv.go index 0485299d10..d318af8eaf 100644 --- a/internal/master/kv/kv.go +++ b/internal/master/kv/kv.go @@ -9,4 +9,5 @@ type Base interface { Watch(key string) clientv3.WatchChan WatchWithPrefix(key string) clientv3.WatchChan LoadWithPrefix(key string) ( []string, []string) + Close() } diff --git a/internal/master/master.go b/internal/master/master.go new file mode 100644 index 0000000000..b09c2e9b59 --- /dev/null +++ b/internal/master/master.go @@ -0,0 +1,281 @@ +// Copyright 2016 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package master + +import ( + "context" + "fmt" + "github.com/zilliztech/milvus-distributed/internal/conf" + "github.com/zilliztech/milvus-distributed/internal/master/informer" + "github.com/zilliztech/milvus-distributed/internal/master/kv" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" + "github.com/zilliztech/milvus-distributed/internal/master/controller" + "github.com/apache/pulsar-client-go/pulsar" + "google.golang.org/grpc" + "log" + "math/rand" + "net" + "strconv" + "github.com/golang/protobuf/proto" + "sync" + "sync/atomic" + "time" + + "github.com/zilliztech/milvus-distributed/internal/master/tso" + "go.etcd.io/etcd/clientv3" +) + + +// Server is the pd server. +type Master struct { + // Server state. + isServing int64 + + // Server start timestamp + startTimestamp int64 + + ctx context.Context + serverLoopCtx context.Context + serverLoopCancel func() + serverLoopWg sync.WaitGroup + + //grpc server + grpcServer * grpc.Server + + // for tso. + tsoAllocator tso.Allocator + + // pulsar client + pc *informer.PulsarClient + + // chans + ssChan chan internalpb.SegmentStatistics + + kvBase kv.Base + scheduler *ddRequestScheduler + mt metaTable + // Add callback functions at different stages + startCallbacks []func() + closeCallbacks []func() +} + +func newKvBase() kv.Base { + etcdAddr := conf.Config.Etcd.Address + etcdAddr += ":" + etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) + cli, _ := clientv3.New(clientv3.Config{ + Endpoints: []string{etcdAddr}, + DialTimeout: 5 * time.Second, + }) + // defer cli.Close() + kvBase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) + return kvBase +} + +// CreateServer creates the UNINITIALIZED pd server with given configuration. +func CreateServer(ctx context.Context) (*Master, error) { + rand.Seed(time.Now().UnixNano()) + m := &Master{ + ctx: ctx, + startTimestamp: time.Now().Unix(), + kvBase: newKvBase(), + ssChan: make(chan internalpb.SegmentStatistics, 10), + pc : informer.NewPulsarClient(), + } + + m.grpcServer = grpc.NewServer() + masterpb.RegisterMasterServer(m.grpcServer, m) + return m, nil +} + +// AddStartCallback adds a callback in the startServer phase. +func (s *Master) AddStartCallback(callbacks ...func()) { + s.startCallbacks = append(s.startCallbacks, callbacks...) +} + +func (s *Master) startServer(ctx context.Context) error { + + // Run callbacks + for _, cb := range s.startCallbacks { + cb() + } + + // Server has started. + atomic.StoreInt64(&s.isServing, 1) + return nil +} + + +// AddCloseCallback adds a callback in the Close phase. +func (s *Master) AddCloseCallback(callbacks ...func()) { + s.closeCallbacks = append(s.closeCallbacks, callbacks...) +} + + +// Close closes the server. +func (s *Master) Close() { + if !atomic.CompareAndSwapInt64(&s.isServing, 1, 0) { + // server is already closed + return + } + + log.Print("closing server") + + s.stopServerLoop() + + if s.kvBase != nil { + s.kvBase.Close() + } + + // Run callbacks + for _, cb := range s.closeCallbacks { + cb() + } + + log.Print("close server") +} + +// IsClosed checks whether server is closed or not. +func (s *Master) IsClosed() bool { + return atomic.LoadInt64(&s.isServing) == 0 +} + + +// Run runs the pd server. +func (s *Master) Run() error { + + if err := s.startServer(s.ctx); err != nil { + return err + } + + s.startServerLoop(s.ctx) + + return nil +} + +// Context returns the context of server. +func (s *Master) Context() context.Context { + return s.ctx +} + +// LoopContext returns the loop context of server. +func (s *Master) LoopContext() context.Context { + return s.serverLoopCtx +} + +func (s *Master) startServerLoop(ctx context.Context) { + s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(ctx) + s.serverLoopWg.Add(3) + //go s.Se + go s.grpcLoop() + go s.pulsarLoop() + go s.segmentStatisticsLoop() +} + +func (s *Master) stopServerLoop() { + s.serverLoopCancel() + s.serverLoopWg.Wait() +} + + +// StartTimestamp returns the start timestamp of this server +func (s *Master) StartTimestamp() int64 { + return s.startTimestamp +} + + +func (s *Master) grpcLoop() { + defer s.serverLoopWg.Done() + + defaultGRPCPort := ":" + defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10) + lis, err := net.Listen("tcp", defaultGRPCPort) + if err != nil { + log.Println("failed to listen: %v", err) + return + } + + if err := s.grpcServer.Serve(lis); err != nil { + } + + ctx, cancel := context.WithCancel(s.serverLoopCtx) + defer cancel() + for { + select { + case <-ctx.Done(): + log.Print("server is closed, exit etcd leader loop") + return + } + } +} + +// todo use messagestream +func (s *Master) pulsarLoop() { + defer s.serverLoopWg.Done() + + ctx, cancel := context.WithCancel(s.serverLoopCtx) + + consumer, err := s.pc.Client.Subscribe(pulsar.ConsumerOptions{ + Topic: conf.Config.Master.PulsarTopic, + SubscriptionName: "my-sub", + Type: pulsar.Shared, + }) + if err != nil { + log.Fatal(err) + return + } + defer func (){ + if err := consumer.Unsubscribe(); err != nil { + log.Fatal(err) + } + cancel() + }() + + consumerChan := consumer.Chan() + + for { + select { + case msg := <-consumerChan: + var m internalpb.SegmentStatistics + proto.Unmarshal(msg.Payload(), &m) + fmt.Printf("Received message msgId: %#v -- content: '%s'\n", + msg.ID(), m.SegmentId) + s.ssChan <- m + consumer.Ack(msg) + case <-ctx.Done(): + log.Print("server is closed, exit etcd leader loop") + return + } + } + +} + +func (s *Master) segmentStatisticsLoop() { + defer s.serverLoopWg.Done() + + ctx, cancel := context.WithCancel(s.serverLoopCtx) + defer cancel() + + for { + select { + case ss := <-s.ssChan: + controller.ComputeCloseTime(ss, s.kvBase) + case <-ctx.Done(): + log.Print("server is closed, exit etcd leader loop") + return + } + } +} + diff --git a/internal/master/segment/stats.go b/internal/master/segment/stats.go deleted file mode 100644 index 9d97f57d82..0000000000 --- a/internal/master/segment/stats.go +++ /dev/null @@ -1,49 +0,0 @@ -package segment - -import ( - "context" - "fmt" - "log" - "github.com/golang/protobuf/proto" - "github.com/apache/pulsar-client-go/pulsar" - "github.com/zilliztech/milvus-distributed/internal/conf" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - "github.com/zilliztech/milvus-distributed/internal/master/informer" -) - -type SegmentStats struct { - SegementID uint64 - MemorySize uint64 - MemoryRate float64 - Rows int64 -} - - -func Listener(ssChan chan internalpb.SegmentStatistics, pc informer.PulsarClient) error { - consumer, err := pc.Client.Subscribe(pulsar.ConsumerOptions{ - Topic: conf.Config.Master.PulsarTopic, - SubscriptionName: "my-sub", - Type: pulsar.Shared, - }) - if err != nil { - log.Fatal(err) - } - for { - msg, err := consumer.Receive(context.TODO()) - if err != nil { - log.Fatal(err) - } - - var m internalpb.SegmentStatistics - proto.Unmarshal(msg.Payload(), &m) - fmt.Printf("Received message msgId: %#v -- content: '%s'\n", - msg.ID(), m.SegmentId) - ssChan <- m - consumer.Ack(msg) - } - - if err := consumer.Unsubscribe(); err != nil { - log.Fatal(err) - } - return nil -} diff --git a/internal/master/server.go b/internal/master/server.go deleted file mode 100644 index d8f79bf4f5..0000000000 --- a/internal/master/server.go +++ /dev/null @@ -1,46 +0,0 @@ -package master - -import ( - "log" - "strconv" - "time" - - "github.com/zilliztech/milvus-distributed/internal/conf" - "github.com/zilliztech/milvus-distributed/internal/master/controller" - "github.com/zilliztech/milvus-distributed/internal/master/kv" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - - "go.etcd.io/etcd/clientv3" -) - -func Run() { - kvBase := newKvBase() - collectionChan := make(chan *schemapb.CollectionSchema) - defer close(collectionChan) - - errorCh := make(chan error) - defer close(errorCh) - - go Server(collectionChan, errorCh, kvBase) - go controller.SegmentStatsController(kvBase, errorCh) - go controller.CollectionController(collectionChan, kvBase, errorCh) - //go timetick.TimeTickService() - for { - for v := range errorCh { - log.Fatal(v) - } - } -} - -func newKvBase() kv.Base { - etcdAddr := conf.Config.Etcd.Address - etcdAddr += ":" - etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) - cli, _ := clientv3.New(clientv3.Config{ - Endpoints: []string{etcdAddr}, - DialTimeout: 5 * time.Second, - }) - // defer cli.Close() - kvBase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) - return kvBase -} diff --git a/internal/master/tso/global_allocator.go b/internal/master/tso/global_allocator.go new file mode 100644 index 0000000000..dbf05f0601 --- /dev/null +++ b/internal/master/tso/global_allocator.go @@ -0,0 +1,115 @@ +// Copyright 2020 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tso + +import ( + "go.etcd.io/etcd/clientv3" + "sync/atomic" + "time" + + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "github.com/pingcap/log" + "go.uber.org/zap" +) + +// Allocator is a Timestamp Oracle allocator. +type Allocator interface { + // Initialize is used to initialize a TSO allocator. + // It will synchronize TSO with etcd and initialize the + // memory for later allocation work. + Initialize() error + // UpdateTSO is used to update the TSO in memory and the time window in etcd. + UpdateTSO() error + // SetTSO sets the physical part with given tso. It's mainly used for BR restore + // and can not forcibly set the TSO smaller than now. + SetTSO(tso uint64) error + // GenerateTSO is used to generate a given number of TSOs. + // Make sure you have initialized the TSO allocator before calling. + GenerateTSO(count uint32) (internalpb.TimestampMsg, error) + // Reset is used to reset the TSO allocator. + Reset() +} + +// GlobalTSOAllocator is the global single point TSO allocator. +type GlobalTSOAllocator struct { + timestampOracle *timestampOracle +} + +// NewGlobalTSOAllocator creates a new global TSO allocator. +func NewGlobalTSOAllocator(client *clientv3.Client, rootPath string, saveInterval time.Duration, maxResetTSGap func() time.Duration) Allocator { + return &GlobalTSOAllocator{ + timestampOracle: ×tampOracle{ + client: client, + rootPath: rootPath, + saveInterval: saveInterval, + maxResetTSGap: maxResetTSGap, + }, + } +} + +// Initialize will initialize the created global TSO allocator. +func (gta *GlobalTSOAllocator) Initialize() error { + return gta.timestampOracle.SyncTimestamp() +} + +// UpdateTSO is used to update the TSO in memory and the time window in etcd. +func (gta *GlobalTSOAllocator) UpdateTSO() error { + return gta.timestampOracle.UpdateTimestamp() +} + +// SetTSO sets the physical part with given tso. +func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error { + return gta.timestampOracle.ResetUserTimestamp(tso) +} + +// GenerateTSO is used to generate a given number of TSOs. +// Make sure you have initialized the TSO allocator before calling. +func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (internalpb.TimestampMsg, error) { + var resp internalpb.TimestampMsg + + if count == 0 { + return resp, errors.New("tso count should be positive") + } + + maxRetryCount := 10 + + for i := 0; i < maxRetryCount; i++ { + current := (*atomicObject)(atomic.LoadPointer(>a.timestampOracle.TSO)) + if current == nil || current.physical == typeutil.ZeroTime { + // If it's leader, maybe SyncTimestamp hasn't completed yet + log.Info("sync hasn't completed yet, wait for a while") + time.Sleep(200 * time.Millisecond) + continue + } + + resp.Physical = current.physical.UnixNano() / int64(time.Millisecond) + resp.Logical = atomic.AddInt64(¤t.logical, int64(count)) + if resp.Logical >= maxLogical { + log.Error("logical part outside of max logical interval, please check ntp time", + zap.Reflect("response", resp), + zap.Int("retry-count", i)) + time.Sleep(UpdateTimestampStep) + continue + } + return resp, nil + } + return resp, errors.New("can not get timestamp") +} + +// Reset is used to reset the TSO allocator. +func (gta *GlobalTSOAllocator) Reset() { + gta.timestampOracle.ResetTimestamp() +} diff --git a/internal/master/tso/tso.go b/internal/master/tso/tso.go new file mode 100644 index 0000000000..4211ee9357 --- /dev/null +++ b/internal/master/tso/tso.go @@ -0,0 +1,217 @@ +// Copyright 2016 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tso + +import ( + "go.uber.org/zap" + "log" + "path" + "sync/atomic" + "time" + "unsafe" + + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/util/etcdutil" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "go.etcd.io/etcd/clientv3" +) + +const ( + // UpdateTimestampStep is used to update timestamp. + UpdateTimestampStep = 50 * time.Millisecond + // updateTimestampGuard is the min timestamp interval. + updateTimestampGuard = time.Millisecond + // maxLogical is the max upper limit for logical time. + // When a TSO's logical time reaches this limit, + // the physical time will be forced to increase. + maxLogical = int64(1 << 18) +) + +// atomicObject is used to store the current TSO in memory. +type atomicObject struct { + physical time.Time + logical int64 +} + +// timestampOracle is used to maintain the logic of tso. +type timestampOracle struct { + client *clientv3.Client + rootPath string + // TODO: remove saveInterval + saveInterval time.Duration + maxResetTSGap func() time.Duration + // For tso, set after the PD becomes a leader. + TSO unsafe.Pointer + lastSavedTime atomic.Value +} + +func (t *timestampOracle) getTimestampPath() string { + return path.Join(t.rootPath, "timestamp") +} + +func (t *timestampOracle) loadTimestamp() (time.Time, error) { + data, err := etcdutil.GetValue(t.client, t.getTimestampPath()) + if err != nil { + return typeutil.ZeroTime, err + } + if len(data) == 0 { + return typeutil.ZeroTime, nil + } + return typeutil.ParseTimestamp(data) +} + +// save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it, +// otherwise, update it. +func (t *timestampOracle) saveTimestamp( ts time.Time) error { + key := t.getTimestampPath() + data := typeutil.Uint64ToBytes(uint64(ts.UnixNano())) + err := errors.New("") + println("%v,%v",key, data) + //resp, err := leadership.LeaderTxn(). + // Then(clientv3.OpPut(key, string(data))). + // Commit() + if err != nil { + return errors.WithStack(err) + } + //if !resp.Succeeded { + // return errors.New("save timestamp failed, maybe we lost leader") + //} + + t.lastSavedTime.Store(ts) + + return nil +} + +// SyncTimestamp is used to synchronize the timestamp. +func (t *timestampOracle) SyncTimestamp() error { + + last, err := t.loadTimestamp() + if err != nil { + return err + } + + next := time.Now() + + // If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`, + // the timestamp allocation will start from the saved etcd timestamp temporarily. + if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard { + next = last.Add(updateTimestampGuard) + } + + save := next.Add(t.saveInterval) + if err = t.saveTimestamp(save); err != nil { + return err + } + + log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next)) + + current := &atomicObject{ + physical: next, + } + atomic.StorePointer(&t.TSO, unsafe.Pointer(current)) + + return nil +} + +// ResetUserTimestamp update the physical part with specified tso. +func (t *timestampOracle) ResetUserTimestamp( tso uint64) error { + //if !leadership.Check() { + // return errors.New("Setup timestamp failed, lease expired") + //} + physical, _ := tsoutil.ParseTS(tso) + next := physical.Add(time.Millisecond) + prev := (*atomicObject)(atomic.LoadPointer(&t.TSO)) + + // do not update + if typeutil.SubTimeByWallClock(next, prev.physical) <= 3*updateTimestampGuard { + return errors.New("the specified ts too small than now") + } + + if typeutil.SubTimeByWallClock(next, prev.physical) >= t.maxResetTSGap() { + return errors.New("the specified ts too large than now") + } + + save := next.Add(t.saveInterval) + if err := t.saveTimestamp( save); err != nil { + return err + } + update := &atomicObject{ + physical: next, + } + atomic.CompareAndSwapPointer(&t.TSO, unsafe.Pointer(prev), unsafe.Pointer(update)) + return nil +} + +// UpdateTimestamp is used to update the timestamp. +// This function will do two things: +// 1. When the logical time is going to be used up, increase the current physical time. +// 2. When the time window is not big enough, which means the saved etcd time minus the next physical time +// will be less than or equal to `updateTimestampGuard`, then the time window needs to be updated and +// we also need to save the next physical time plus `TsoSaveInterval` into etcd. +// +// Here is some constraints that this function must satisfy: +// 1. The saved time is monotonically increasing. +// 2. The physical time is monotonically increasing. +// 3. The physical time is always less than the saved timestamp. +func (t *timestampOracle) UpdateTimestamp() error { + prev := (*atomicObject)(atomic.LoadPointer(&t.TSO)) + now := time.Now() + + jetLag := typeutil.SubTimeByWallClock(now, prev.physical) + if jetLag > 3*UpdateTimestampStep { + log.Print("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now)) + } + + var next time.Time + prevLogical := atomic.LoadInt64(&prev.logical) + // If the system time is greater, it will be synchronized with the system time. + if jetLag > updateTimestampGuard { + next = now + } else if prevLogical > maxLogical/2 { + // The reason choosing maxLogical/2 here is that it's big enough for common cases. + // Because there is enough timestamp can be allocated before next update. + log.Print("the logical time may be not enough", zap.Int64("prev-logical", prevLogical)) + next = prev.physical.Add(time.Millisecond) + } else { + // It will still use the previous physical time to alloc the timestamp. + return nil + } + + // It is not safe to increase the physical time to `next`. + // The time window needs to be updated and saved to etcd. + if typeutil.SubTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= updateTimestampGuard { + save := next.Add(t.saveInterval) + if err := t.saveTimestamp( save); err != nil { + return err + } + } + + current := &atomicObject{ + physical: next, + logical: 0, + } + + atomic.StorePointer(&t.TSO, unsafe.Pointer(current)) + + return nil +} + +// ResetTimestamp is used to reset the timestamp. +func (t *timestampOracle) ResetTimestamp() { + zero := &atomicObject{ + physical: typeutil.ZeroTime, + } + atomic.StorePointer(&t.TSO, unsafe.Pointer(zero)) +} diff --git a/internal/proto/internal_msg.proto b/internal/proto/internal_msg.proto index 2c626a8df7..988cac7b08 100644 --- a/internal/proto/internal_msg.proto +++ b/internal/proto/internal_msg.proto @@ -29,6 +29,35 @@ enum ReqType { kSearch = 500; } +enum PeerRole { + + Master = 0; + + Reader = 1; + + Writer = 2; + + Proxy = 3; + +} + +message TimestampMsg { + int64 physical = 1; + int64 logical = 2; +} + +message TsoRequest { + int64 peer_id = 1; + PeerRole role = 2; + uint32 count = 3; +} + +message TsoResponse { + common.Status status = 1; + TimestampMsg timestamp = 2; + uint32 count = 3; +} + message CreateCollectionRequest { ReqType req_type = 1; @@ -190,4 +219,4 @@ message SegmentStatistics { uint64 segment_id = 1; uint64 memory_size = 2; int64 num_rows = 3; -} \ No newline at end of file +} diff --git a/internal/proto/internalpb/internal_msg.pb.go b/internal/proto/internalpb/internal_msg.pb.go index 01cdd8ff1a..3879de7561 100644 --- a/internal/proto/internalpb/internal_msg.pb.go +++ b/internal/proto/internalpb/internal_msg.pb.go @@ -84,6 +84,194 @@ func (ReqType) EnumDescriptor() ([]byte, []int) { return fileDescriptor_7eb37f6b80b23116, []int{0} } +type PeerRole int32 + +const ( + PeerRole_Master PeerRole = 0 + PeerRole_Reader PeerRole = 1 + PeerRole_Writer PeerRole = 2 + PeerRole_Proxy PeerRole = 3 +) + +var PeerRole_name = map[int32]string{ + 0: "Master", + 1: "Reader", + 2: "Writer", + 3: "Proxy", +} + +var PeerRole_value = map[string]int32{ + "Master": 0, + "Reader": 1, + "Writer": 2, + "Proxy": 3, +} + +func (x PeerRole) String() string { + return proto.EnumName(PeerRole_name, int32(x)) +} + +func (PeerRole) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_7eb37f6b80b23116, []int{1} +} + +type TimestampMsg struct { + Physical int64 `protobuf:"varint,1,opt,name=physical,proto3" json:"physical,omitempty"` + Logical int64 `protobuf:"varint,2,opt,name=logical,proto3" json:"logical,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *TimestampMsg) Reset() { *m = TimestampMsg{} } +func (m *TimestampMsg) String() string { return proto.CompactTextString(m) } +func (*TimestampMsg) ProtoMessage() {} +func (*TimestampMsg) Descriptor() ([]byte, []int) { + return fileDescriptor_7eb37f6b80b23116, []int{0} +} + +func (m *TimestampMsg) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_TimestampMsg.Unmarshal(m, b) +} +func (m *TimestampMsg) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_TimestampMsg.Marshal(b, m, deterministic) +} +func (m *TimestampMsg) XXX_Merge(src proto.Message) { + xxx_messageInfo_TimestampMsg.Merge(m, src) +} +func (m *TimestampMsg) XXX_Size() int { + return xxx_messageInfo_TimestampMsg.Size(m) +} +func (m *TimestampMsg) XXX_DiscardUnknown() { + xxx_messageInfo_TimestampMsg.DiscardUnknown(m) +} + +var xxx_messageInfo_TimestampMsg proto.InternalMessageInfo + +func (m *TimestampMsg) GetPhysical() int64 { + if m != nil { + return m.Physical + } + return 0 +} + +func (m *TimestampMsg) GetLogical() int64 { + if m != nil { + return m.Logical + } + return 0 +} + +type TsoRequest struct { + PeerId int64 `protobuf:"varint,1,opt,name=peer_id,json=peerId,proto3" json:"peer_id,omitempty"` + Role PeerRole `protobuf:"varint,2,opt,name=role,proto3,enum=milvus.proto.internal.PeerRole" json:"role,omitempty"` + Count uint32 `protobuf:"varint,3,opt,name=count,proto3" json:"count,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *TsoRequest) Reset() { *m = TsoRequest{} } +func (m *TsoRequest) String() string { return proto.CompactTextString(m) } +func (*TsoRequest) ProtoMessage() {} +func (*TsoRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_7eb37f6b80b23116, []int{1} +} + +func (m *TsoRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_TsoRequest.Unmarshal(m, b) +} +func (m *TsoRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_TsoRequest.Marshal(b, m, deterministic) +} +func (m *TsoRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_TsoRequest.Merge(m, src) +} +func (m *TsoRequest) XXX_Size() int { + return xxx_messageInfo_TsoRequest.Size(m) +} +func (m *TsoRequest) XXX_DiscardUnknown() { + xxx_messageInfo_TsoRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_TsoRequest proto.InternalMessageInfo + +func (m *TsoRequest) GetPeerId() int64 { + if m != nil { + return m.PeerId + } + return 0 +} + +func (m *TsoRequest) GetRole() PeerRole { + if m != nil { + return m.Role + } + return PeerRole_Master +} + +func (m *TsoRequest) GetCount() uint32 { + if m != nil { + return m.Count + } + return 0 +} + +type TsoResponse struct { + Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + Timestamp *TimestampMsg `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Count uint32 `protobuf:"varint,3,opt,name=count,proto3" json:"count,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *TsoResponse) Reset() { *m = TsoResponse{} } +func (m *TsoResponse) String() string { return proto.CompactTextString(m) } +func (*TsoResponse) ProtoMessage() {} +func (*TsoResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_7eb37f6b80b23116, []int{2} +} + +func (m *TsoResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_TsoResponse.Unmarshal(m, b) +} +func (m *TsoResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_TsoResponse.Marshal(b, m, deterministic) +} +func (m *TsoResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_TsoResponse.Merge(m, src) +} +func (m *TsoResponse) XXX_Size() int { + return xxx_messageInfo_TsoResponse.Size(m) +} +func (m *TsoResponse) XXX_DiscardUnknown() { + xxx_messageInfo_TsoResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_TsoResponse proto.InternalMessageInfo + +func (m *TsoResponse) GetStatus() *commonpb.Status { + if m != nil { + return m.Status + } + return nil +} + +func (m *TsoResponse) GetTimestamp() *TimestampMsg { + if m != nil { + return m.Timestamp + } + return nil +} + +func (m *TsoResponse) GetCount() uint32 { + if m != nil { + return m.Count + } + return 0 +} + type CreateCollectionRequest struct { ReqType ReqType `protobuf:"varint,1,opt,name=req_type,json=reqType,proto3,enum=milvus.proto.internal.ReqType" json:"req_type,omitempty"` ReqId uint64 `protobuf:"varint,2,opt,name=req_id,json=reqId,proto3" json:"req_id,omitempty"` @@ -99,7 +287,7 @@ func (m *CreateCollectionRequest) Reset() { *m = CreateCollectionRequest func (m *CreateCollectionRequest) String() string { return proto.CompactTextString(m) } func (*CreateCollectionRequest) ProtoMessage() {} func (*CreateCollectionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{0} + return fileDescriptor_7eb37f6b80b23116, []int{3} } func (m *CreateCollectionRequest) XXX_Unmarshal(b []byte) error { @@ -170,7 +358,7 @@ func (m *DropCollectionRequest) Reset() { *m = DropCollectionRequest{} } func (m *DropCollectionRequest) String() string { return proto.CompactTextString(m) } func (*DropCollectionRequest) ProtoMessage() {} func (*DropCollectionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{1} + return fileDescriptor_7eb37f6b80b23116, []int{4} } func (m *DropCollectionRequest) XXX_Unmarshal(b []byte) error { @@ -241,7 +429,7 @@ func (m *HasCollectionRequest) Reset() { *m = HasCollectionRequest{} } func (m *HasCollectionRequest) String() string { return proto.CompactTextString(m) } func (*HasCollectionRequest) ProtoMessage() {} func (*HasCollectionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{2} + return fileDescriptor_7eb37f6b80b23116, []int{5} } func (m *HasCollectionRequest) XXX_Unmarshal(b []byte) error { @@ -312,7 +500,7 @@ func (m *DescribeCollectionRequest) Reset() { *m = DescribeCollectionReq func (m *DescribeCollectionRequest) String() string { return proto.CompactTextString(m) } func (*DescribeCollectionRequest) ProtoMessage() {} func (*DescribeCollectionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{3} + return fileDescriptor_7eb37f6b80b23116, []int{6} } func (m *DescribeCollectionRequest) XXX_Unmarshal(b []byte) error { @@ -382,7 +570,7 @@ func (m *ShowCollectionRequest) Reset() { *m = ShowCollectionRequest{} } func (m *ShowCollectionRequest) String() string { return proto.CompactTextString(m) } func (*ShowCollectionRequest) ProtoMessage() {} func (*ShowCollectionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{4} + return fileDescriptor_7eb37f6b80b23116, []int{7} } func (m *ShowCollectionRequest) XXX_Unmarshal(b []byte) error { @@ -446,7 +634,7 @@ func (m *CreatePartitionRequest) Reset() { *m = CreatePartitionRequest{} func (m *CreatePartitionRequest) String() string { return proto.CompactTextString(m) } func (*CreatePartitionRequest) ProtoMessage() {} func (*CreatePartitionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{5} + return fileDescriptor_7eb37f6b80b23116, []int{8} } func (m *CreatePartitionRequest) XXX_Unmarshal(b []byte) error { @@ -517,7 +705,7 @@ func (m *DropPartitionRequest) Reset() { *m = DropPartitionRequest{} } func (m *DropPartitionRequest) String() string { return proto.CompactTextString(m) } func (*DropPartitionRequest) ProtoMessage() {} func (*DropPartitionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{6} + return fileDescriptor_7eb37f6b80b23116, []int{9} } func (m *DropPartitionRequest) XXX_Unmarshal(b []byte) error { @@ -588,7 +776,7 @@ func (m *HasPartitionRequest) Reset() { *m = HasPartitionRequest{} } func (m *HasPartitionRequest) String() string { return proto.CompactTextString(m) } func (*HasPartitionRequest) ProtoMessage() {} func (*HasPartitionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{7} + return fileDescriptor_7eb37f6b80b23116, []int{10} } func (m *HasPartitionRequest) XXX_Unmarshal(b []byte) error { @@ -659,7 +847,7 @@ func (m *DescribePartitionRequest) Reset() { *m = DescribePartitionReque func (m *DescribePartitionRequest) String() string { return proto.CompactTextString(m) } func (*DescribePartitionRequest) ProtoMessage() {} func (*DescribePartitionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{8} + return fileDescriptor_7eb37f6b80b23116, []int{11} } func (m *DescribePartitionRequest) XXX_Unmarshal(b []byte) error { @@ -730,7 +918,7 @@ func (m *ShowPartitionRequest) Reset() { *m = ShowPartitionRequest{} } func (m *ShowPartitionRequest) String() string { return proto.CompactTextString(m) } func (*ShowPartitionRequest) ProtoMessage() {} func (*ShowPartitionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{9} + return fileDescriptor_7eb37f6b80b23116, []int{12} } func (m *ShowPartitionRequest) XXX_Unmarshal(b []byte) error { @@ -806,7 +994,7 @@ func (m *InsertRequest) Reset() { *m = InsertRequest{} } func (m *InsertRequest) String() string { return proto.CompactTextString(m) } func (*InsertRequest) ProtoMessage() {} func (*InsertRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{10} + return fileDescriptor_7eb37f6b80b23116, []int{13} } func (m *InsertRequest) XXX_Unmarshal(b []byte) error { @@ -914,7 +1102,7 @@ func (m *DeleteRequest) Reset() { *m = DeleteRequest{} } func (m *DeleteRequest) String() string { return proto.CompactTextString(m) } func (*DeleteRequest) ProtoMessage() {} func (*DeleteRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{11} + return fileDescriptor_7eb37f6b80b23116, []int{14} } func (m *DeleteRequest) XXX_Unmarshal(b []byte) error { @@ -1000,7 +1188,7 @@ func (m *SearchRequest) Reset() { *m = SearchRequest{} } func (m *SearchRequest) String() string { return proto.CompactTextString(m) } func (*SearchRequest) ProtoMessage() {} func (*SearchRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{12} + return fileDescriptor_7eb37f6b80b23116, []int{15} } func (m *SearchRequest) XXX_Unmarshal(b []byte) error { @@ -1080,7 +1268,7 @@ func (m *SearchResult) Reset() { *m = SearchResult{} } func (m *SearchResult) String() string { return proto.CompactTextString(m) } func (*SearchResult) ProtoMessage() {} func (*SearchResult) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{13} + return fileDescriptor_7eb37f6b80b23116, []int{16} } func (m *SearchResult) XXX_Unmarshal(b []byte) error { @@ -1162,7 +1350,7 @@ func (m *TimeSyncMsg) Reset() { *m = TimeSyncMsg{} } func (m *TimeSyncMsg) String() string { return proto.CompactTextString(m) } func (*TimeSyncMsg) ProtoMessage() {} func (*TimeSyncMsg) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{14} + return fileDescriptor_7eb37f6b80b23116, []int{17} } func (m *TimeSyncMsg) XXX_Unmarshal(b []byte) error { @@ -1212,7 +1400,7 @@ func (m *Key2Seg) Reset() { *m = Key2Seg{} } func (m *Key2Seg) String() string { return proto.CompactTextString(m) } func (*Key2Seg) ProtoMessage() {} func (*Key2Seg) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{15} + return fileDescriptor_7eb37f6b80b23116, []int{18} } func (m *Key2Seg) XXX_Unmarshal(b []byte) error { @@ -1280,7 +1468,7 @@ func (m *Key2SegMsg) Reset() { *m = Key2SegMsg{} } func (m *Key2SegMsg) String() string { return proto.CompactTextString(m) } func (*Key2SegMsg) ProtoMessage() {} func (*Key2SegMsg) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{16} + return fileDescriptor_7eb37f6b80b23116, []int{19} } func (m *Key2SegMsg) XXX_Unmarshal(b []byte) error { @@ -1328,7 +1516,7 @@ func (m *SegmentStatistics) Reset() { *m = SegmentStatistics{} } func (m *SegmentStatistics) String() string { return proto.CompactTextString(m) } func (*SegmentStatistics) ProtoMessage() {} func (*SegmentStatistics) Descriptor() ([]byte, []int) { - return fileDescriptor_7eb37f6b80b23116, []int{17} + return fileDescriptor_7eb37f6b80b23116, []int{20} } func (m *SegmentStatistics) XXX_Unmarshal(b []byte) error { @@ -1372,6 +1560,10 @@ func (m *SegmentStatistics) GetNumRows() int64 { func init() { proto.RegisterEnum("milvus.proto.internal.ReqType", ReqType_name, ReqType_value) + proto.RegisterEnum("milvus.proto.internal.PeerRole", PeerRole_name, PeerRole_value) + proto.RegisterType((*TimestampMsg)(nil), "milvus.proto.internal.TimestampMsg") + proto.RegisterType((*TsoRequest)(nil), "milvus.proto.internal.TsoRequest") + proto.RegisterType((*TsoResponse)(nil), "milvus.proto.internal.TsoResponse") proto.RegisterType((*CreateCollectionRequest)(nil), "milvus.proto.internal.CreateCollectionRequest") proto.RegisterType((*DropCollectionRequest)(nil), "milvus.proto.internal.DropCollectionRequest") proto.RegisterType((*HasCollectionRequest)(nil), "milvus.proto.internal.HasCollectionRequest") @@ -1395,69 +1587,78 @@ func init() { func init() { proto.RegisterFile("internal_msg.proto", fileDescriptor_7eb37f6b80b23116) } var fileDescriptor_7eb37f6b80b23116 = []byte{ - // 1020 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x57, 0xcd, 0x6f, 0xe3, 0x44, - 0x14, 0x67, 0x92, 0x26, 0x69, 0x5f, 0x9a, 0xd6, 0x9d, 0x36, 0xd4, 0x5b, 0x3e, 0x36, 0x78, 0x91, - 0xa8, 0x56, 0x22, 0x15, 0x59, 0x0e, 0x70, 0x6d, 0x73, 0x68, 0x58, 0x6d, 0x85, 0x9c, 0x8a, 0x03, - 0x12, 0xb2, 0x1c, 0xfb, 0x91, 0x8c, 0xfc, 0xd9, 0x99, 0x49, 0x8b, 0x7b, 0xe7, 0x8a, 0x90, 0x38, - 0x72, 0xe3, 0xaf, 0xe1, 0xeb, 0xce, 0x3f, 0x01, 0x82, 0x95, 0x40, 0x5c, 0xd1, 0xd8, 0xce, 0x87, - 0xdd, 0x52, 0x10, 0x62, 0xa5, 0x4a, 0xb9, 0xe5, 0xbd, 0x99, 0xbc, 0xf9, 0x7d, 0xcc, 0x3c, 0xcf, - 0x00, 0x65, 0xa1, 0x44, 0x1e, 0xda, 0xbe, 0x15, 0x88, 0x71, 0x37, 0xe6, 0x91, 0x8c, 0x68, 0x3b, - 0x60, 0xfe, 0xe5, 0x54, 0x64, 0x51, 0x77, 0x36, 0xe1, 0x60, 0xd3, 0x89, 0x82, 0x20, 0x0a, 0xb3, - 0xf4, 0xc1, 0x8e, 0x40, 0x7e, 0xc9, 0x1c, 0x5c, 0xfc, 0xcf, 0xf8, 0x89, 0xc0, 0xfe, 0x09, 0x47, - 0x5b, 0xe2, 0x49, 0xe4, 0xfb, 0xe8, 0x48, 0x16, 0x85, 0x26, 0x5e, 0x4c, 0x51, 0x48, 0xfa, 0x3e, - 0xac, 0x73, 0xbc, 0xb0, 0x64, 0x12, 0xa3, 0x4e, 0x3a, 0xe4, 0x70, 0xab, 0xf7, 0x7a, 0xf7, 0xd6, - 0x65, 0xba, 0x26, 0x5e, 0x9c, 0x27, 0x31, 0x9a, 0x0d, 0x9e, 0xfd, 0xa0, 0x6d, 0xa8, 0xab, 0xbf, - 0x32, 0x57, 0xaf, 0x74, 0xc8, 0xe1, 0x9a, 0x59, 0xe3, 0x78, 0x31, 0x70, 0xe9, 0xab, 0xb0, 0x21, - 0x59, 0x80, 0x42, 0xda, 0x41, 0xac, 0x57, 0xd3, 0x91, 0x45, 0x82, 0x3e, 0x80, 0xf5, 0x98, 0x47, - 0x9f, 0x25, 0xea, 0x6f, 0x6b, 0x1d, 0x72, 0x58, 0x35, 0x1b, 0x69, 0x3c, 0x70, 0xe9, 0x3b, 0x50, - 0x17, 0xce, 0x04, 0x03, 0x5b, 0xaf, 0x75, 0xc8, 0x61, 0xb3, 0xf7, 0xa0, 0x08, 0x24, 0x67, 0x79, - 0xec, 0x47, 0x23, 0x33, 0x9f, 0x68, 0x3c, 0x27, 0xd0, 0xee, 0xf3, 0x28, 0xbe, 0xd7, 0xbc, 0x9e, - 0xc1, 0xb6, 0x33, 0xc7, 0x67, 0x85, 0x76, 0x80, 0x39, 0xc1, 0x37, 0x8b, 0x88, 0x72, 0xe3, 0xba, - 0x0b, 0x32, 0x67, 0x76, 0x80, 0xe6, 0x96, 0x53, 0x88, 0x8d, 0xdf, 0x08, 0xec, 0x9d, 0xda, 0x62, - 0x95, 0x28, 0xff, 0x41, 0xe0, 0x41, 0x1f, 0x85, 0xc3, 0xd9, 0x08, 0x57, 0x89, 0xf7, 0x37, 0x04, - 0xda, 0xc3, 0x49, 0x74, 0x75, 0x9f, 0x39, 0x1b, 0xbf, 0x12, 0x78, 0x39, 0xeb, 0x2e, 0x1f, 0xda, - 0x5c, 0xb2, 0x7b, 0xea, 0xcc, 0x07, 0xb0, 0x15, 0xcf, 0xe0, 0x2d, 0x1b, 0xf3, 0xe8, 0x76, 0x63, - 0xe6, 0x54, 0x52, 0x5f, 0x5a, 0xf1, 0x72, 0x68, 0xfc, 0x42, 0x60, 0x4f, 0x75, 0x9d, 0x55, 0xe1, - 0xfb, 0x33, 0x81, 0xdd, 0x53, 0x5b, 0xac, 0x0a, 0xdd, 0xe7, 0x04, 0xf4, 0x59, 0xb7, 0x59, 0x15, - 0xce, 0xea, 0xa3, 0xa2, 0x3a, 0xcd, 0x7d, 0xe6, 0xfb, 0x7f, 0x7f, 0x54, 0x2a, 0xd0, 0x1a, 0x84, - 0x02, 0xb9, 0x7c, 0x71, 0x5c, 0xdf, 0xba, 0x09, 0x59, 0x31, 0xde, 0x28, 0x83, 0xa1, 0x8f, 0x60, - 0x61, 0x88, 0x25, 0xed, 0x71, 0xca, 0x7d, 0xc3, 0xdc, 0x9c, 0x27, 0xcf, 0xed, 0x31, 0x7d, 0x0d, - 0x40, 0xe0, 0x38, 0xc0, 0x50, 0xaa, 0x85, 0x6a, 0x99, 0x74, 0x79, 0x66, 0xe0, 0xaa, 0x61, 0x67, - 0x62, 0x87, 0x21, 0xfa, 0x6a, 0xb8, 0x9e, 0x0d, 0xe7, 0x99, 0x81, 0x5b, 0x50, 0xb6, 0x51, 0x54, - 0xb6, 0x60, 0xc9, 0x7a, 0xd9, 0x92, 0x7d, 0x68, 0xf0, 0xe8, 0xca, 0x62, 0xae, 0xd0, 0x37, 0x3a, - 0xd5, 0xc3, 0xaa, 0x59, 0xe7, 0xd1, 0xd5, 0xc0, 0x15, 0xf4, 0x5d, 0x58, 0x57, 0x03, 0xae, 0x2d, - 0x6d, 0x1d, 0x3a, 0xd5, 0xbb, 0xaf, 0x6c, 0xaa, 0x46, 0xdf, 0x96, 0xb6, 0xf1, 0x79, 0x05, 0x5a, - 0x7d, 0xf4, 0x51, 0xe2, 0x3d, 0xd0, 0xbd, 0xa8, 0xd9, 0xda, 0x5d, 0x9a, 0xd5, 0xee, 0xd0, 0xac, - 0x5e, 0xd6, 0xec, 0x0d, 0xd8, 0x8c, 0x39, 0x0b, 0x6c, 0x9e, 0x58, 0x1e, 0x26, 0x42, 0x6f, 0xa4, - 0xc2, 0x35, 0xf3, 0xdc, 0x53, 0x4c, 0x84, 0xf1, 0x27, 0x81, 0xd6, 0x10, 0x6d, 0xee, 0x4c, 0x5e, - 0x9c, 0x0e, 0xcb, 0xf8, 0xab, 0x77, 0xe0, 0x5f, 0x2b, 0xe3, 0x7f, 0x0c, 0x3b, 0x1c, 0xc5, 0xd4, - 0x97, 0xd6, 0x92, 0x3c, 0xd9, 0x8e, 0xdb, 0xce, 0x06, 0x4e, 0xe6, 0x22, 0x1d, 0x41, 0xed, 0x62, - 0x8a, 0x3c, 0x49, 0x55, 0xb8, 0x73, 0x0f, 0x64, 0xf3, 0x8c, 0xaf, 0x2a, 0xb0, 0x39, 0x63, 0xae, - 0x4a, 0xd1, 0x27, 0x50, 0x17, 0xd2, 0x96, 0x53, 0x91, 0xd2, 0x6e, 0xf6, 0x5e, 0xb9, 0xb5, 0xc4, - 0x30, 0x9d, 0x62, 0xe6, 0x53, 0xff, 0x03, 0x65, 0x03, 0x5a, 0x29, 0x00, 0x2b, 0x8c, 0x5c, 0x5c, - 0x34, 0x98, 0x66, 0x9a, 0x3c, 0x8b, 0x5c, 0x2c, 0xcb, 0x52, 0xfb, 0x57, 0xb2, 0xd4, 0x6f, 0x97, - 0xa5, 0x0b, 0x6b, 0x13, 0x26, 0x33, 0xeb, 0x9b, 0xbd, 0x83, 0xdb, 0x7b, 0xd4, 0x29, 0x93, 0xc2, - 0x4c, 0xe7, 0x19, 0x7d, 0x68, 0x9e, 0xb3, 0x00, 0x87, 0x49, 0xe8, 0x3c, 0x13, 0x63, 0x75, 0xea, - 0x62, 0x44, 0xae, 0x16, 0x20, 0x29, 0xcc, 0xba, 0x0a, 0xcb, 0x08, 0x2b, 0x25, 0x84, 0xc6, 0xd7, - 0x04, 0x1a, 0x4f, 0x31, 0xe9, 0x0d, 0x71, 0x9c, 0x2a, 0x94, 0x1e, 0xdc, 0xbc, 0x42, 0x2d, 0x3d, - 0xb7, 0xf4, 0x21, 0x34, 0x97, 0xf6, 0x66, 0x5e, 0x02, 0x16, 0x5b, 0xf3, 0x9f, 0x3b, 0x34, 0x13, - 0xd6, 0xa5, 0xed, 0xe7, 0x02, 0xae, 0x9b, 0x0d, 0x26, 0x3e, 0x52, 0xa1, 0xaa, 0xbc, 0x68, 0x50, - 0x42, 0xaf, 0x75, 0xaa, 0xaa, 0xf2, 0xbc, 0x43, 0x09, 0xe3, 0x13, 0x80, 0x1c, 0x9c, 0xa2, 0xb8, - 0x70, 0x90, 0x2c, 0x3b, 0xf8, 0x1e, 0x34, 0x3c, 0x4c, 0x7a, 0x02, 0xc7, 0x7a, 0x25, 0xd5, 0xee, - 0xef, 0x4e, 0x41, 0x5e, 0xca, 0x9c, 0x4d, 0x37, 0x42, 0xd8, 0x19, 0x66, 0x8b, 0xa9, 0xbd, 0xc2, - 0x84, 0x64, 0x8e, 0x28, 0x75, 0x4d, 0x52, 0xee, 0x9a, 0x0f, 0xa1, 0x19, 0x60, 0x10, 0xf1, 0xc4, - 0x12, 0xec, 0x1a, 0x67, 0x6a, 0x64, 0xa9, 0x21, 0xbb, 0x46, 0xc5, 0x37, 0x9c, 0x06, 0x16, 0x8f, - 0xae, 0xc4, 0x6c, 0x43, 0x85, 0xd3, 0xc0, 0x8c, 0xae, 0xc4, 0xe3, 0x2f, 0x2a, 0xd0, 0xc8, 0x8f, - 0x22, 0xdd, 0x80, 0x9a, 0x77, 0x16, 0x85, 0xa8, 0xbd, 0x44, 0xdb, 0xb0, 0xe3, 0x95, 0xdf, 0xdb, - 0x9a, 0x4b, 0x77, 0x61, 0xdb, 0x2b, 0x3e, 0x56, 0x35, 0xa4, 0x14, 0xb6, 0xbc, 0xc2, 0x6b, 0x4e, - 0xfb, 0x94, 0xee, 0xc3, 0xae, 0x77, 0xf3, 0xb9, 0xa3, 0x8d, 0xe9, 0x1e, 0x68, 0x5e, 0xf1, 0x3d, - 0x20, 0xb4, 0x09, 0x6d, 0x83, 0xe6, 0x95, 0x2e, 0xe0, 0xda, 0xb7, 0x84, 0xee, 0xc2, 0x96, 0x57, - 0xb8, 0xa5, 0x6a, 0xdf, 0x11, 0x4a, 0xa1, 0xe5, 0x2d, 0x5f, 0xe5, 0xb4, 0xef, 0x09, 0xdd, 0x07, - 0xea, 0xdd, 0xb8, 0xef, 0x68, 0x3f, 0x10, 0xba, 0x07, 0xdb, 0x5e, 0xe1, 0x52, 0x20, 0xb4, 0x1f, - 0x09, 0xdd, 0x84, 0x86, 0x97, 0x7d, 0x37, 0xb5, 0x2f, 0xab, 0x69, 0x94, 0x9d, 0x65, 0xed, 0xf7, - 0xea, 0x71, 0xff, 0xe3, 0xe3, 0x31, 0x93, 0x93, 0xe9, 0x48, 0x9d, 0xd9, 0xa3, 0x6b, 0xe6, 0xfb, - 0xec, 0x5a, 0xa2, 0x33, 0x39, 0xca, 0x0c, 0x7c, 0xdb, 0x65, 0x42, 0x72, 0x36, 0x9a, 0x4a, 0x74, - 0x8f, 0x66, 0x36, 0x1e, 0xa5, 0xae, 0xce, 0xc3, 0x78, 0x34, 0xaa, 0xa7, 0x99, 0x27, 0x7f, 0x05, - 0x00, 0x00, 0xff, 0xff, 0xf3, 0x38, 0x9d, 0x4f, 0x04, 0x11, 0x00, 0x00, + // 1163 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x58, 0xcd, 0x6f, 0x1b, 0x45, + 0x14, 0xef, 0xda, 0xf1, 0x47, 0x9e, 0xe3, 0x64, 0x33, 0x89, 0x89, 0x1b, 0x0a, 0x0d, 0x5b, 0x24, + 0xaa, 0x4a, 0x38, 0xc2, 0xe5, 0x40, 0x8f, 0xb4, 0x3e, 0xd4, 0x54, 0xad, 0xaa, 0x75, 0x04, 0x12, + 0x12, 0x5a, 0xad, 0x77, 0x1f, 0xf6, 0x68, 0x3f, 0x66, 0x33, 0x33, 0x6e, 0xd8, 0xdc, 0xb9, 0x22, + 0x24, 0x8e, 0xdc, 0xf8, 0x6b, 0xf8, 0xba, 0xf3, 0x4f, 0x80, 0xa0, 0x12, 0x88, 0x2b, 0x9a, 0xd9, + 0x5d, 0xdb, 0xeb, 0x26, 0xe1, 0x43, 0x54, 0x8a, 0x94, 0x9b, 0xdf, 0x9b, 0x99, 0x37, 0xbf, 0xdf, + 0xef, 0xcd, 0xbc, 0x9d, 0x67, 0x20, 0x34, 0x96, 0xc8, 0x63, 0x37, 0x74, 0x22, 0x31, 0xe9, 0x25, + 0x9c, 0x49, 0x46, 0x3a, 0x11, 0x0d, 0x9f, 0xcd, 0x44, 0x66, 0xf5, 0x8a, 0x09, 0xfb, 0x1b, 0x1e, + 0x8b, 0x22, 0x16, 0x67, 0xee, 0xfd, 0x6d, 0x81, 0xfc, 0x19, 0xf5, 0x70, 0xb1, 0xce, 0x1a, 0xc0, + 0xc6, 0x11, 0x8d, 0x50, 0x48, 0x37, 0x4a, 0x1e, 0x8b, 0x09, 0xd9, 0x87, 0x66, 0x32, 0x4d, 0x05, + 0xf5, 0xdc, 0xb0, 0x6b, 0x1c, 0x18, 0xb7, 0xab, 0xf6, 0xdc, 0x26, 0x5d, 0x68, 0x84, 0x6c, 0xa2, + 0x87, 0x2a, 0x7a, 0xa8, 0x30, 0xad, 0x04, 0xe0, 0x48, 0x30, 0x1b, 0x8f, 0x67, 0x28, 0x24, 0xd9, + 0x83, 0x46, 0x82, 0xc8, 0x1d, 0xea, 0xe7, 0x21, 0xea, 0xca, 0x1c, 0xfa, 0xe4, 0x2e, 0xac, 0x71, + 0x16, 0xa2, 0x5e, 0xbd, 0xd9, 0xbf, 0xd9, 0x3b, 0x13, 0x73, 0xef, 0x29, 0x22, 0xb7, 0x59, 0x88, + 0xb6, 0x9e, 0x4c, 0x76, 0xa1, 0xe6, 0xb1, 0x59, 0x2c, 0xbb, 0xd5, 0x03, 0xe3, 0x76, 0xdb, 0xce, + 0x0c, 0xeb, 0x6b, 0x03, 0x5a, 0x7a, 0x4b, 0x91, 0xb0, 0x58, 0x20, 0xb9, 0x0b, 0x75, 0x21, 0x5d, + 0x39, 0x13, 0x7a, 0xcb, 0x56, 0xff, 0xd5, 0x72, 0xf0, 0x5c, 0x86, 0x91, 0x9e, 0x62, 0xe7, 0x53, + 0xc9, 0xfb, 0xb0, 0x2e, 0x0b, 0xf2, 0x1a, 0x54, 0xab, 0x7f, 0xeb, 0x1c, 0x50, 0xcb, 0x22, 0xd9, + 0x8b, 0x55, 0xe7, 0xa0, 0xfb, 0xc9, 0x80, 0xbd, 0x07, 0x1c, 0x5d, 0x89, 0x0f, 0x58, 0x18, 0xa2, + 0x27, 0x29, 0x8b, 0x0b, 0x75, 0xee, 0x41, 0x93, 0xe3, 0xb1, 0x23, 0xd3, 0x04, 0x35, 0xd6, 0xcd, + 0xfe, 0xeb, 0xe7, 0xec, 0x69, 0xe3, 0xf1, 0x51, 0x9a, 0xa0, 0xdd, 0xe0, 0xd9, 0x0f, 0xd2, 0x81, + 0xba, 0x5a, 0x4a, 0x7d, 0x0d, 0x76, 0xcd, 0xae, 0x71, 0x3c, 0x1e, 0xfa, 0xe4, 0xc6, 0x32, 0x8d, + 0xaa, 0x1e, 0x59, 0x42, 0x78, 0x1d, 0x9a, 0x09, 0x67, 0x9f, 0xa5, 0x6a, 0xd9, 0x5a, 0x96, 0x36, + 0x6d, 0x0f, 0x7d, 0xf2, 0x0e, 0xd4, 0x85, 0x37, 0xc5, 0xc8, 0xed, 0xd6, 0x34, 0xf9, 0xeb, 0x67, + 0x8a, 0x76, 0x3f, 0x64, 0x63, 0x3b, 0x9f, 0x68, 0x3d, 0x37, 0xa0, 0x33, 0xe0, 0x2c, 0xb9, 0xd4, + 0xbc, 0x1e, 0xc3, 0x96, 0x37, 0xc7, 0xe7, 0xc4, 0x6e, 0x84, 0x39, 0xc1, 0x37, 0xcb, 0x88, 0xf2, + 0xeb, 0xd0, 0x5b, 0x90, 0x79, 0xe2, 0x46, 0x68, 0x6f, 0x7a, 0x25, 0xdb, 0xfa, 0xcd, 0x80, 0xdd, + 0x87, 0xae, 0xb8, 0x4a, 0x94, 0xff, 0x30, 0xe0, 0xfa, 0x00, 0x85, 0xc7, 0xe9, 0x18, 0xaf, 0x12, + 0xef, 0x6f, 0x0c, 0xe8, 0x8c, 0xa6, 0xec, 0xe4, 0x32, 0x73, 0xb6, 0x7e, 0x35, 0xe0, 0x95, 0xac, + 0xba, 0x3c, 0x75, 0xb9, 0xa4, 0x97, 0x34, 0x33, 0x1f, 0xc0, 0x66, 0x52, 0xc0, 0x5b, 0x4e, 0xcc, + 0xad, 0xb3, 0x13, 0x33, 0xa7, 0xa2, 0xf3, 0xd2, 0x4e, 0x96, 0x4d, 0xeb, 0x17, 0x03, 0x76, 0x55, + 0xd5, 0xb9, 0x2a, 0x7c, 0x7f, 0x36, 0x60, 0xe7, 0xa1, 0x2b, 0xae, 0x0a, 0xdd, 0xe7, 0x06, 0x74, + 0x8b, 0x6a, 0x73, 0x55, 0x38, 0xab, 0x8f, 0x8a, 0xaa, 0x34, 0x97, 0x99, 0xef, 0xff, 0xfd, 0x51, + 0xa9, 0x40, 0x7b, 0x18, 0x0b, 0xe4, 0xf2, 0xe5, 0x71, 0x7d, 0xeb, 0x45, 0xc8, 0x8a, 0xf1, 0xfa, + 0x2a, 0x18, 0x72, 0x0b, 0x16, 0x09, 0x71, 0xa4, 0x3b, 0xd1, 0xdc, 0xd7, 0xed, 0x8d, 0xb9, 0xf3, + 0xc8, 0x9d, 0x90, 0xd7, 0x00, 0x04, 0x4e, 0x22, 0x8c, 0xa5, 0xda, 0xa8, 0x96, 0x49, 0x97, 0x7b, + 0x86, 0xbe, 0x1a, 0xf6, 0xa6, 0x6e, 0x1c, 0x63, 0xa8, 0x86, 0xeb, 0xd9, 0x70, 0xee, 0x19, 0xfa, + 0x25, 0x65, 0x1b, 0x65, 0x65, 0x4b, 0x29, 0x69, 0xae, 0xa6, 0x64, 0x0f, 0x1a, 0x9c, 0x9d, 0x38, + 0xd4, 0x17, 0xdd, 0xf5, 0x83, 0xaa, 0x7a, 0x40, 0x73, 0x76, 0x32, 0xf4, 0x05, 0x79, 0x17, 0x9a, + 0x6a, 0xc0, 0x77, 0xa5, 0xdb, 0x85, 0x83, 0xea, 0xc5, 0x4f, 0x36, 0x15, 0x63, 0xe0, 0x4a, 0xd7, + 0xfa, 0xbc, 0x02, 0xed, 0x01, 0x86, 0x28, 0xf1, 0x12, 0xe8, 0x5e, 0xd6, 0x6c, 0xed, 0x22, 0xcd, + 0x6a, 0x17, 0x68, 0x56, 0x5f, 0xd5, 0xec, 0x0d, 0xd8, 0x48, 0x38, 0x8d, 0x5c, 0x9e, 0x3a, 0x01, + 0xa6, 0xa2, 0xdb, 0xd0, 0xc2, 0xb5, 0x72, 0xdf, 0x23, 0x4c, 0x85, 0xf5, 0xa7, 0x01, 0xed, 0x11, + 0xba, 0xdc, 0x9b, 0xbe, 0x3c, 0x1d, 0x96, 0xf1, 0x57, 0x2f, 0xc0, 0xbf, 0xb6, 0x8a, 0xff, 0x0e, + 0x6c, 0x73, 0x14, 0xb3, 0x50, 0x3a, 0x4b, 0xf2, 0x64, 0x27, 0x6e, 0x2b, 0x1b, 0x78, 0x30, 0x17, + 0xe9, 0x10, 0x6a, 0xc7, 0x33, 0xe4, 0xa9, 0x56, 0xe1, 0xc2, 0x33, 0x90, 0xcd, 0xb3, 0xbe, 0xaa, + 0xc0, 0x46, 0xc1, 0x5c, 0x85, 0xfa, 0x6f, 0xed, 0xd2, 0xbf, 0xa7, 0x6c, 0x41, 0x5b, 0x03, 0x70, + 0x62, 0xe6, 0xe3, 0xa2, 0xc0, 0xb4, 0xb4, 0xf3, 0x09, 0xf3, 0x71, 0x55, 0x96, 0xda, 0x3f, 0x92, + 0xa5, 0x7e, 0xb6, 0x2c, 0x3d, 0x58, 0x9b, 0x52, 0x99, 0xa5, 0xbe, 0xd5, 0xdf, 0x3f, 0xbb, 0x46, + 0x3d, 0xa4, 0x52, 0xd8, 0x7a, 0x9e, 0x35, 0x80, 0x96, 0x6a, 0xeb, 0x46, 0x69, 0xec, 0xa9, 0xd6, + 0xf7, 0xdc, 0xb6, 0xf5, 0xc6, 0x6a, 0x9b, 0xb8, 0x8c, 0x50, 0x75, 0xa2, 0x8d, 0x47, 0x98, 0xf6, + 0x47, 0x38, 0xd1, 0x0a, 0xe9, 0x8b, 0x9b, 0x47, 0xa8, 0xe9, 0x7b, 0x4b, 0x6e, 0x42, 0x6b, 0xe9, + 0x6c, 0xe6, 0x21, 0x60, 0x71, 0x34, 0xff, 0xbe, 0x42, 0x53, 0xe1, 0x3c, 0x73, 0xc3, 0x5c, 0xc0, + 0xa6, 0xdd, 0xa0, 0xe2, 0x43, 0x65, 0xaa, 0xc8, 0x8b, 0x02, 0x25, 0xba, 0xb5, 0x83, 0xaa, 0x8a, + 0x3c, 0xaf, 0x50, 0xc2, 0xfa, 0x04, 0x20, 0x07, 0xa7, 0x28, 0x2e, 0x32, 0x68, 0x2c, 0x67, 0xf0, + 0x3d, 0x68, 0x04, 0x98, 0xf6, 0x05, 0x4e, 0xba, 0x15, 0xad, 0xdd, 0x79, 0xb7, 0x20, 0x0f, 0x65, + 0x17, 0xd3, 0xad, 0x18, 0xb6, 0x47, 0xd9, 0x66, 0xea, 0xac, 0x50, 0x21, 0xa9, 0x27, 0x56, 0xaa, + 0xa6, 0xb1, 0x5a, 0x35, 0x6f, 0x42, 0x2b, 0xc2, 0x88, 0xf1, 0xd4, 0x11, 0xf4, 0x14, 0x0b, 0x35, + 0x32, 0xd7, 0x88, 0x9e, 0xa2, 0xe2, 0x1b, 0xcf, 0x22, 0x87, 0xb3, 0x13, 0x51, 0x1c, 0xa8, 0x78, + 0x16, 0xd9, 0xec, 0x44, 0xdc, 0xf9, 0xa2, 0x02, 0x8d, 0xfc, 0x2a, 0x92, 0x75, 0xa8, 0x05, 0x4f, + 0x58, 0x8c, 0xe6, 0x35, 0xd2, 0x81, 0xed, 0x60, 0xb5, 0xdf, 0x36, 0x7d, 0xb2, 0x03, 0x5b, 0x41, + 0xb9, 0x59, 0x35, 0x91, 0x10, 0xd8, 0x0c, 0x4a, 0xdd, 0x9c, 0xf9, 0x29, 0xd9, 0x83, 0x9d, 0xe0, + 0xc5, 0x76, 0xc7, 0x9c, 0x90, 0x5d, 0x30, 0x83, 0x72, 0x3f, 0x20, 0xcc, 0x29, 0xe9, 0x80, 0x19, + 0xac, 0x3c, 0xc0, 0xcd, 0x6f, 0x0d, 0xb2, 0x03, 0x9b, 0x41, 0xe9, 0x95, 0x6a, 0x7e, 0x67, 0x10, + 0x02, 0xed, 0x60, 0xf9, 0x29, 0x67, 0x7e, 0x6f, 0x90, 0x3d, 0x20, 0xc1, 0x0b, 0xef, 0x1d, 0xf3, + 0x07, 0x83, 0xec, 0xc2, 0x56, 0x50, 0x7a, 0x14, 0x08, 0xf3, 0x47, 0x83, 0x6c, 0x40, 0x23, 0xc8, + 0xbe, 0x9b, 0xe6, 0x97, 0x55, 0x6d, 0x65, 0x77, 0xd9, 0xfc, 0xbd, 0x7a, 0xe7, 0x1e, 0x34, 0x8b, + 0xff, 0x4b, 0x08, 0x40, 0xfd, 0xb1, 0x2b, 0x24, 0x72, 0xf3, 0x9a, 0xfa, 0x6d, 0xa3, 0xeb, 0x23, + 0x37, 0x0d, 0xf5, 0xfb, 0x23, 0x4e, 0x95, 0xbf, 0xa2, 0x44, 0x7b, 0xaa, 0x2e, 0xa7, 0x59, 0xbd, + 0x3f, 0xf8, 0xf8, 0xfe, 0x84, 0xca, 0xe9, 0x6c, 0xac, 0xae, 0xfb, 0xe1, 0x29, 0x0d, 0x43, 0x7a, + 0x2a, 0xd1, 0x9b, 0x1e, 0x66, 0xb9, 0x7f, 0xdb, 0xa7, 0x42, 0x72, 0x3a, 0x9e, 0x49, 0xf4, 0x0f, + 0x8b, 0x13, 0x70, 0xa8, 0x0f, 0xc4, 0xdc, 0x4c, 0xc6, 0xe3, 0xba, 0xf6, 0xdc, 0xfd, 0x2b, 0x00, + 0x00, 0xff, 0xff, 0xe0, 0x4c, 0xc7, 0x32, 0x95, 0x12, 0x00, 0x00, } diff --git a/internal/proto/master.proto b/internal/proto/master.proto index 81093d55e4..834dcbc19f 100644 --- a/internal/proto/master.proto +++ b/internal/proto/master.proto @@ -87,4 +87,7 @@ service Master { * @return StringListResponse */ rpc ShowPartitions(internal.ShowPartitionRequest) returns (service.StringListResponse) {} -} \ No newline at end of file + + + rpc Tso(stream internal.TsoRequest) returns (stream internal.TsoResponse) {} +} diff --git a/internal/proto/masterpb/master.pb.go b/internal/proto/masterpb/master.pb.go index c14e4de92a..70f58cd90a 100644 --- a/internal/proto/masterpb/master.pb.go +++ b/internal/proto/masterpb/master.pb.go @@ -30,31 +30,33 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package func init() { proto.RegisterFile("master.proto", fileDescriptor_f9c348dec43a6705) } var fileDescriptor_f9c348dec43a6705 = []byte{ - // 384 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0x4d, 0x4f, 0xc2, 0x40, - 0x10, 0xe5, 0x44, 0xcc, 0x86, 0x0f, 0x59, 0x6f, 0x78, 0xeb, 0xc9, 0x80, 0xb4, 0x46, 0xff, 0x80, - 0x01, 0x0e, 0x1c, 0x34, 0x31, 0x70, 0xd3, 0x18, 0xdc, 0x96, 0x0d, 0x4c, 0x6c, 0xbb, 0x75, 0x67, - 0x8a, 0x09, 0xff, 0xc2, 0x7f, 0x6c, 0xda, 0x52, 0xda, 0x15, 0x8a, 0xe8, 0xad, 0x3b, 0xf3, 0xf6, - 0xbd, 0xce, 0xbc, 0x97, 0x65, 0x8d, 0x40, 0x20, 0x49, 0x6d, 0x47, 0x5a, 0x91, 0xe2, 0x17, 0x01, - 0xf8, 0xeb, 0x18, 0xb3, 0x93, 0x9d, 0xb5, 0xba, 0x0d, 0x4f, 0x05, 0x81, 0x0a, 0xb3, 0x62, 0x97, - 0x43, 0x48, 0x52, 0x87, 0xc2, 0x9f, 0x07, 0xb8, 0xdc, 0xd6, 0x3a, 0x28, 0xf5, 0x1a, 0x3c, 0x59, - 0x94, 0x6e, 0xbf, 0xce, 0x58, 0xfd, 0x31, 0xbd, 0xcf, 0x05, 0x3b, 0x1f, 0x69, 0x29, 0x48, 0x8e, - 0x94, 0xef, 0x4b, 0x8f, 0x40, 0x85, 0xdc, 0xb6, 0x0d, 0xa5, 0x9c, 0xd3, 0xfe, 0x09, 0x9c, 0xca, - 0x8f, 0x58, 0x22, 0x75, 0x2f, 0x4d, 0xfc, 0xf6, 0x8f, 0x66, 0x24, 0x28, 0x46, 0xab, 0xc6, 0x5f, - 0x59, 0x6b, 0xac, 0x55, 0x54, 0x12, 0xb8, 0xae, 0x10, 0x30, 0x61, 0x27, 0xd2, 0xbb, 0xac, 0x39, - 0x11, 0x58, 0x62, 0xef, 0x57, 0xb0, 0x1b, 0xa8, 0x9c, 0xdc, 0x32, 0xc1, 0xdb, 0x5d, 0xd9, 0x43, - 0xa5, 0xfc, 0xa9, 0xc4, 0x48, 0x85, 0x28, 0xad, 0x1a, 0x8f, 0x19, 0x1f, 0x4b, 0xf4, 0x34, 0xb8, - 0xe5, 0x3d, 0xdd, 0x54, 0x8d, 0xb1, 0x07, 0xcd, 0xd5, 0xfa, 0x87, 0xd5, 0x0a, 0x60, 0x76, 0x35, - 0x4a, 0x3e, 0xad, 0x1a, 0x7f, 0x67, 0xed, 0xd9, 0x4a, 0x7d, 0x16, 0x6d, 0xac, 0x5c, 0x9d, 0x89, - 0xcb, 0xf5, 0xae, 0x0e, 0xeb, 0xcd, 0x48, 0x43, 0xb8, 0x7c, 0x00, 0xa4, 0xd2, 0x8c, 0x73, 0xd6, - 0xce, 0x0c, 0x7e, 0x12, 0x9a, 0x20, 0x1d, 0x70, 0x70, 0x34, 0x08, 0x3b, 0xdc, 0x89, 0x46, 0xbd, - 0xb0, 0x66, 0x62, 0x70, 0x41, 0xdf, 0x3f, 0x12, 0x83, 0xbf, 0x92, 0xbf, 0xb1, 0xc6, 0x44, 0x60, - 0xc1, 0xdd, 0xab, 0x0e, 0xc1, 0x1e, 0xf5, 0x69, 0x19, 0xd0, 0xac, 0x93, 0x1b, 0x5b, 0xc8, 0x38, - 0xbf, 0x44, 0x60, 0x4f, 0xab, 0x77, 0x58, 0x6b, 0x87, 0x33, 0x03, 0x00, 0xac, 0x95, 0x18, 0xbb, - 0xeb, 0x62, 0xe5, 0xce, 0x0c, 0xd8, 0x3f, 0xec, 0x1f, 0x0e, 0x9f, 0xef, 0x97, 0x40, 0xab, 0xd8, - 0x4d, 0x56, 0xeb, 0x6c, 0xc0, 0xf7, 0x61, 0x43, 0xd2, 0x5b, 0x39, 0x19, 0xc5, 0x60, 0x01, 0x48, - 0x1a, 0xdc, 0x98, 0xe4, 0xc2, 0xc9, 0x55, 0x9d, 0x94, 0xd7, 0xc9, 0x9e, 0xa2, 0xc8, 0x75, 0xeb, - 0xe9, 0xf9, 0xee, 0x3b, 0x00, 0x00, 0xff, 0xff, 0x96, 0xb6, 0xf6, 0x5a, 0xb8, 0x04, 0x00, 0x00, + // 409 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0xcd, 0x6e, 0xe2, 0x30, + 0x14, 0x85, 0x41, 0x23, 0xa1, 0x91, 0xc5, 0xcf, 0xe0, 0xd9, 0x31, 0xab, 0xc9, 0x0a, 0xc1, 0x90, + 0xa0, 0xe9, 0x0b, 0x54, 0xc0, 0x82, 0x45, 0x2b, 0x55, 0xc0, 0xaa, 0x55, 0x45, 0x9d, 0x60, 0x81, + 0xd5, 0x24, 0x4e, 0x7d, 0x6f, 0xa8, 0xc4, 0xfb, 0xf6, 0x3d, 0xaa, 0x24, 0x84, 0xc4, 0x05, 0x53, + 0xda, 0x1d, 0xb6, 0x8f, 0xbf, 0x83, 0xef, 0x39, 0x0a, 0xa9, 0x07, 0x0c, 0x90, 0x2b, 0x3b, 0x52, + 0x12, 0x25, 0xfd, 0x1d, 0x08, 0x7f, 0x1b, 0x43, 0xb6, 0xb2, 0xb3, 0xa3, 0x4e, 0xdd, 0x93, 0x41, + 0x20, 0xc3, 0x6c, 0xb3, 0x43, 0x45, 0x88, 0x5c, 0x85, 0xcc, 0x5f, 0x06, 0xb0, 0xde, 0xef, 0xb5, + 0x81, 0xab, 0xad, 0xf0, 0x78, 0xb1, 0xf5, 0xff, 0xed, 0x27, 0xa9, 0xdd, 0xa6, 0xf7, 0x29, 0x23, + 0xbf, 0xc6, 0x8a, 0x33, 0xe4, 0x63, 0xe9, 0xfb, 0xdc, 0x43, 0x21, 0x43, 0x6a, 0xdb, 0x9a, 0x53, + 0xce, 0xb4, 0x3f, 0x0a, 0x67, 0xfc, 0x25, 0xe6, 0x80, 0x9d, 0x3f, 0xba, 0x7e, 0xff, 0x8f, 0xe6, + 0xc8, 0x30, 0x06, 0xab, 0x42, 0x1f, 0x49, 0x73, 0xa2, 0x64, 0x54, 0x32, 0xf8, 0x67, 0x30, 0xd0, + 0x65, 0x17, 0xe2, 0x5d, 0xd2, 0x98, 0x32, 0x28, 0xd1, 0xfb, 0x06, 0xba, 0xa6, 0xca, 0xe1, 0x96, + 0x2e, 0xde, 0xcf, 0xca, 0x1e, 0x49, 0xe9, 0xcf, 0x38, 0x44, 0x32, 0x04, 0x6e, 0x55, 0x68, 0x4c, + 0xe8, 0x84, 0x83, 0xa7, 0x84, 0x5b, 0x9e, 0xd3, 0xd0, 0xf4, 0x8c, 0x23, 0x69, 0xee, 0xd6, 0x3f, + 0xed, 0x56, 0x08, 0xb3, 0xab, 0x51, 0xf2, 0xd3, 0xaa, 0xd0, 0x67, 0xd2, 0x9a, 0x6f, 0xe4, 0x6b, + 0x71, 0x0c, 0xc6, 0xd1, 0xe9, 0xba, 0xdc, 0xaf, 0x7b, 0xda, 0x6f, 0x8e, 0x4a, 0x84, 0xeb, 0x1b, + 0x01, 0x58, 0x7a, 0xe3, 0x92, 0xb4, 0xb2, 0x80, 0xef, 0x98, 0x42, 0x91, 0x3e, 0x70, 0x70, 0xb6, + 0x08, 0x07, 0xdd, 0x85, 0x41, 0x3d, 0x90, 0x46, 0x12, 0x70, 0x81, 0xef, 0x9f, 0xa9, 0xc1, 0x57, + 0xe1, 0x4f, 0xa4, 0x3e, 0x65, 0x50, 0xb0, 0x7b, 0xe6, 0x12, 0x1c, 0xa1, 0x2f, 0xeb, 0x80, 0x22, + 0xed, 0x3c, 0xd8, 0xc2, 0xc6, 0xf9, 0xa4, 0x02, 0x47, 0x5e, 0xbd, 0xd3, 0x5e, 0x07, 0x9d, 0x5e, + 0x00, 0x41, 0x9a, 0x49, 0xb0, 0x87, 0x53, 0x30, 0xce, 0x4c, 0x93, 0x7d, 0x27, 0xfe, 0x19, 0xf9, + 0xb1, 0x00, 0x49, 0xff, 0x1a, 0xf8, 0x0b, 0x90, 0x86, 0x71, 0xe9, 0x92, 0x9c, 0xd7, 0xad, 0x0e, + 0xab, 0xa3, 0xd1, 0xfd, 0xf5, 0x5a, 0xe0, 0x26, 0x76, 0x93, 0xb8, 0x9c, 0x9d, 0xf0, 0x7d, 0xb1, + 0x43, 0xee, 0x6d, 0x9c, 0x0c, 0x30, 0x58, 0x09, 0x40, 0x25, 0xdc, 0x18, 0xf9, 0xca, 0xc9, 0x31, + 0x4e, 0x4a, 0x75, 0xb2, 0xcf, 0x5b, 0xe4, 0xba, 0xb5, 0x74, 0x7d, 0xf5, 0x1e, 0x00, 0x00, 0xff, + 0xff, 0x47, 0x62, 0x7a, 0xee, 0x0c, 0x05, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -129,6 +131,7 @@ type MasterClient interface { // // @return StringListResponse ShowPartitions(ctx context.Context, in *internalpb.ShowPartitionRequest, opts ...grpc.CallOption) (*servicepb.StringListResponse, error) + Tso(ctx context.Context, opts ...grpc.CallOption) (Master_TsoClient, error) } type masterClient struct { @@ -229,6 +232,37 @@ func (c *masterClient) ShowPartitions(ctx context.Context, in *internalpb.ShowPa return out, nil } +func (c *masterClient) Tso(ctx context.Context, opts ...grpc.CallOption) (Master_TsoClient, error) { + stream, err := c.cc.NewStream(ctx, &_Master_serviceDesc.Streams[0], "/milvus.proto.master.Master/Tso", opts...) + if err != nil { + return nil, err + } + x := &masterTsoClient{stream} + return x, nil +} + +type Master_TsoClient interface { + Send(*internalpb.TsoRequest) error + Recv() (*internalpb.TsoResponse, error) + grpc.ClientStream +} + +type masterTsoClient struct { + grpc.ClientStream +} + +func (x *masterTsoClient) Send(m *internalpb.TsoRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *masterTsoClient) Recv() (*internalpb.TsoResponse, error) { + m := new(internalpb.TsoResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // MasterServer is the server API for Master service. type MasterServer interface { //* @@ -291,6 +325,7 @@ type MasterServer interface { // // @return StringListResponse ShowPartitions(context.Context, *internalpb.ShowPartitionRequest) (*servicepb.StringListResponse, error) + Tso(Master_TsoServer) error } // UnimplementedMasterServer can be embedded to have forward compatible implementations. @@ -327,6 +362,9 @@ func (*UnimplementedMasterServer) DescribePartition(ctx context.Context, req *in func (*UnimplementedMasterServer) ShowPartitions(ctx context.Context, req *internalpb.ShowPartitionRequest) (*servicepb.StringListResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ShowPartitions not implemented") } +func (*UnimplementedMasterServer) Tso(srv Master_TsoServer) error { + return status.Errorf(codes.Unimplemented, "method Tso not implemented") +} func RegisterMasterServer(s *grpc.Server, srv MasterServer) { s.RegisterService(&_Master_serviceDesc, srv) @@ -512,6 +550,32 @@ func _Master_ShowPartitions_Handler(srv interface{}, ctx context.Context, dec fu return interceptor(ctx, in, info, handler) } +func _Master_Tso_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(MasterServer).Tso(&masterTsoServer{stream}) +} + +type Master_TsoServer interface { + Send(*internalpb.TsoResponse) error + Recv() (*internalpb.TsoRequest, error) + grpc.ServerStream +} + +type masterTsoServer struct { + grpc.ServerStream +} + +func (x *masterTsoServer) Send(m *internalpb.TsoResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *masterTsoServer) Recv() (*internalpb.TsoRequest, error) { + m := new(internalpb.TsoRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + var _Master_serviceDesc = grpc.ServiceDesc{ ServiceName: "milvus.proto.master.Master", HandlerType: (*MasterServer)(nil), @@ -557,6 +621,13 @@ var _Master_serviceDesc = grpc.ServiceDesc{ Handler: _Master_ShowPartitions_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Tso", + Handler: _Master_Tso_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, Metadata: "master.proto", } diff --git a/internal/util/tsoutil/tso.go b/internal/util/tsoutil/tso.go new file mode 100644 index 0000000000..c51ca6b108 --- /dev/null +++ b/internal/util/tsoutil/tso.go @@ -0,0 +1,41 @@ +// Copyright 2019 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsoutil + +import ( + "time" + + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" +) + +const ( + physicalShiftBits = 18 + logicalBits = (1 << physicalShiftBits) - 1 +) + +// ParseTS parses the ts to (physical,logical). +func ParseTS(ts uint64) (time.Time, uint64) { + logical := ts & logicalBits + physical := ts >> physicalShiftBits + physicalTime := time.Unix(int64(physical/1000), int64(physical)%1000*time.Millisecond.Nanoseconds()) + return physicalTime, logical +} + +// ParseTimestamp parses pdpb.Timestamp to time.Time +func ParseTimestamp(ts internalpb.TimestampMsg) (time.Time, uint64) { + logical := uint64(ts.Logical) + physical := ts.Physical + physicalTime := time.Unix(int64(physical/1000), int64(physical)%1000*time.Millisecond.Nanoseconds()) + return physicalTime, logical +} diff --git a/internal/util/typeutil/convension.go b/internal/util/typeutil/convension.go new file mode 100644 index 0000000000..e26bd28125 --- /dev/null +++ b/internal/util/typeutil/convension.go @@ -0,0 +1,35 @@ +// Copyright 2016 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package typeutil + +import ( + "encoding/binary" + "github.com/zilliztech/milvus-distributed/internal/errors" +) + +// BytesToUint64 converts a byte slice to uint64. +func BytesToUint64(b []byte) (uint64, error) { + if len(b) != 8 { + return 0, errors.Errorf("invalid data, must 8 bytes, but %d", len(b)) + } + + return binary.BigEndian.Uint64(b), nil +} + +// Uint64ToBytes converts uint64 to a byte slice. +func Uint64ToBytes(v uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, v) + return b +} diff --git a/internal/util/typeutil/time.go b/internal/util/typeutil/time.go new file mode 100644 index 0000000000..1652d0cbbf --- /dev/null +++ b/internal/util/typeutil/time.go @@ -0,0 +1,34 @@ +// Copyright 2016 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package typeutil + +import "time" + +// ZeroTime is a zero time. +var ZeroTime = time.Time{} + +// ParseTimestamp returns a timestamp for a given byte slice. +func ParseTimestamp(data []byte) (time.Time, error) { + nano, err := BytesToUint64(data) + if err != nil { + return ZeroTime, err + } + + return time.Unix(0, int64(nano)), nil +} + +// SubTimeByWallClock returns the duration between two different timestamps. +func SubTimeByWallClock(after time.Time, before time.Time) time.Duration { + return time.Duration(after.UnixNano() - before.UnixNano()) +}