Add tso for master and add timestamp

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
zhenshan.cao 2020-10-29 09:31:08 +08:00 committed by yefu.chen
parent 2df04b6947
commit f0cc9c8ad3
19 changed files with 1264 additions and 292 deletions

76
cmd/master/main.go Normal file
View File

@ -0,0 +1,76 @@
// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"flag"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/master"
"go.uber.org/zap"
"log"
"os"
"os/signal"
"syscall"
)
func main() {
var yamlFile string
flag.StringVar(&yamlFile, "yaml", "", "yaml file")
flag.Parse()
// flag.Usage()
log.Println("yaml file: ", yamlFile)
conf.LoadConfig(yamlFile)
// Creates server.
ctx, cancel := context.WithCancel(context.Background())
svr, err := master.CreateServer(ctx)
if err != nil {
log.Print("create server failed", zap.Error(err))
}
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
var sig os.Signal
go func() {
sig = <-sc
cancel()
}()
if err := svr.Run(); err != nil {
log.Fatal("run server failed", zap.Error(err))
}
<-ctx.Done()
log.Print("Got signal to exit", zap.String("signal", sig.String()))
svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
default:
exit(1)
}
}
func exit(code int) {
os.Exit(code)
}

View File

@ -1,38 +0,0 @@
package main
import (
"flag"
"fmt"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/master"
)
// func main() {
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()
// cfg := config.NewConfig()
// s, err := server.CreateServer(ctx, cfg)
// if err != nil {
// panic(err)
// }
// err = s.Run()
// if err != nil {
// fmt.Println(err)
// }
// }
func init() {
// go mock.FakePulsarProducer()
}
func main() {
var yamlFile string
flag.StringVar(&yamlFile, "yaml", "", "yaml file")
flag.Parse()
// flag.Usage()
fmt.Println("yaml file: ", yamlFile)
conf.LoadConfig(yamlFile)
master.Run()
//master.SegmentStatsController()
//master.CollectionController()
}

View File

@ -27,5 +27,5 @@ services:
/milvus-distributed/scripts/core_build.sh -u && \
go build -o /milvus-distributed/cmd/writer/writer /milvus-distributed/cmd/writer/writer.go && \
go build -o /milvus-distributed/cmd/reader/reader /milvus-distributed/cmd/reader/reader.go && \
go build -o /milvus-distributed/cmd/master/master /milvus-distributed/cmd/master/master.go && \
go build -o /milvus-distributed/cmd/master/master /milvus-distributed/cmd/master/main.go && \
go build -o /milvus-distributed/cmd/proxy/proxy /milvus-distributed/cmd/proxy/proxy.go"

View File

@ -9,26 +9,11 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/master/collection"
"github.com/zilliztech/milvus-distributed/internal/master/id"
"github.com/zilliztech/milvus-distributed/internal/master/informer"
//"github.com/zilliztech/milvus-distributed/internal/master/informer"
"github.com/zilliztech/milvus-distributed/internal/master/kv"
"github.com/zilliztech/milvus-distributed/internal/master/segment"
)
func SegmentStatsController(kvbase kv.Base, errch chan error) {
ssChan := make(chan internalpb.SegmentStatistics, 10)
ssClient := informer.NewPulsarClient()
go segment.Listener(ssChan, ssClient)
for {
select {
case ss := <-ssChan:
errch <- ComputeCloseTime(ss, kvbase)
case <-time.After(5 * time.Second):
fmt.Println("wait for new request")
return
}
}
}
func ComputeCloseTime(ss internalpb.SegmentStatistics, kvbase kv.Base) error {
if int(ss.MemorySize) > int(conf.Config.Master.SegmentThreshole*0.8) {

View File

@ -2,43 +2,23 @@ package master
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/master/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"google.golang.org/grpc"
"net"
"strconv"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io"
"log"
"time"
)
func Server(ch chan *schemapb.CollectionSchema, errch chan error, kvbase kv.Base) {
defaultGRPCPort := ":"
defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10)
lis, err := net.Listen("tcp", defaultGRPCPort)
if err != nil {
// log.Fatal("failed to listen: %v", err)
errch <- err
return
}
s := grpc.NewServer()
masterpb.RegisterMasterServer(s, Master{CreateRequest: ch, kvBase: kvbase})
if err := s.Serve(lis); err != nil {
// log.Fatalf("failed to serve: %v", err)
errch <- err
return
}
}
type Master struct {
CreateRequest chan *schemapb.CollectionSchema
kvBase kv.Base
scheduler *ddRequestScheduler
mt metaTable
}
const slowThreshold = 5 * time.Millisecond
func (ms Master) CreateCollection(ctx context.Context, in *internalpb.CreateCollectionRequest) (*commonpb.Status, error) {
var t task = &createCollectionTask{
@ -147,3 +127,39 @@ func (ms Master) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitio
},
}, nil
}
//----------------------------------------Internal GRPC Service--------------------------------
// Tso implements gRPC PDServer.
func (ms *Master) Tso(stream masterpb.Master_TsoServer) error {
for {
request, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return errors.WithStack(err)
}
start := time.Now()
count := request.GetCount()
ts, err := ms.tsoAllocator.GenerateTSO(count)
if err != nil {
return status.Errorf(codes.Unknown, err.Error())
}
elapsed := time.Since(start)
if elapsed > slowThreshold {
log.Println("get timestamp too slow", zap.Duration("cost", elapsed))
}
response := &internalpb.TsoResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS},
Timestamp: &ts,
Count: count,
}
if err := stream.Send(response); err != nil {
return errors.WithStack(err)
}
}
}

View File

@ -10,7 +10,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
)
func NewPulsarClient() PulsarClient {
func NewPulsarClient() *PulsarClient {
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
@ -24,7 +24,7 @@ func NewPulsarClient() PulsarClient {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
return PulsarClient{
return &PulsarClient{
Client: client,
}
}

View File

@ -9,4 +9,5 @@ type Base interface {
Watch(key string) clientv3.WatchChan
WatchWithPrefix(key string) clientv3.WatchChan
LoadWithPrefix(key string) ( []string, []string)
Close()
}

281
internal/master/master.go Normal file
View File

@ -0,0 +1,281 @@
// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package master
import (
"context"
"fmt"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/master/informer"
"github.com/zilliztech/milvus-distributed/internal/master/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/master/controller"
"github.com/apache/pulsar-client-go/pulsar"
"google.golang.org/grpc"
"log"
"math/rand"
"net"
"strconv"
"github.com/golang/protobuf/proto"
"sync"
"sync/atomic"
"time"
"github.com/zilliztech/milvus-distributed/internal/master/tso"
"go.etcd.io/etcd/clientv3"
)
// Server is the pd server.
type Master struct {
// Server state.
isServing int64
// Server start timestamp
startTimestamp int64
ctx context.Context
serverLoopCtx context.Context
serverLoopCancel func()
serverLoopWg sync.WaitGroup
//grpc server
grpcServer * grpc.Server
// for tso.
tsoAllocator tso.Allocator
// pulsar client
pc *informer.PulsarClient
// chans
ssChan chan internalpb.SegmentStatistics
kvBase kv.Base
scheduler *ddRequestScheduler
mt metaTable
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
}
func newKvBase() kv.Base {
etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":"
etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10)
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
})
// defer cli.Close()
kvBase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
return kvBase
}
// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context) (*Master, error) {
rand.Seed(time.Now().UnixNano())
m := &Master{
ctx: ctx,
startTimestamp: time.Now().Unix(),
kvBase: newKvBase(),
ssChan: make(chan internalpb.SegmentStatistics, 10),
pc : informer.NewPulsarClient(),
}
m.grpcServer = grpc.NewServer()
masterpb.RegisterMasterServer(m.grpcServer, m)
return m, nil
}
// AddStartCallback adds a callback in the startServer phase.
func (s *Master) AddStartCallback(callbacks ...func()) {
s.startCallbacks = append(s.startCallbacks, callbacks...)
}
func (s *Master) startServer(ctx context.Context) error {
// Run callbacks
for _, cb := range s.startCallbacks {
cb()
}
// Server has started.
atomic.StoreInt64(&s.isServing, 1)
return nil
}
// AddCloseCallback adds a callback in the Close phase.
func (s *Master) AddCloseCallback(callbacks ...func()) {
s.closeCallbacks = append(s.closeCallbacks, callbacks...)
}
// Close closes the server.
func (s *Master) Close() {
if !atomic.CompareAndSwapInt64(&s.isServing, 1, 0) {
// server is already closed
return
}
log.Print("closing server")
s.stopServerLoop()
if s.kvBase != nil {
s.kvBase.Close()
}
// Run callbacks
for _, cb := range s.closeCallbacks {
cb()
}
log.Print("close server")
}
// IsClosed checks whether server is closed or not.
func (s *Master) IsClosed() bool {
return atomic.LoadInt64(&s.isServing) == 0
}
// Run runs the pd server.
func (s *Master) Run() error {
if err := s.startServer(s.ctx); err != nil {
return err
}
s.startServerLoop(s.ctx)
return nil
}
// Context returns the context of server.
func (s *Master) Context() context.Context {
return s.ctx
}
// LoopContext returns the loop context of server.
func (s *Master) LoopContext() context.Context {
return s.serverLoopCtx
}
func (s *Master) startServerLoop(ctx context.Context) {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(ctx)
s.serverLoopWg.Add(3)
//go s.Se
go s.grpcLoop()
go s.pulsarLoop()
go s.segmentStatisticsLoop()
}
func (s *Master) stopServerLoop() {
s.serverLoopCancel()
s.serverLoopWg.Wait()
}
// StartTimestamp returns the start timestamp of this server
func (s *Master) StartTimestamp() int64 {
return s.startTimestamp
}
func (s *Master) grpcLoop() {
defer s.serverLoopWg.Done()
defaultGRPCPort := ":"
defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10)
lis, err := net.Listen("tcp", defaultGRPCPort)
if err != nil {
log.Println("failed to listen: %v", err)
return
}
if err := s.grpcServer.Serve(lis); err != nil {
}
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
for {
select {
case <-ctx.Done():
log.Print("server is closed, exit etcd leader loop")
return
}
}
}
// todo use messagestream
func (s *Master) pulsarLoop() {
defer s.serverLoopWg.Done()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
consumer, err := s.pc.Client.Subscribe(pulsar.ConsumerOptions{
Topic: conf.Config.Master.PulsarTopic,
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
return
}
defer func (){
if err := consumer.Unsubscribe(); err != nil {
log.Fatal(err)
}
cancel()
}()
consumerChan := consumer.Chan()
for {
select {
case msg := <-consumerChan:
var m internalpb.SegmentStatistics
proto.Unmarshal(msg.Payload(), &m)
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), m.SegmentId)
s.ssChan <- m
consumer.Ack(msg)
case <-ctx.Done():
log.Print("server is closed, exit etcd leader loop")
return
}
}
}
func (s *Master) segmentStatisticsLoop() {
defer s.serverLoopWg.Done()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
for {
select {
case ss := <-s.ssChan:
controller.ComputeCloseTime(ss, s.kvBase)
case <-ctx.Done():
log.Print("server is closed, exit etcd leader loop")
return
}
}
}

View File

@ -1,49 +0,0 @@
package segment
import (
"context"
"fmt"
"log"
"github.com/golang/protobuf/proto"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/master/informer"
)
type SegmentStats struct {
SegementID uint64
MemorySize uint64
MemoryRate float64
Rows int64
}
func Listener(ssChan chan internalpb.SegmentStatistics, pc informer.PulsarClient) error {
consumer, err := pc.Client.Subscribe(pulsar.ConsumerOptions{
Topic: conf.Config.Master.PulsarTopic,
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
for {
msg, err := consumer.Receive(context.TODO())
if err != nil {
log.Fatal(err)
}
var m internalpb.SegmentStatistics
proto.Unmarshal(msg.Payload(), &m)
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), m.SegmentId)
ssChan <- m
consumer.Ack(msg)
}
if err := consumer.Unsubscribe(); err != nil {
log.Fatal(err)
}
return nil
}

View File

@ -1,46 +0,0 @@
package master
import (
"log"
"strconv"
"time"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/master/controller"
"github.com/zilliztech/milvus-distributed/internal/master/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"go.etcd.io/etcd/clientv3"
)
func Run() {
kvBase := newKvBase()
collectionChan := make(chan *schemapb.CollectionSchema)
defer close(collectionChan)
errorCh := make(chan error)
defer close(errorCh)
go Server(collectionChan, errorCh, kvBase)
go controller.SegmentStatsController(kvBase, errorCh)
go controller.CollectionController(collectionChan, kvBase, errorCh)
//go timetick.TimeTickService()
for {
for v := range errorCh {
log.Fatal(v)
}
}
}
func newKvBase() kv.Base {
etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":"
etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10)
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
})
// defer cli.Close()
kvBase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
return kvBase
}

View File

@ -0,0 +1,115 @@
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package tso
import (
"go.etcd.io/etcd/clientv3"
"sync/atomic"
"time"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/pingcap/log"
"go.uber.org/zap"
)
// Allocator is a Timestamp Oracle allocator.
type Allocator interface {
// Initialize is used to initialize a TSO allocator.
// It will synchronize TSO with etcd and initialize the
// memory for later allocation work.
Initialize() error
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
UpdateTSO() error
// SetTSO sets the physical part with given tso. It's mainly used for BR restore
// and can not forcibly set the TSO smaller than now.
SetTSO(tso uint64) error
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
GenerateTSO(count uint32) (internalpb.TimestampMsg, error)
// Reset is used to reset the TSO allocator.
Reset()
}
// GlobalTSOAllocator is the global single point TSO allocator.
type GlobalTSOAllocator struct {
timestampOracle *timestampOracle
}
// NewGlobalTSOAllocator creates a new global TSO allocator.
func NewGlobalTSOAllocator(client *clientv3.Client, rootPath string, saveInterval time.Duration, maxResetTSGap func() time.Duration) Allocator {
return &GlobalTSOAllocator{
timestampOracle: &timestampOracle{
client: client,
rootPath: rootPath,
saveInterval: saveInterval,
maxResetTSGap: maxResetTSGap,
},
}
}
// Initialize will initialize the created global TSO allocator.
func (gta *GlobalTSOAllocator) Initialize() error {
return gta.timestampOracle.SyncTimestamp()
}
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
func (gta *GlobalTSOAllocator) UpdateTSO() error {
return gta.timestampOracle.UpdateTimestamp()
}
// SetTSO sets the physical part with given tso.
func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error {
return gta.timestampOracle.ResetUserTimestamp(tso)
}
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (internalpb.TimestampMsg, error) {
var resp internalpb.TimestampMsg
if count == 0 {
return resp, errors.New("tso count should be positive")
}
maxRetryCount := 10
for i := 0; i < maxRetryCount; i++ {
current := (*atomicObject)(atomic.LoadPointer(&gta.timestampOracle.TSO))
if current == nil || current.physical == typeutil.ZeroTime {
// If it's leader, maybe SyncTimestamp hasn't completed yet
log.Info("sync hasn't completed yet, wait for a while")
time.Sleep(200 * time.Millisecond)
continue
}
resp.Physical = current.physical.UnixNano() / int64(time.Millisecond)
resp.Logical = atomic.AddInt64(&current.logical, int64(count))
if resp.Logical >= maxLogical {
log.Error("logical part outside of max logical interval, please check ntp time",
zap.Reflect("response", resp),
zap.Int("retry-count", i))
time.Sleep(UpdateTimestampStep)
continue
}
return resp, nil
}
return resp, errors.New("can not get timestamp")
}
// Reset is used to reset the TSO allocator.
func (gta *GlobalTSOAllocator) Reset() {
gta.timestampOracle.ResetTimestamp()
}

217
internal/master/tso/tso.go Normal file
View File

@ -0,0 +1,217 @@
// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package tso
import (
"go.uber.org/zap"
"log"
"path"
"sync/atomic"
"time"
"unsafe"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/util/etcdutil"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
)
const (
// UpdateTimestampStep is used to update timestamp.
UpdateTimestampStep = 50 * time.Millisecond
// updateTimestampGuard is the min timestamp interval.
updateTimestampGuard = time.Millisecond
// maxLogical is the max upper limit for logical time.
// When a TSO's logical time reaches this limit,
// the physical time will be forced to increase.
maxLogical = int64(1 << 18)
)
// atomicObject is used to store the current TSO in memory.
type atomicObject struct {
physical time.Time
logical int64
}
// timestampOracle is used to maintain the logic of tso.
type timestampOracle struct {
client *clientv3.Client
rootPath string
// TODO: remove saveInterval
saveInterval time.Duration
maxResetTSGap func() time.Duration
// For tso, set after the PD becomes a leader.
TSO unsafe.Pointer
lastSavedTime atomic.Value
}
func (t *timestampOracle) getTimestampPath() string {
return path.Join(t.rootPath, "timestamp")
}
func (t *timestampOracle) loadTimestamp() (time.Time, error) {
data, err := etcdutil.GetValue(t.client, t.getTimestampPath())
if err != nil {
return typeutil.ZeroTime, err
}
if len(data) == 0 {
return typeutil.ZeroTime, nil
}
return typeutil.ParseTimestamp(data)
}
// save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it,
// otherwise, update it.
func (t *timestampOracle) saveTimestamp( ts time.Time) error {
key := t.getTimestampPath()
data := typeutil.Uint64ToBytes(uint64(ts.UnixNano()))
err := errors.New("")
println("%v,%v",key, data)
//resp, err := leadership.LeaderTxn().
// Then(clientv3.OpPut(key, string(data))).
// Commit()
if err != nil {
return errors.WithStack(err)
}
//if !resp.Succeeded {
// return errors.New("save timestamp failed, maybe we lost leader")
//}
t.lastSavedTime.Store(ts)
return nil
}
// SyncTimestamp is used to synchronize the timestamp.
func (t *timestampOracle) SyncTimestamp() error {
last, err := t.loadTimestamp()
if err != nil {
return err
}
next := time.Now()
// If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`,
// the timestamp allocation will start from the saved etcd timestamp temporarily.
if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard {
next = last.Add(updateTimestampGuard)
}
save := next.Add(t.saveInterval)
if err = t.saveTimestamp(save); err != nil {
return err
}
log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next))
current := &atomicObject{
physical: next,
}
atomic.StorePointer(&t.TSO, unsafe.Pointer(current))
return nil
}
// ResetUserTimestamp update the physical part with specified tso.
func (t *timestampOracle) ResetUserTimestamp( tso uint64) error {
//if !leadership.Check() {
// return errors.New("Setup timestamp failed, lease expired")
//}
physical, _ := tsoutil.ParseTS(tso)
next := physical.Add(time.Millisecond)
prev := (*atomicObject)(atomic.LoadPointer(&t.TSO))
// do not update
if typeutil.SubTimeByWallClock(next, prev.physical) <= 3*updateTimestampGuard {
return errors.New("the specified ts too small than now")
}
if typeutil.SubTimeByWallClock(next, prev.physical) >= t.maxResetTSGap() {
return errors.New("the specified ts too large than now")
}
save := next.Add(t.saveInterval)
if err := t.saveTimestamp( save); err != nil {
return err
}
update := &atomicObject{
physical: next,
}
atomic.CompareAndSwapPointer(&t.TSO, unsafe.Pointer(prev), unsafe.Pointer(update))
return nil
}
// UpdateTimestamp is used to update the timestamp.
// This function will do two things:
// 1. When the logical time is going to be used up, increase the current physical time.
// 2. When the time window is not big enough, which means the saved etcd time minus the next physical time
// will be less than or equal to `updateTimestampGuard`, then the time window needs to be updated and
// we also need to save the next physical time plus `TsoSaveInterval` into etcd.
//
// Here is some constraints that this function must satisfy:
// 1. The saved time is monotonically increasing.
// 2. The physical time is monotonically increasing.
// 3. The physical time is always less than the saved timestamp.
func (t *timestampOracle) UpdateTimestamp() error {
prev := (*atomicObject)(atomic.LoadPointer(&t.TSO))
now := time.Now()
jetLag := typeutil.SubTimeByWallClock(now, prev.physical)
if jetLag > 3*UpdateTimestampStep {
log.Print("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now))
}
var next time.Time
prevLogical := atomic.LoadInt64(&prev.logical)
// If the system time is greater, it will be synchronized with the system time.
if jetLag > updateTimestampGuard {
next = now
} else if prevLogical > maxLogical/2 {
// The reason choosing maxLogical/2 here is that it's big enough for common cases.
// Because there is enough timestamp can be allocated before next update.
log.Print("the logical time may be not enough", zap.Int64("prev-logical", prevLogical))
next = prev.physical.Add(time.Millisecond)
} else {
// It will still use the previous physical time to alloc the timestamp.
return nil
}
// It is not safe to increase the physical time to `next`.
// The time window needs to be updated and saved to etcd.
if typeutil.SubTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= updateTimestampGuard {
save := next.Add(t.saveInterval)
if err := t.saveTimestamp( save); err != nil {
return err
}
}
current := &atomicObject{
physical: next,
logical: 0,
}
atomic.StorePointer(&t.TSO, unsafe.Pointer(current))
return nil
}
// ResetTimestamp is used to reset the timestamp.
func (t *timestampOracle) ResetTimestamp() {
zero := &atomicObject{
physical: typeutil.ZeroTime,
}
atomic.StorePointer(&t.TSO, unsafe.Pointer(zero))
}

View File

@ -29,6 +29,35 @@ enum ReqType {
kSearch = 500;
}
enum PeerRole {
Master = 0;
Reader = 1;
Writer = 2;
Proxy = 3;
}
message TimestampMsg {
int64 physical = 1;
int64 logical = 2;
}
message TsoRequest {
int64 peer_id = 1;
PeerRole role = 2;
uint32 count = 3;
}
message TsoResponse {
common.Status status = 1;
TimestampMsg timestamp = 2;
uint32 count = 3;
}
message CreateCollectionRequest {
ReqType req_type = 1;
@ -190,4 +219,4 @@ message SegmentStatistics {
uint64 segment_id = 1;
uint64 memory_size = 2;
int64 num_rows = 3;
}
}

View File

@ -84,6 +84,194 @@ func (ReqType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{0}
}
type PeerRole int32
const (
PeerRole_Master PeerRole = 0
PeerRole_Reader PeerRole = 1
PeerRole_Writer PeerRole = 2
PeerRole_Proxy PeerRole = 3
)
var PeerRole_name = map[int32]string{
0: "Master",
1: "Reader",
2: "Writer",
3: "Proxy",
}
var PeerRole_value = map[string]int32{
"Master": 0,
"Reader": 1,
"Writer": 2,
"Proxy": 3,
}
func (x PeerRole) String() string {
return proto.EnumName(PeerRole_name, int32(x))
}
func (PeerRole) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{1}
}
type TimestampMsg struct {
Physical int64 `protobuf:"varint,1,opt,name=physical,proto3" json:"physical,omitempty"`
Logical int64 `protobuf:"varint,2,opt,name=logical,proto3" json:"logical,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *TimestampMsg) Reset() { *m = TimestampMsg{} }
func (m *TimestampMsg) String() string { return proto.CompactTextString(m) }
func (*TimestampMsg) ProtoMessage() {}
func (*TimestampMsg) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{0}
}
func (m *TimestampMsg) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TimestampMsg.Unmarshal(m, b)
}
func (m *TimestampMsg) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_TimestampMsg.Marshal(b, m, deterministic)
}
func (m *TimestampMsg) XXX_Merge(src proto.Message) {
xxx_messageInfo_TimestampMsg.Merge(m, src)
}
func (m *TimestampMsg) XXX_Size() int {
return xxx_messageInfo_TimestampMsg.Size(m)
}
func (m *TimestampMsg) XXX_DiscardUnknown() {
xxx_messageInfo_TimestampMsg.DiscardUnknown(m)
}
var xxx_messageInfo_TimestampMsg proto.InternalMessageInfo
func (m *TimestampMsg) GetPhysical() int64 {
if m != nil {
return m.Physical
}
return 0
}
func (m *TimestampMsg) GetLogical() int64 {
if m != nil {
return m.Logical
}
return 0
}
type TsoRequest struct {
PeerId int64 `protobuf:"varint,1,opt,name=peer_id,json=peerId,proto3" json:"peer_id,omitempty"`
Role PeerRole `protobuf:"varint,2,opt,name=role,proto3,enum=milvus.proto.internal.PeerRole" json:"role,omitempty"`
Count uint32 `protobuf:"varint,3,opt,name=count,proto3" json:"count,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *TsoRequest) Reset() { *m = TsoRequest{} }
func (m *TsoRequest) String() string { return proto.CompactTextString(m) }
func (*TsoRequest) ProtoMessage() {}
func (*TsoRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{1}
}
func (m *TsoRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TsoRequest.Unmarshal(m, b)
}
func (m *TsoRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_TsoRequest.Marshal(b, m, deterministic)
}
func (m *TsoRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_TsoRequest.Merge(m, src)
}
func (m *TsoRequest) XXX_Size() int {
return xxx_messageInfo_TsoRequest.Size(m)
}
func (m *TsoRequest) XXX_DiscardUnknown() {
xxx_messageInfo_TsoRequest.DiscardUnknown(m)
}
var xxx_messageInfo_TsoRequest proto.InternalMessageInfo
func (m *TsoRequest) GetPeerId() int64 {
if m != nil {
return m.PeerId
}
return 0
}
func (m *TsoRequest) GetRole() PeerRole {
if m != nil {
return m.Role
}
return PeerRole_Master
}
func (m *TsoRequest) GetCount() uint32 {
if m != nil {
return m.Count
}
return 0
}
type TsoResponse struct {
Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
Timestamp *TimestampMsg `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Count uint32 `protobuf:"varint,3,opt,name=count,proto3" json:"count,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *TsoResponse) Reset() { *m = TsoResponse{} }
func (m *TsoResponse) String() string { return proto.CompactTextString(m) }
func (*TsoResponse) ProtoMessage() {}
func (*TsoResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{2}
}
func (m *TsoResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TsoResponse.Unmarshal(m, b)
}
func (m *TsoResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_TsoResponse.Marshal(b, m, deterministic)
}
func (m *TsoResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_TsoResponse.Merge(m, src)
}
func (m *TsoResponse) XXX_Size() int {
return xxx_messageInfo_TsoResponse.Size(m)
}
func (m *TsoResponse) XXX_DiscardUnknown() {
xxx_messageInfo_TsoResponse.DiscardUnknown(m)
}
var xxx_messageInfo_TsoResponse proto.InternalMessageInfo
func (m *TsoResponse) GetStatus() *commonpb.Status {
if m != nil {
return m.Status
}
return nil
}
func (m *TsoResponse) GetTimestamp() *TimestampMsg {
if m != nil {
return m.Timestamp
}
return nil
}
func (m *TsoResponse) GetCount() uint32 {
if m != nil {
return m.Count
}
return 0
}
type CreateCollectionRequest struct {
ReqType ReqType `protobuf:"varint,1,opt,name=req_type,json=reqType,proto3,enum=milvus.proto.internal.ReqType" json:"req_type,omitempty"`
ReqId uint64 `protobuf:"varint,2,opt,name=req_id,json=reqId,proto3" json:"req_id,omitempty"`
@ -99,7 +287,7 @@ func (m *CreateCollectionRequest) Reset() { *m = CreateCollectionRequest
func (m *CreateCollectionRequest) String() string { return proto.CompactTextString(m) }
func (*CreateCollectionRequest) ProtoMessage() {}
func (*CreateCollectionRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{0}
return fileDescriptor_7eb37f6b80b23116, []int{3}
}
func (m *CreateCollectionRequest) XXX_Unmarshal(b []byte) error {
@ -170,7 +358,7 @@ func (m *DropCollectionRequest) Reset() { *m = DropCollectionRequest{} }
func (m *DropCollectionRequest) String() string { return proto.CompactTextString(m) }
func (*DropCollectionRequest) ProtoMessage() {}
func (*DropCollectionRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{1}
return fileDescriptor_7eb37f6b80b23116, []int{4}
}
func (m *DropCollectionRequest) XXX_Unmarshal(b []byte) error {
@ -241,7 +429,7 @@ func (m *HasCollectionRequest) Reset() { *m = HasCollectionRequest{} }
func (m *HasCollectionRequest) String() string { return proto.CompactTextString(m) }
func (*HasCollectionRequest) ProtoMessage() {}
func (*HasCollectionRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{2}
return fileDescriptor_7eb37f6b80b23116, []int{5}
}
func (m *HasCollectionRequest) XXX_Unmarshal(b []byte) error {
@ -312,7 +500,7 @@ func (m *DescribeCollectionRequest) Reset() { *m = DescribeCollectionReq
func (m *DescribeCollectionRequest) String() string { return proto.CompactTextString(m) }
func (*DescribeCollectionRequest) ProtoMessage() {}
func (*DescribeCollectionRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{3}
return fileDescriptor_7eb37f6b80b23116, []int{6}
}
func (m *DescribeCollectionRequest) XXX_Unmarshal(b []byte) error {
@ -382,7 +570,7 @@ func (m *ShowCollectionRequest) Reset() { *m = ShowCollectionRequest{} }
func (m *ShowCollectionRequest) String() string { return proto.CompactTextString(m) }
func (*ShowCollectionRequest) ProtoMessage() {}
func (*ShowCollectionRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{4}
return fileDescriptor_7eb37f6b80b23116, []int{7}
}
func (m *ShowCollectionRequest) XXX_Unmarshal(b []byte) error {
@ -446,7 +634,7 @@ func (m *CreatePartitionRequest) Reset() { *m = CreatePartitionRequest{}
func (m *CreatePartitionRequest) String() string { return proto.CompactTextString(m) }
func (*CreatePartitionRequest) ProtoMessage() {}
func (*CreatePartitionRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{5}
return fileDescriptor_7eb37f6b80b23116, []int{8}
}
func (m *CreatePartitionRequest) XXX_Unmarshal(b []byte) error {
@ -517,7 +705,7 @@ func (m *DropPartitionRequest) Reset() { *m = DropPartitionRequest{} }
func (m *DropPartitionRequest) String() string { return proto.CompactTextString(m) }
func (*DropPartitionRequest) ProtoMessage() {}
func (*DropPartitionRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{6}
return fileDescriptor_7eb37f6b80b23116, []int{9}
}
func (m *DropPartitionRequest) XXX_Unmarshal(b []byte) error {
@ -588,7 +776,7 @@ func (m *HasPartitionRequest) Reset() { *m = HasPartitionRequest{} }
func (m *HasPartitionRequest) String() string { return proto.CompactTextString(m) }
func (*HasPartitionRequest) ProtoMessage() {}
func (*HasPartitionRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{7}
return fileDescriptor_7eb37f6b80b23116, []int{10}
}
func (m *HasPartitionRequest) XXX_Unmarshal(b []byte) error {
@ -659,7 +847,7 @@ func (m *DescribePartitionRequest) Reset() { *m = DescribePartitionReque
func (m *DescribePartitionRequest) String() string { return proto.CompactTextString(m) }
func (*DescribePartitionRequest) ProtoMessage() {}
func (*DescribePartitionRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{8}
return fileDescriptor_7eb37f6b80b23116, []int{11}
}
func (m *DescribePartitionRequest) XXX_Unmarshal(b []byte) error {
@ -730,7 +918,7 @@ func (m *ShowPartitionRequest) Reset() { *m = ShowPartitionRequest{} }
func (m *ShowPartitionRequest) String() string { return proto.CompactTextString(m) }
func (*ShowPartitionRequest) ProtoMessage() {}
func (*ShowPartitionRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{9}
return fileDescriptor_7eb37f6b80b23116, []int{12}
}
func (m *ShowPartitionRequest) XXX_Unmarshal(b []byte) error {
@ -806,7 +994,7 @@ func (m *InsertRequest) Reset() { *m = InsertRequest{} }
func (m *InsertRequest) String() string { return proto.CompactTextString(m) }
func (*InsertRequest) ProtoMessage() {}
func (*InsertRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{10}
return fileDescriptor_7eb37f6b80b23116, []int{13}
}
func (m *InsertRequest) XXX_Unmarshal(b []byte) error {
@ -914,7 +1102,7 @@ func (m *DeleteRequest) Reset() { *m = DeleteRequest{} }
func (m *DeleteRequest) String() string { return proto.CompactTextString(m) }
func (*DeleteRequest) ProtoMessage() {}
func (*DeleteRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{11}
return fileDescriptor_7eb37f6b80b23116, []int{14}
}
func (m *DeleteRequest) XXX_Unmarshal(b []byte) error {
@ -1000,7 +1188,7 @@ func (m *SearchRequest) Reset() { *m = SearchRequest{} }
func (m *SearchRequest) String() string { return proto.CompactTextString(m) }
func (*SearchRequest) ProtoMessage() {}
func (*SearchRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{12}
return fileDescriptor_7eb37f6b80b23116, []int{15}
}
func (m *SearchRequest) XXX_Unmarshal(b []byte) error {
@ -1080,7 +1268,7 @@ func (m *SearchResult) Reset() { *m = SearchResult{} }
func (m *SearchResult) String() string { return proto.CompactTextString(m) }
func (*SearchResult) ProtoMessage() {}
func (*SearchResult) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{13}
return fileDescriptor_7eb37f6b80b23116, []int{16}
}
func (m *SearchResult) XXX_Unmarshal(b []byte) error {
@ -1162,7 +1350,7 @@ func (m *TimeSyncMsg) Reset() { *m = TimeSyncMsg{} }
func (m *TimeSyncMsg) String() string { return proto.CompactTextString(m) }
func (*TimeSyncMsg) ProtoMessage() {}
func (*TimeSyncMsg) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{14}
return fileDescriptor_7eb37f6b80b23116, []int{17}
}
func (m *TimeSyncMsg) XXX_Unmarshal(b []byte) error {
@ -1212,7 +1400,7 @@ func (m *Key2Seg) Reset() { *m = Key2Seg{} }
func (m *Key2Seg) String() string { return proto.CompactTextString(m) }
func (*Key2Seg) ProtoMessage() {}
func (*Key2Seg) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{15}
return fileDescriptor_7eb37f6b80b23116, []int{18}
}
func (m *Key2Seg) XXX_Unmarshal(b []byte) error {
@ -1280,7 +1468,7 @@ func (m *Key2SegMsg) Reset() { *m = Key2SegMsg{} }
func (m *Key2SegMsg) String() string { return proto.CompactTextString(m) }
func (*Key2SegMsg) ProtoMessage() {}
func (*Key2SegMsg) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{16}
return fileDescriptor_7eb37f6b80b23116, []int{19}
}
func (m *Key2SegMsg) XXX_Unmarshal(b []byte) error {
@ -1328,7 +1516,7 @@ func (m *SegmentStatistics) Reset() { *m = SegmentStatistics{} }
func (m *SegmentStatistics) String() string { return proto.CompactTextString(m) }
func (*SegmentStatistics) ProtoMessage() {}
func (*SegmentStatistics) Descriptor() ([]byte, []int) {
return fileDescriptor_7eb37f6b80b23116, []int{17}
return fileDescriptor_7eb37f6b80b23116, []int{20}
}
func (m *SegmentStatistics) XXX_Unmarshal(b []byte) error {
@ -1372,6 +1560,10 @@ func (m *SegmentStatistics) GetNumRows() int64 {
func init() {
proto.RegisterEnum("milvus.proto.internal.ReqType", ReqType_name, ReqType_value)
proto.RegisterEnum("milvus.proto.internal.PeerRole", PeerRole_name, PeerRole_value)
proto.RegisterType((*TimestampMsg)(nil), "milvus.proto.internal.TimestampMsg")
proto.RegisterType((*TsoRequest)(nil), "milvus.proto.internal.TsoRequest")
proto.RegisterType((*TsoResponse)(nil), "milvus.proto.internal.TsoResponse")
proto.RegisterType((*CreateCollectionRequest)(nil), "milvus.proto.internal.CreateCollectionRequest")
proto.RegisterType((*DropCollectionRequest)(nil), "milvus.proto.internal.DropCollectionRequest")
proto.RegisterType((*HasCollectionRequest)(nil), "milvus.proto.internal.HasCollectionRequest")
@ -1395,69 +1587,78 @@ func init() {
func init() { proto.RegisterFile("internal_msg.proto", fileDescriptor_7eb37f6b80b23116) }
var fileDescriptor_7eb37f6b80b23116 = []byte{
// 1020 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x57, 0xcd, 0x6f, 0xe3, 0x44,
0x14, 0x67, 0x92, 0x26, 0x69, 0x5f, 0x9a, 0xd6, 0x9d, 0x36, 0xd4, 0x5b, 0x3e, 0x36, 0x78, 0x91,
0xa8, 0x56, 0x22, 0x15, 0x59, 0x0e, 0x70, 0x6d, 0x73, 0x68, 0x58, 0x6d, 0x85, 0x9c, 0x8a, 0x03,
0x12, 0xb2, 0x1c, 0xfb, 0x91, 0x8c, 0xfc, 0xd9, 0x99, 0x49, 0x8b, 0x7b, 0xe7, 0x8a, 0x90, 0x38,
0x72, 0xe3, 0xaf, 0xe1, 0xeb, 0xce, 0x3f, 0x01, 0x82, 0x95, 0x40, 0x5c, 0xd1, 0xd8, 0xce, 0x87,
0xdd, 0x52, 0x10, 0x62, 0xa5, 0x4a, 0xb9, 0xe5, 0xbd, 0x99, 0xbc, 0xf9, 0x7d, 0xcc, 0x3c, 0xcf,
0x00, 0x65, 0xa1, 0x44, 0x1e, 0xda, 0xbe, 0x15, 0x88, 0x71, 0x37, 0xe6, 0x91, 0x8c, 0x68, 0x3b,
0x60, 0xfe, 0xe5, 0x54, 0x64, 0x51, 0x77, 0x36, 0xe1, 0x60, 0xd3, 0x89, 0x82, 0x20, 0x0a, 0xb3,
0xf4, 0xc1, 0x8e, 0x40, 0x7e, 0xc9, 0x1c, 0x5c, 0xfc, 0xcf, 0xf8, 0x89, 0xc0, 0xfe, 0x09, 0x47,
0x5b, 0xe2, 0x49, 0xe4, 0xfb, 0xe8, 0x48, 0x16, 0x85, 0x26, 0x5e, 0x4c, 0x51, 0x48, 0xfa, 0x3e,
0xac, 0x73, 0xbc, 0xb0, 0x64, 0x12, 0xa3, 0x4e, 0x3a, 0xe4, 0x70, 0xab, 0xf7, 0x7a, 0xf7, 0xd6,
0x65, 0xba, 0x26, 0x5e, 0x9c, 0x27, 0x31, 0x9a, 0x0d, 0x9e, 0xfd, 0xa0, 0x6d, 0xa8, 0xab, 0xbf,
0x32, 0x57, 0xaf, 0x74, 0xc8, 0xe1, 0x9a, 0x59, 0xe3, 0x78, 0x31, 0x70, 0xe9, 0xab, 0xb0, 0x21,
0x59, 0x80, 0x42, 0xda, 0x41, 0xac, 0x57, 0xd3, 0x91, 0x45, 0x82, 0x3e, 0x80, 0xf5, 0x98, 0x47,
0x9f, 0x25, 0xea, 0x6f, 0x6b, 0x1d, 0x72, 0x58, 0x35, 0x1b, 0x69, 0x3c, 0x70, 0xe9, 0x3b, 0x50,
0x17, 0xce, 0x04, 0x03, 0x5b, 0xaf, 0x75, 0xc8, 0x61, 0xb3, 0xf7, 0xa0, 0x08, 0x24, 0x67, 0x79,
0xec, 0x47, 0x23, 0x33, 0x9f, 0x68, 0x3c, 0x27, 0xd0, 0xee, 0xf3, 0x28, 0xbe, 0xd7, 0xbc, 0x9e,
0xc1, 0xb6, 0x33, 0xc7, 0x67, 0x85, 0x76, 0x80, 0x39, 0xc1, 0x37, 0x8b, 0x88, 0x72, 0xe3, 0xba,
0x0b, 0x32, 0x67, 0x76, 0x80, 0xe6, 0x96, 0x53, 0x88, 0x8d, 0xdf, 0x08, 0xec, 0x9d, 0xda, 0x62,
0x95, 0x28, 0xff, 0x41, 0xe0, 0x41, 0x1f, 0x85, 0xc3, 0xd9, 0x08, 0x57, 0x89, 0xf7, 0x37, 0x04,
0xda, 0xc3, 0x49, 0x74, 0x75, 0x9f, 0x39, 0x1b, 0xbf, 0x12, 0x78, 0x39, 0xeb, 0x2e, 0x1f, 0xda,
0x5c, 0xb2, 0x7b, 0xea, 0xcc, 0x07, 0xb0, 0x15, 0xcf, 0xe0, 0x2d, 0x1b, 0xf3, 0xe8, 0x76, 0x63,
0xe6, 0x54, 0x52, 0x5f, 0x5a, 0xf1, 0x72, 0x68, 0xfc, 0x42, 0x60, 0x4f, 0x75, 0x9d, 0x55, 0xe1,
0xfb, 0x33, 0x81, 0xdd, 0x53, 0x5b, 0xac, 0x0a, 0xdd, 0xe7, 0x04, 0xf4, 0x59, 0xb7, 0x59, 0x15,
0xce, 0xea, 0xa3, 0xa2, 0x3a, 0xcd, 0x7d, 0xe6, 0xfb, 0x7f, 0x7f, 0x54, 0x2a, 0xd0, 0x1a, 0x84,
0x02, 0xb9, 0x7c, 0x71, 0x5c, 0xdf, 0xba, 0x09, 0x59, 0x31, 0xde, 0x28, 0x83, 0xa1, 0x8f, 0x60,
0x61, 0x88, 0x25, 0xed, 0x71, 0xca, 0x7d, 0xc3, 0xdc, 0x9c, 0x27, 0xcf, 0xed, 0x31, 0x7d, 0x0d,
0x40, 0xe0, 0x38, 0xc0, 0x50, 0xaa, 0x85, 0x6a, 0x99, 0x74, 0x79, 0x66, 0xe0, 0xaa, 0x61, 0x67,
0x62, 0x87, 0x21, 0xfa, 0x6a, 0xb8, 0x9e, 0x0d, 0xe7, 0x99, 0x81, 0x5b, 0x50, 0xb6, 0x51, 0x54,
0xb6, 0x60, 0xc9, 0x7a, 0xd9, 0x92, 0x7d, 0x68, 0xf0, 0xe8, 0xca, 0x62, 0xae, 0xd0, 0x37, 0x3a,
0xd5, 0xc3, 0xaa, 0x59, 0xe7, 0xd1, 0xd5, 0xc0, 0x15, 0xf4, 0x5d, 0x58, 0x57, 0x03, 0xae, 0x2d,
0x6d, 0x1d, 0x3a, 0xd5, 0xbb, 0xaf, 0x6c, 0xaa, 0x46, 0xdf, 0x96, 0xb6, 0xf1, 0x79, 0x05, 0x5a,
0x7d, 0xf4, 0x51, 0xe2, 0x3d, 0xd0, 0xbd, 0xa8, 0xd9, 0xda, 0x5d, 0x9a, 0xd5, 0xee, 0xd0, 0xac,
0x5e, 0xd6, 0xec, 0x0d, 0xd8, 0x8c, 0x39, 0x0b, 0x6c, 0x9e, 0x58, 0x1e, 0x26, 0x42, 0x6f, 0xa4,
0xc2, 0x35, 0xf3, 0xdc, 0x53, 0x4c, 0x84, 0xf1, 0x27, 0x81, 0xd6, 0x10, 0x6d, 0xee, 0x4c, 0x5e,
0x9c, 0x0e, 0xcb, 0xf8, 0xab, 0x77, 0xe0, 0x5f, 0x2b, 0xe3, 0x7f, 0x0c, 0x3b, 0x1c, 0xc5, 0xd4,
0x97, 0xd6, 0x92, 0x3c, 0xd9, 0x8e, 0xdb, 0xce, 0x06, 0x4e, 0xe6, 0x22, 0x1d, 0x41, 0xed, 0x62,
0x8a, 0x3c, 0x49, 0x55, 0xb8, 0x73, 0x0f, 0x64, 0xf3, 0x8c, 0xaf, 0x2a, 0xb0, 0x39, 0x63, 0xae,
0x4a, 0xd1, 0x27, 0x50, 0x17, 0xd2, 0x96, 0x53, 0x91, 0xd2, 0x6e, 0xf6, 0x5e, 0xb9, 0xb5, 0xc4,
0x30, 0x9d, 0x62, 0xe6, 0x53, 0xff, 0x03, 0x65, 0x03, 0x5a, 0x29, 0x00, 0x2b, 0x8c, 0x5c, 0x5c,
0x34, 0x98, 0x66, 0x9a, 0x3c, 0x8b, 0x5c, 0x2c, 0xcb, 0x52, 0xfb, 0x57, 0xb2, 0xd4, 0x6f, 0x97,
0xa5, 0x0b, 0x6b, 0x13, 0x26, 0x33, 0xeb, 0x9b, 0xbd, 0x83, 0xdb, 0x7b, 0xd4, 0x29, 0x93, 0xc2,
0x4c, 0xe7, 0x19, 0x7d, 0x68, 0x9e, 0xb3, 0x00, 0x87, 0x49, 0xe8, 0x3c, 0x13, 0x63, 0x75, 0xea,
0x62, 0x44, 0xae, 0x16, 0x20, 0x29, 0xcc, 0xba, 0x0a, 0xcb, 0x08, 0x2b, 0x25, 0x84, 0xc6, 0xd7,
0x04, 0x1a, 0x4f, 0x31, 0xe9, 0x0d, 0x71, 0x9c, 0x2a, 0x94, 0x1e, 0xdc, 0xbc, 0x42, 0x2d, 0x3d,
0xb7, 0xf4, 0x21, 0x34, 0x97, 0xf6, 0x66, 0x5e, 0x02, 0x16, 0x5b, 0xf3, 0x9f, 0x3b, 0x34, 0x13,
0xd6, 0xa5, 0xed, 0xe7, 0x02, 0xae, 0x9b, 0x0d, 0x26, 0x3e, 0x52, 0xa1, 0xaa, 0xbc, 0x68, 0x50,
0x42, 0xaf, 0x75, 0xaa, 0xaa, 0xf2, 0xbc, 0x43, 0x09, 0xe3, 0x13, 0x80, 0x1c, 0x9c, 0xa2, 0xb8,
0x70, 0x90, 0x2c, 0x3b, 0xf8, 0x1e, 0x34, 0x3c, 0x4c, 0x7a, 0x02, 0xc7, 0x7a, 0x25, 0xd5, 0xee,
0xef, 0x4e, 0x41, 0x5e, 0xca, 0x9c, 0x4d, 0x37, 0x42, 0xd8, 0x19, 0x66, 0x8b, 0xa9, 0xbd, 0xc2,
0x84, 0x64, 0x8e, 0x28, 0x75, 0x4d, 0x52, 0xee, 0x9a, 0x0f, 0xa1, 0x19, 0x60, 0x10, 0xf1, 0xc4,
0x12, 0xec, 0x1a, 0x67, 0x6a, 0x64, 0xa9, 0x21, 0xbb, 0x46, 0xc5, 0x37, 0x9c, 0x06, 0x16, 0x8f,
0xae, 0xc4, 0x6c, 0x43, 0x85, 0xd3, 0xc0, 0x8c, 0xae, 0xc4, 0xe3, 0x2f, 0x2a, 0xd0, 0xc8, 0x8f,
0x22, 0xdd, 0x80, 0x9a, 0x77, 0x16, 0x85, 0xa8, 0xbd, 0x44, 0xdb, 0xb0, 0xe3, 0x95, 0xdf, 0xdb,
0x9a, 0x4b, 0x77, 0x61, 0xdb, 0x2b, 0x3e, 0x56, 0x35, 0xa4, 0x14, 0xb6, 0xbc, 0xc2, 0x6b, 0x4e,
0xfb, 0x94, 0xee, 0xc3, 0xae, 0x77, 0xf3, 0xb9, 0xa3, 0x8d, 0xe9, 0x1e, 0x68, 0x5e, 0xf1, 0x3d,
0x20, 0xb4, 0x09, 0x6d, 0x83, 0xe6, 0x95, 0x2e, 0xe0, 0xda, 0xb7, 0x84, 0xee, 0xc2, 0x96, 0x57,
0xb8, 0xa5, 0x6a, 0xdf, 0x11, 0x4a, 0xa1, 0xe5, 0x2d, 0x5f, 0xe5, 0xb4, 0xef, 0x09, 0xdd, 0x07,
0xea, 0xdd, 0xb8, 0xef, 0x68, 0x3f, 0x10, 0xba, 0x07, 0xdb, 0x5e, 0xe1, 0x52, 0x20, 0xb4, 0x1f,
0x09, 0xdd, 0x84, 0x86, 0x97, 0x7d, 0x37, 0xb5, 0x2f, 0xab, 0x69, 0x94, 0x9d, 0x65, 0xed, 0xf7,
0xea, 0x71, 0xff, 0xe3, 0xe3, 0x31, 0x93, 0x93, 0xe9, 0x48, 0x9d, 0xd9, 0xa3, 0x6b, 0xe6, 0xfb,
0xec, 0x5a, 0xa2, 0x33, 0x39, 0xca, 0x0c, 0x7c, 0xdb, 0x65, 0x42, 0x72, 0x36, 0x9a, 0x4a, 0x74,
0x8f, 0x66, 0x36, 0x1e, 0xa5, 0xae, 0xce, 0xc3, 0x78, 0x34, 0xaa, 0xa7, 0x99, 0x27, 0x7f, 0x05,
0x00, 0x00, 0xff, 0xff, 0xf3, 0x38, 0x9d, 0x4f, 0x04, 0x11, 0x00, 0x00,
// 1163 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x58, 0xcd, 0x6f, 0x1b, 0x45,
0x14, 0xef, 0xda, 0xf1, 0x47, 0x9e, 0xe3, 0x64, 0x33, 0x89, 0x89, 0x1b, 0x0a, 0x0d, 0x5b, 0x24,
0xaa, 0x4a, 0x38, 0xc2, 0xe5, 0x40, 0x8f, 0xb4, 0x3e, 0xd4, 0x54, 0xad, 0xaa, 0x75, 0x04, 0x12,
0x12, 0x5a, 0xad, 0x77, 0x1f, 0xf6, 0x68, 0x3f, 0x66, 0x33, 0x33, 0x6e, 0xd8, 0xdc, 0xb9, 0x22,
0x24, 0x8e, 0xdc, 0xf8, 0x6b, 0xf8, 0xba, 0xf3, 0x4f, 0x80, 0xa0, 0x12, 0x88, 0x2b, 0x9a, 0xd9,
0x5d, 0xdb, 0xeb, 0x26, 0xe1, 0x43, 0x54, 0x8a, 0x94, 0x9b, 0xdf, 0x9b, 0x99, 0x37, 0xbf, 0xdf,
0xef, 0xcd, 0xbc, 0x9d, 0x67, 0x20, 0x34, 0x96, 0xc8, 0x63, 0x37, 0x74, 0x22, 0x31, 0xe9, 0x25,
0x9c, 0x49, 0x46, 0x3a, 0x11, 0x0d, 0x9f, 0xcd, 0x44, 0x66, 0xf5, 0x8a, 0x09, 0xfb, 0x1b, 0x1e,
0x8b, 0x22, 0x16, 0x67, 0xee, 0xfd, 0x6d, 0x81, 0xfc, 0x19, 0xf5, 0x70, 0xb1, 0xce, 0x1a, 0xc0,
0xc6, 0x11, 0x8d, 0x50, 0x48, 0x37, 0x4a, 0x1e, 0x8b, 0x09, 0xd9, 0x87, 0x66, 0x32, 0x4d, 0x05,
0xf5, 0xdc, 0xb0, 0x6b, 0x1c, 0x18, 0xb7, 0xab, 0xf6, 0xdc, 0x26, 0x5d, 0x68, 0x84, 0x6c, 0xa2,
0x87, 0x2a, 0x7a, 0xa8, 0x30, 0xad, 0x04, 0xe0, 0x48, 0x30, 0x1b, 0x8f, 0x67, 0x28, 0x24, 0xd9,
0x83, 0x46, 0x82, 0xc8, 0x1d, 0xea, 0xe7, 0x21, 0xea, 0xca, 0x1c, 0xfa, 0xe4, 0x2e, 0xac, 0x71,
0x16, 0xa2, 0x5e, 0xbd, 0xd9, 0xbf, 0xd9, 0x3b, 0x13, 0x73, 0xef, 0x29, 0x22, 0xb7, 0x59, 0x88,
0xb6, 0x9e, 0x4c, 0x76, 0xa1, 0xe6, 0xb1, 0x59, 0x2c, 0xbb, 0xd5, 0x03, 0xe3, 0x76, 0xdb, 0xce,
0x0c, 0xeb, 0x6b, 0x03, 0x5a, 0x7a, 0x4b, 0x91, 0xb0, 0x58, 0x20, 0xb9, 0x0b, 0x75, 0x21, 0x5d,
0x39, 0x13, 0x7a, 0xcb, 0x56, 0xff, 0xd5, 0x72, 0xf0, 0x5c, 0x86, 0x91, 0x9e, 0x62, 0xe7, 0x53,
0xc9, 0xfb, 0xb0, 0x2e, 0x0b, 0xf2, 0x1a, 0x54, 0xab, 0x7f, 0xeb, 0x1c, 0x50, 0xcb, 0x22, 0xd9,
0x8b, 0x55, 0xe7, 0xa0, 0xfb, 0xc9, 0x80, 0xbd, 0x07, 0x1c, 0x5d, 0x89, 0x0f, 0x58, 0x18, 0xa2,
0x27, 0x29, 0x8b, 0x0b, 0x75, 0xee, 0x41, 0x93, 0xe3, 0xb1, 0x23, 0xd3, 0x04, 0x35, 0xd6, 0xcd,
0xfe, 0xeb, 0xe7, 0xec, 0x69, 0xe3, 0xf1, 0x51, 0x9a, 0xa0, 0xdd, 0xe0, 0xd9, 0x0f, 0xd2, 0x81,
0xba, 0x5a, 0x4a, 0x7d, 0x0d, 0x76, 0xcd, 0xae, 0x71, 0x3c, 0x1e, 0xfa, 0xe4, 0xc6, 0x32, 0x8d,
0xaa, 0x1e, 0x59, 0x42, 0x78, 0x1d, 0x9a, 0x09, 0x67, 0x9f, 0xa5, 0x6a, 0xd9, 0x5a, 0x96, 0x36,
0x6d, 0x0f, 0x7d, 0xf2, 0x0e, 0xd4, 0x85, 0x37, 0xc5, 0xc8, 0xed, 0xd6, 0x34, 0xf9, 0xeb, 0x67,
0x8a, 0x76, 0x3f, 0x64, 0x63, 0x3b, 0x9f, 0x68, 0x3d, 0x37, 0xa0, 0x33, 0xe0, 0x2c, 0xb9, 0xd4,
0xbc, 0x1e, 0xc3, 0x96, 0x37, 0xc7, 0xe7, 0xc4, 0x6e, 0x84, 0x39, 0xc1, 0x37, 0xcb, 0x88, 0xf2,
0xeb, 0xd0, 0x5b, 0x90, 0x79, 0xe2, 0x46, 0x68, 0x6f, 0x7a, 0x25, 0xdb, 0xfa, 0xcd, 0x80, 0xdd,
0x87, 0xae, 0xb8, 0x4a, 0x94, 0xff, 0x30, 0xe0, 0xfa, 0x00, 0x85, 0xc7, 0xe9, 0x18, 0xaf, 0x12,
0xef, 0x6f, 0x0c, 0xe8, 0x8c, 0xa6, 0xec, 0xe4, 0x32, 0x73, 0xb6, 0x7e, 0x35, 0xe0, 0x95, 0xac,
0xba, 0x3c, 0x75, 0xb9, 0xa4, 0x97, 0x34, 0x33, 0x1f, 0xc0, 0x66, 0x52, 0xc0, 0x5b, 0x4e, 0xcc,
0xad, 0xb3, 0x13, 0x33, 0xa7, 0xa2, 0xf3, 0xd2, 0x4e, 0x96, 0x4d, 0xeb, 0x17, 0x03, 0x76, 0x55,
0xd5, 0xb9, 0x2a, 0x7c, 0x7f, 0x36, 0x60, 0xe7, 0xa1, 0x2b, 0xae, 0x0a, 0xdd, 0xe7, 0x06, 0x74,
0x8b, 0x6a, 0x73, 0x55, 0x38, 0xab, 0x8f, 0x8a, 0xaa, 0x34, 0x97, 0x99, 0xef, 0xff, 0xfd, 0x51,
0xa9, 0x40, 0x7b, 0x18, 0x0b, 0xe4, 0xf2, 0xe5, 0x71, 0x7d, 0xeb, 0x45, 0xc8, 0x8a, 0xf1, 0xfa,
0x2a, 0x18, 0x72, 0x0b, 0x16, 0x09, 0x71, 0xa4, 0x3b, 0xd1, 0xdc, 0xd7, 0xed, 0x8d, 0xb9, 0xf3,
0xc8, 0x9d, 0x90, 0xd7, 0x00, 0x04, 0x4e, 0x22, 0x8c, 0xa5, 0xda, 0xa8, 0x96, 0x49, 0x97, 0x7b,
0x86, 0xbe, 0x1a, 0xf6, 0xa6, 0x6e, 0x1c, 0x63, 0xa8, 0x86, 0xeb, 0xd9, 0x70, 0xee, 0x19, 0xfa,
0x25, 0x65, 0x1b, 0x65, 0x65, 0x4b, 0x29, 0x69, 0xae, 0xa6, 0x64, 0x0f, 0x1a, 0x9c, 0x9d, 0x38,
0xd4, 0x17, 0xdd, 0xf5, 0x83, 0xaa, 0x7a, 0x40, 0x73, 0x76, 0x32, 0xf4, 0x05, 0x79, 0x17, 0x9a,
0x6a, 0xc0, 0x77, 0xa5, 0xdb, 0x85, 0x83, 0xea, 0xc5, 0x4f, 0x36, 0x15, 0x63, 0xe0, 0x4a, 0xd7,
0xfa, 0xbc, 0x02, 0xed, 0x01, 0x86, 0x28, 0xf1, 0x12, 0xe8, 0x5e, 0xd6, 0x6c, 0xed, 0x22, 0xcd,
0x6a, 0x17, 0x68, 0x56, 0x5f, 0xd5, 0xec, 0x0d, 0xd8, 0x48, 0x38, 0x8d, 0x5c, 0x9e, 0x3a, 0x01,
0xa6, 0xa2, 0xdb, 0xd0, 0xc2, 0xb5, 0x72, 0xdf, 0x23, 0x4c, 0x85, 0xf5, 0xa7, 0x01, 0xed, 0x11,
0xba, 0xdc, 0x9b, 0xbe, 0x3c, 0x1d, 0x96, 0xf1, 0x57, 0x2f, 0xc0, 0xbf, 0xb6, 0x8a, 0xff, 0x0e,
0x6c, 0x73, 0x14, 0xb3, 0x50, 0x3a, 0x4b, 0xf2, 0x64, 0x27, 0x6e, 0x2b, 0x1b, 0x78, 0x30, 0x17,
0xe9, 0x10, 0x6a, 0xc7, 0x33, 0xe4, 0xa9, 0x56, 0xe1, 0xc2, 0x33, 0x90, 0xcd, 0xb3, 0xbe, 0xaa,
0xc0, 0x46, 0xc1, 0x5c, 0x85, 0xfa, 0x6f, 0xed, 0xd2, 0xbf, 0xa7, 0x6c, 0x41, 0x5b, 0x03, 0x70,
0x62, 0xe6, 0xe3, 0xa2, 0xc0, 0xb4, 0xb4, 0xf3, 0x09, 0xf3, 0x71, 0x55, 0x96, 0xda, 0x3f, 0x92,
0xa5, 0x7e, 0xb6, 0x2c, 0x3d, 0x58, 0x9b, 0x52, 0x99, 0xa5, 0xbe, 0xd5, 0xdf, 0x3f, 0xbb, 0x46,
0x3d, 0xa4, 0x52, 0xd8, 0x7a, 0x9e, 0x35, 0x80, 0x96, 0x6a, 0xeb, 0x46, 0x69, 0xec, 0xa9, 0xd6,
0xf7, 0xdc, 0xb6, 0xf5, 0xc6, 0x6a, 0x9b, 0xb8, 0x8c, 0x50, 0x75, 0xa2, 0x8d, 0x47, 0x98, 0xf6,
0x47, 0x38, 0xd1, 0x0a, 0xe9, 0x8b, 0x9b, 0x47, 0xa8, 0xe9, 0x7b, 0x4b, 0x6e, 0x42, 0x6b, 0xe9,
0x6c, 0xe6, 0x21, 0x60, 0x71, 0x34, 0xff, 0xbe, 0x42, 0x53, 0xe1, 0x3c, 0x73, 0xc3, 0x5c, 0xc0,
0xa6, 0xdd, 0xa0, 0xe2, 0x43, 0x65, 0xaa, 0xc8, 0x8b, 0x02, 0x25, 0xba, 0xb5, 0x83, 0xaa, 0x8a,
0x3c, 0xaf, 0x50, 0xc2, 0xfa, 0x04, 0x20, 0x07, 0xa7, 0x28, 0x2e, 0x32, 0x68, 0x2c, 0x67, 0xf0,
0x3d, 0x68, 0x04, 0x98, 0xf6, 0x05, 0x4e, 0xba, 0x15, 0xad, 0xdd, 0x79, 0xb7, 0x20, 0x0f, 0x65,
0x17, 0xd3, 0xad, 0x18, 0xb6, 0x47, 0xd9, 0x66, 0xea, 0xac, 0x50, 0x21, 0xa9, 0x27, 0x56, 0xaa,
0xa6, 0xb1, 0x5a, 0x35, 0x6f, 0x42, 0x2b, 0xc2, 0x88, 0xf1, 0xd4, 0x11, 0xf4, 0x14, 0x0b, 0x35,
0x32, 0xd7, 0x88, 0x9e, 0xa2, 0xe2, 0x1b, 0xcf, 0x22, 0x87, 0xb3, 0x13, 0x51, 0x1c, 0xa8, 0x78,
0x16, 0xd9, 0xec, 0x44, 0xdc, 0xf9, 0xa2, 0x02, 0x8d, 0xfc, 0x2a, 0x92, 0x75, 0xa8, 0x05, 0x4f,
0x58, 0x8c, 0xe6, 0x35, 0xd2, 0x81, 0xed, 0x60, 0xb5, 0xdf, 0x36, 0x7d, 0xb2, 0x03, 0x5b, 0x41,
0xb9, 0x59, 0x35, 0x91, 0x10, 0xd8, 0x0c, 0x4a, 0xdd, 0x9c, 0xf9, 0x29, 0xd9, 0x83, 0x9d, 0xe0,
0xc5, 0x76, 0xc7, 0x9c, 0x90, 0x5d, 0x30, 0x83, 0x72, 0x3f, 0x20, 0xcc, 0x29, 0xe9, 0x80, 0x19,
0xac, 0x3c, 0xc0, 0xcd, 0x6f, 0x0d, 0xb2, 0x03, 0x9b, 0x41, 0xe9, 0x95, 0x6a, 0x7e, 0x67, 0x10,
0x02, 0xed, 0x60, 0xf9, 0x29, 0x67, 0x7e, 0x6f, 0x90, 0x3d, 0x20, 0xc1, 0x0b, 0xef, 0x1d, 0xf3,
0x07, 0x83, 0xec, 0xc2, 0x56, 0x50, 0x7a, 0x14, 0x08, 0xf3, 0x47, 0x83, 0x6c, 0x40, 0x23, 0xc8,
0xbe, 0x9b, 0xe6, 0x97, 0x55, 0x6d, 0x65, 0x77, 0xd9, 0xfc, 0xbd, 0x7a, 0xe7, 0x1e, 0x34, 0x8b,
0xff, 0x4b, 0x08, 0x40, 0xfd, 0xb1, 0x2b, 0x24, 0x72, 0xf3, 0x9a, 0xfa, 0x6d, 0xa3, 0xeb, 0x23,
0x37, 0x0d, 0xf5, 0xfb, 0x23, 0x4e, 0x95, 0xbf, 0xa2, 0x44, 0x7b, 0xaa, 0x2e, 0xa7, 0x59, 0xbd,
0x3f, 0xf8, 0xf8, 0xfe, 0x84, 0xca, 0xe9, 0x6c, 0xac, 0xae, 0xfb, 0xe1, 0x29, 0x0d, 0x43, 0x7a,
0x2a, 0xd1, 0x9b, 0x1e, 0x66, 0xb9, 0x7f, 0xdb, 0xa7, 0x42, 0x72, 0x3a, 0x9e, 0x49, 0xf4, 0x0f,
0x8b, 0x13, 0x70, 0xa8, 0x0f, 0xc4, 0xdc, 0x4c, 0xc6, 0xe3, 0xba, 0xf6, 0xdc, 0xfd, 0x2b, 0x00,
0x00, 0xff, 0xff, 0xe0, 0x4c, 0xc7, 0x32, 0x95, 0x12, 0x00, 0x00,
}

View File

@ -87,4 +87,7 @@ service Master {
* @return StringListResponse
*/
rpc ShowPartitions(internal.ShowPartitionRequest) returns (service.StringListResponse) {}
}
rpc Tso(stream internal.TsoRequest) returns (stream internal.TsoResponse) {}
}

View File

@ -30,31 +30,33 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
func init() { proto.RegisterFile("master.proto", fileDescriptor_f9c348dec43a6705) }
var fileDescriptor_f9c348dec43a6705 = []byte{
// 384 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0x4d, 0x4f, 0xc2, 0x40,
0x10, 0xe5, 0x44, 0xcc, 0x86, 0x0f, 0x59, 0x6f, 0x78, 0xeb, 0xc9, 0x80, 0xb4, 0x46, 0xff, 0x80,
0x01, 0x0e, 0x1c, 0x34, 0x31, 0x70, 0xd3, 0x18, 0xdc, 0x96, 0x0d, 0x4c, 0x6c, 0xbb, 0x75, 0x67,
0x8a, 0x09, 0xff, 0xc2, 0x7f, 0x6c, 0xda, 0x52, 0xda, 0x15, 0x8a, 0xe8, 0xad, 0x3b, 0xf3, 0xf6,
0xbd, 0xce, 0xbc, 0x97, 0x65, 0x8d, 0x40, 0x20, 0x49, 0x6d, 0x47, 0x5a, 0x91, 0xe2, 0x17, 0x01,
0xf8, 0xeb, 0x18, 0xb3, 0x93, 0x9d, 0xb5, 0xba, 0x0d, 0x4f, 0x05, 0x81, 0x0a, 0xb3, 0x62, 0x97,
0x43, 0x48, 0x52, 0x87, 0xc2, 0x9f, 0x07, 0xb8, 0xdc, 0xd6, 0x3a, 0x28, 0xf5, 0x1a, 0x3c, 0x59,
0x94, 0x6e, 0xbf, 0xce, 0x58, 0xfd, 0x31, 0xbd, 0xcf, 0x05, 0x3b, 0x1f, 0x69, 0x29, 0x48, 0x8e,
0x94, 0xef, 0x4b, 0x8f, 0x40, 0x85, 0xdc, 0xb6, 0x0d, 0xa5, 0x9c, 0xd3, 0xfe, 0x09, 0x9c, 0xca,
0x8f, 0x58, 0x22, 0x75, 0x2f, 0x4d, 0xfc, 0xf6, 0x8f, 0x66, 0x24, 0x28, 0x46, 0xab, 0xc6, 0x5f,
0x59, 0x6b, 0xac, 0x55, 0x54, 0x12, 0xb8, 0xae, 0x10, 0x30, 0x61, 0x27, 0xd2, 0xbb, 0xac, 0x39,
0x11, 0x58, 0x62, 0xef, 0x57, 0xb0, 0x1b, 0xa8, 0x9c, 0xdc, 0x32, 0xc1, 0xdb, 0x5d, 0xd9, 0x43,
0xa5, 0xfc, 0xa9, 0xc4, 0x48, 0x85, 0x28, 0xad, 0x1a, 0x8f, 0x19, 0x1f, 0x4b, 0xf4, 0x34, 0xb8,
0xe5, 0x3d, 0xdd, 0x54, 0x8d, 0xb1, 0x07, 0xcd, 0xd5, 0xfa, 0x87, 0xd5, 0x0a, 0x60, 0x76, 0x35,
0x4a, 0x3e, 0xad, 0x1a, 0x7f, 0x67, 0xed, 0xd9, 0x4a, 0x7d, 0x16, 0x6d, 0xac, 0x5c, 0x9d, 0x89,
0xcb, 0xf5, 0xae, 0x0e, 0xeb, 0xcd, 0x48, 0x43, 0xb8, 0x7c, 0x00, 0xa4, 0xd2, 0x8c, 0x73, 0xd6,
0xce, 0x0c, 0x7e, 0x12, 0x9a, 0x20, 0x1d, 0x70, 0x70, 0x34, 0x08, 0x3b, 0xdc, 0x89, 0x46, 0xbd,
0xb0, 0x66, 0x62, 0x70, 0x41, 0xdf, 0x3f, 0x12, 0x83, 0xbf, 0x92, 0xbf, 0xb1, 0xc6, 0x44, 0x60,
0xc1, 0xdd, 0xab, 0x0e, 0xc1, 0x1e, 0xf5, 0x69, 0x19, 0xd0, 0xac, 0x93, 0x1b, 0x5b, 0xc8, 0x38,
0xbf, 0x44, 0x60, 0x4f, 0xab, 0x77, 0x58, 0x6b, 0x87, 0x33, 0x03, 0x00, 0xac, 0x95, 0x18, 0xbb,
0xeb, 0x62, 0xe5, 0xce, 0x0c, 0xd8, 0x3f, 0xec, 0x1f, 0x0e, 0x9f, 0xef, 0x97, 0x40, 0xab, 0xd8,
0x4d, 0x56, 0xeb, 0x6c, 0xc0, 0xf7, 0x61, 0x43, 0xd2, 0x5b, 0x39, 0x19, 0xc5, 0x60, 0x01, 0x48,
0x1a, 0xdc, 0x98, 0xe4, 0xc2, 0xc9, 0x55, 0x9d, 0x94, 0xd7, 0xc9, 0x9e, 0xa2, 0xc8, 0x75, 0xeb,
0xe9, 0xf9, 0xee, 0x3b, 0x00, 0x00, 0xff, 0xff, 0x96, 0xb6, 0xf6, 0x5a, 0xb8, 0x04, 0x00, 0x00,
// 409 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0xcd, 0x6e, 0xe2, 0x30,
0x14, 0x85, 0x41, 0x23, 0xa1, 0x91, 0xc5, 0xcf, 0xe0, 0xd9, 0x31, 0xab, 0xc9, 0x0a, 0xc1, 0x90,
0xa0, 0xe9, 0x0b, 0x54, 0xc0, 0x82, 0x45, 0x2b, 0x55, 0xc0, 0xaa, 0x55, 0x45, 0x9d, 0x60, 0x81,
0xd5, 0x24, 0x4e, 0x7d, 0x6f, 0xa8, 0xc4, 0xfb, 0xf6, 0x3d, 0xaa, 0x24, 0x84, 0xc4, 0x05, 0x53,
0xda, 0x1d, 0xb6, 0x8f, 0xbf, 0x83, 0xef, 0x39, 0x0a, 0xa9, 0x07, 0x0c, 0x90, 0x2b, 0x3b, 0x52,
0x12, 0x25, 0xfd, 0x1d, 0x08, 0x7f, 0x1b, 0x43, 0xb6, 0xb2, 0xb3, 0xa3, 0x4e, 0xdd, 0x93, 0x41,
0x20, 0xc3, 0x6c, 0xb3, 0x43, 0x45, 0x88, 0x5c, 0x85, 0xcc, 0x5f, 0x06, 0xb0, 0xde, 0xef, 0xb5,
0x81, 0xab, 0xad, 0xf0, 0x78, 0xb1, 0xf5, 0xff, 0xed, 0x27, 0xa9, 0xdd, 0xa6, 0xf7, 0x29, 0x23,
0xbf, 0xc6, 0x8a, 0x33, 0xe4, 0x63, 0xe9, 0xfb, 0xdc, 0x43, 0x21, 0x43, 0x6a, 0xdb, 0x9a, 0x53,
0xce, 0xb4, 0x3f, 0x0a, 0x67, 0xfc, 0x25, 0xe6, 0x80, 0x9d, 0x3f, 0xba, 0x7e, 0xff, 0x8f, 0xe6,
0xc8, 0x30, 0x06, 0xab, 0x42, 0x1f, 0x49, 0x73, 0xa2, 0x64, 0x54, 0x32, 0xf8, 0x67, 0x30, 0xd0,
0x65, 0x17, 0xe2, 0x5d, 0xd2, 0x98, 0x32, 0x28, 0xd1, 0xfb, 0x06, 0xba, 0xa6, 0xca, 0xe1, 0x96,
0x2e, 0xde, 0xcf, 0xca, 0x1e, 0x49, 0xe9, 0xcf, 0x38, 0x44, 0x32, 0x04, 0x6e, 0x55, 0x68, 0x4c,
0xe8, 0x84, 0x83, 0xa7, 0x84, 0x5b, 0x9e, 0xd3, 0xd0, 0xf4, 0x8c, 0x23, 0x69, 0xee, 0xd6, 0x3f,
0xed, 0x56, 0x08, 0xb3, 0xab, 0x51, 0xf2, 0xd3, 0xaa, 0xd0, 0x67, 0xd2, 0x9a, 0x6f, 0xe4, 0x6b,
0x71, 0x0c, 0xc6, 0xd1, 0xe9, 0xba, 0xdc, 0xaf, 0x7b, 0xda, 0x6f, 0x8e, 0x4a, 0x84, 0xeb, 0x1b,
0x01, 0x58, 0x7a, 0xe3, 0x92, 0xb4, 0xb2, 0x80, 0xef, 0x98, 0x42, 0x91, 0x3e, 0x70, 0x70, 0xb6,
0x08, 0x07, 0xdd, 0x85, 0x41, 0x3d, 0x90, 0x46, 0x12, 0x70, 0x81, 0xef, 0x9f, 0xa9, 0xc1, 0x57,
0xe1, 0x4f, 0xa4, 0x3e, 0x65, 0x50, 0xb0, 0x7b, 0xe6, 0x12, 0x1c, 0xa1, 0x2f, 0xeb, 0x80, 0x22,
0xed, 0x3c, 0xd8, 0xc2, 0xc6, 0xf9, 0xa4, 0x02, 0x47, 0x5e, 0xbd, 0xd3, 0x5e, 0x07, 0x9d, 0x5e,
0x00, 0x41, 0x9a, 0x49, 0xb0, 0x87, 0x53, 0x30, 0xce, 0x4c, 0x93, 0x7d, 0x27, 0xfe, 0x19, 0xf9,
0xb1, 0x00, 0x49, 0xff, 0x1a, 0xf8, 0x0b, 0x90, 0x86, 0x71, 0xe9, 0x92, 0x9c, 0xd7, 0xad, 0x0e,
0xab, 0xa3, 0xd1, 0xfd, 0xf5, 0x5a, 0xe0, 0x26, 0x76, 0x93, 0xb8, 0x9c, 0x9d, 0xf0, 0x7d, 0xb1,
0x43, 0xee, 0x6d, 0x9c, 0x0c, 0x30, 0x58, 0x09, 0x40, 0x25, 0xdc, 0x18, 0xf9, 0xca, 0xc9, 0x31,
0x4e, 0x4a, 0x75, 0xb2, 0xcf, 0x5b, 0xe4, 0xba, 0xb5, 0x74, 0x7d, 0xf5, 0x1e, 0x00, 0x00, 0xff,
0xff, 0x47, 0x62, 0x7a, 0xee, 0x0c, 0x05, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -129,6 +131,7 @@ type MasterClient interface {
//
// @return StringListResponse
ShowPartitions(ctx context.Context, in *internalpb.ShowPartitionRequest, opts ...grpc.CallOption) (*servicepb.StringListResponse, error)
Tso(ctx context.Context, opts ...grpc.CallOption) (Master_TsoClient, error)
}
type masterClient struct {
@ -229,6 +232,37 @@ func (c *masterClient) ShowPartitions(ctx context.Context, in *internalpb.ShowPa
return out, nil
}
func (c *masterClient) Tso(ctx context.Context, opts ...grpc.CallOption) (Master_TsoClient, error) {
stream, err := c.cc.NewStream(ctx, &_Master_serviceDesc.Streams[0], "/milvus.proto.master.Master/Tso", opts...)
if err != nil {
return nil, err
}
x := &masterTsoClient{stream}
return x, nil
}
type Master_TsoClient interface {
Send(*internalpb.TsoRequest) error
Recv() (*internalpb.TsoResponse, error)
grpc.ClientStream
}
type masterTsoClient struct {
grpc.ClientStream
}
func (x *masterTsoClient) Send(m *internalpb.TsoRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *masterTsoClient) Recv() (*internalpb.TsoResponse, error) {
m := new(internalpb.TsoResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// MasterServer is the server API for Master service.
type MasterServer interface {
//*
@ -291,6 +325,7 @@ type MasterServer interface {
//
// @return StringListResponse
ShowPartitions(context.Context, *internalpb.ShowPartitionRequest) (*servicepb.StringListResponse, error)
Tso(Master_TsoServer) error
}
// UnimplementedMasterServer can be embedded to have forward compatible implementations.
@ -327,6 +362,9 @@ func (*UnimplementedMasterServer) DescribePartition(ctx context.Context, req *in
func (*UnimplementedMasterServer) ShowPartitions(ctx context.Context, req *internalpb.ShowPartitionRequest) (*servicepb.StringListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ShowPartitions not implemented")
}
func (*UnimplementedMasterServer) Tso(srv Master_TsoServer) error {
return status.Errorf(codes.Unimplemented, "method Tso not implemented")
}
func RegisterMasterServer(s *grpc.Server, srv MasterServer) {
s.RegisterService(&_Master_serviceDesc, srv)
@ -512,6 +550,32 @@ func _Master_ShowPartitions_Handler(srv interface{}, ctx context.Context, dec fu
return interceptor(ctx, in, info, handler)
}
func _Master_Tso_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(MasterServer).Tso(&masterTsoServer{stream})
}
type Master_TsoServer interface {
Send(*internalpb.TsoResponse) error
Recv() (*internalpb.TsoRequest, error)
grpc.ServerStream
}
type masterTsoServer struct {
grpc.ServerStream
}
func (x *masterTsoServer) Send(m *internalpb.TsoResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *masterTsoServer) Recv() (*internalpb.TsoRequest, error) {
m := new(internalpb.TsoRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _Master_serviceDesc = grpc.ServiceDesc{
ServiceName: "milvus.proto.master.Master",
HandlerType: (*MasterServer)(nil),
@ -557,6 +621,13 @@ var _Master_serviceDesc = grpc.ServiceDesc{
Handler: _Master_ShowPartitions_Handler,
},
},
Streams: []grpc.StreamDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "Tso",
Handler: _Master_Tso_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "master.proto",
}

View File

@ -0,0 +1,41 @@
// Copyright 2019 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package tsoutil
import (
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
const (
physicalShiftBits = 18
logicalBits = (1 << physicalShiftBits) - 1
)
// ParseTS parses the ts to (physical,logical).
func ParseTS(ts uint64) (time.Time, uint64) {
logical := ts & logicalBits
physical := ts >> physicalShiftBits
physicalTime := time.Unix(int64(physical/1000), int64(physical)%1000*time.Millisecond.Nanoseconds())
return physicalTime, logical
}
// ParseTimestamp parses pdpb.Timestamp to time.Time
func ParseTimestamp(ts internalpb.TimestampMsg) (time.Time, uint64) {
logical := uint64(ts.Logical)
physical := ts.Physical
physicalTime := time.Unix(int64(physical/1000), int64(physical)%1000*time.Millisecond.Nanoseconds())
return physicalTime, logical
}

View File

@ -0,0 +1,35 @@
// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package typeutil
import (
"encoding/binary"
"github.com/zilliztech/milvus-distributed/internal/errors"
)
// BytesToUint64 converts a byte slice to uint64.
func BytesToUint64(b []byte) (uint64, error) {
if len(b) != 8 {
return 0, errors.Errorf("invalid data, must 8 bytes, but %d", len(b))
}
return binary.BigEndian.Uint64(b), nil
}
// Uint64ToBytes converts uint64 to a byte slice.
func Uint64ToBytes(v uint64) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, v)
return b
}

View File

@ -0,0 +1,34 @@
// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package typeutil
import "time"
// ZeroTime is a zero time.
var ZeroTime = time.Time{}
// ParseTimestamp returns a timestamp for a given byte slice.
func ParseTimestamp(data []byte) (time.Time, error) {
nano, err := BytesToUint64(data)
if err != nil {
return ZeroTime, err
}
return time.Unix(0, int64(nano)), nil
}
// SubTimeByWallClock returns the duration between two different timestamps.
func SubTimeByWallClock(after time.Time, before time.Time) time.Duration {
return time.Duration(after.UnixNano() - before.UnixNano())
}