Add data service

Signed-off-by: sunby <bingyi.sun@zilliz.com>
This commit is contained in:
sunby 2021-01-19 12:10:49 +08:00 committed by yefu.chen
parent 6a13386393
commit 76a7684d96
21 changed files with 698 additions and 372 deletions

View File

@ -176,7 +176,7 @@ func (sa *SegIDAssigner) pickCanDoFunc() {
if assign == nil || assign.Capacity(segRequest.timestamp) < records[colName][partitionName][channelID] {
partitionID, _ := typeutil.Hash32String(segRequest.colName)
sa.segReqs = append(sa.segReqs, &datapb.SegIDRequest{
ChannelID: strconv.FormatUint(uint64(segRequest.channelID), 10),
ChannelName: strconv.FormatUint(uint64(segRequest.channelID), 10),
Count: segRequest.count,
CollName: segRequest.colName,
PartitionName: segRequest.partitionName,
@ -220,7 +220,7 @@ func (sa *SegIDAssigner) checkSegReqEqual(req1, req2 *datapb.SegIDRequest) bool
if req1 == req2 {
return true
}
return req1.CollName == req2.CollName && req1.PartitionName == req2.PartitionName && req1.ChannelID == req2.ChannelID
return req1.CollName == req2.CollName && req1.PartitionName == req2.PartitionName && req1.ChannelName == req2.ChannelName
}
func (sa *SegIDAssigner) reduceSegReqs() {
@ -281,7 +281,12 @@ func (sa *SegIDAssigner) syncSegments() bool {
log.Println("SyncSegment Error:", info.Status.Reason)
continue
}
assign := sa.getAssign(info.CollName, info.PartitionName, info.ChannelID)
// FIXME: use channelName
channel, err := strconv.Atoi(info.ChannelName)
if err != nil {
return false
}
assign := sa.getAssign(info.CollName, info.PartitionName, int32(channel))
segInfo := &segInfo{
segID: info.SegID,
count: info.Count,
@ -298,7 +303,7 @@ func (sa *SegIDAssigner) syncSegments() bool {
assign = &assignInfo{
collID: info.CollectionID,
partitionID: info.PartitionID,
channelID: info.ChannelID,
channelID: int32(channel),
segInfos: segInfos,
partitionName: info.PartitionName,
collName: info.CollName,

View File

@ -249,7 +249,7 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
"PE\020\027\022\021\n\rOUT_OF_MEMORY\020\030\022\024\n\017DD_REQUEST_RA"
"CE\020\350\007*N\n\nIndexState\022\010\n\004NONE\020\000\022\014\n\010UNISSUE"
"D\020\001\022\016\n\nINPROGRESS\020\002\022\014\n\010FINISHED\020\003\022\n\n\006FAI"
"LED\020\004*\245\004\n\007MsgType\022\t\n\005kNone\020\000\022\025\n\021kCreateC"
"LED\020\004*\274\004\n\007MsgType\022\t\n\005kNone\020\000\022\025\n\021kCreateC"
"ollection\020d\022\023\n\017kDropCollection\020e\022\022\n\016kHas"
"Collection\020f\022\027\n\023kDescribeCollection\020g\022\024\n"
"\020kShowCollections\020h\022\022\n\016kGetSysConfigs\020i\022"
@ -262,9 +262,10 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
"ate\020\366\003\022\035\n\030kGetCollectionStatistics\020\367\003\022\034\n"
"\027kGetPartitionStatistics\020\370\003\022\016\n\tkTimeTick"
"\020\260\t\022\024\n\017kQueryNodeStats\020\261\t\022\017\n\nkLoadIndex\020"
"\262\t\022\017\n\nkRequestID\020\263\t\022\020\n\013kRequestTSO\020\264\tBBZ"
"@github.com/zilliztech/milvus-distribute"
"d/internal/proto/commonpbb\006proto3"
"\262\t\022\017\n\nkRequestID\020\263\t\022\020\n\013kRequestTSO\020\264\t\022\025\n"
"\020kAllocateSegment\020\265\tBBZ@github.com/zilli"
"ztech/milvus-distributed/internal/proto/"
"commonpbb\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = {
};
@ -280,7 +281,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once;
static bool descriptor_table_common_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = {
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 1673,
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 1696,
&descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 7, 0,
schemas, file_default_instances, TableStruct_common_2eproto::offsets,
file_level_metadata_common_2eproto, 7, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto,
@ -378,6 +379,7 @@ bool MsgType_IsValid(int value) {
case 1202:
case 1203:
case 1204:
case 1205:
return true;
default:
return false;

View File

@ -199,12 +199,13 @@ enum MsgType : int {
kLoadIndex = 1202,
kRequestID = 1203,
kRequestTSO = 1204,
kAllocateSegment = 1205,
MsgType_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),
MsgType_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max()
};
bool MsgType_IsValid(int value);
constexpr MsgType MsgType_MIN = kNone;
constexpr MsgType MsgType_MAX = kRequestTSO;
constexpr MsgType MsgType_MAX = kAllocateSegment;
constexpr int MsgType_ARRAYSIZE = MsgType_MAX + 1;
const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* MsgType_descriptor();

View File

@ -0,0 +1,23 @@
package dataservice
type allocator interface {
allocTimestamp() (Timestamp, error)
allocID() (UniqueID, error)
}
type allocatorImpl struct {
// TODO call allocate functions in client.go in master service
}
// TODO implements
func newAllocatorImpl() *allocatorImpl {
return nil
}
func (allocator *allocatorImpl) allocTimestamp() (Timestamp, error) {
return 0, nil
}
func (allocator *allocatorImpl) allocID() (UniqueID, error) {
return 0, nil
}

View File

@ -1,5 +0,0 @@
package dataservice
type DataService struct {
segAllocator segmentAllocator
}

View File

@ -1,40 +1,63 @@
package dataservice
import (
"fmt"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"golang.org/x/net/context"
)
func (ds *DataService) RegisterNode(context.Context, *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
func (ds *Server) RegisterNode(context.Context, *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
return nil, nil
}
func (ds *DataService) Flush(context.Context, *datapb.FlushRequest) (*commonpb.Status, error) {
func (ds *Server) Flush(context.Context, *datapb.FlushRequest) (*commonpb.Status, error) {
return nil, nil
}
func (ds *DataService) AssignSegmentID(ctx context.Context, request *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error) {
//for _, req := range request.SegIDRequests {
// segmentID, retCount, expireTs, err := ds.segAllocator.AllocSegment(req.CollectionID, req.PartitionID, req.ChannelID, int(req.Count))
// if err != nil {
// log.Printf()
// }
//}
return nil, nil
func (ds *Server) AssignSegmentID(ctx context.Context, request *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error) {
resp := &datapb.AssignSegIDResponse{
SegIDAssignments: make([]*datapb.SegIDAssignment, 0),
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
},
}
task := &allocateTask{
baseTask: baseTask{
sch: ds.scheduler,
meta: ds.meta,
cv: make(chan error),
},
req: request,
resp: resp,
segAllocator: ds.segAllocator,
insertCMapper: ds.insertCMapper,
}
if err := ds.scheduler.Enqueue(task); err != nil {
resp.Status.Reason = fmt.Sprintf("enqueue error: %s", err.Error())
return resp, nil
}
if err := task.WaitToFinish(ctx); err != nil {
resp.Status.Reason = fmt.Sprintf("wait to finish error: %s", err.Error())
return resp, nil
}
return resp, nil
}
func (ds *DataService) ShowSegments(context.Context, *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) {
func (ds *Server) ShowSegments(context.Context, *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) {
return nil, nil
}
func (ds *DataService) GetSegmentStates(context.Context, *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) {
func (ds *Server) GetSegmentStates(context.Context, *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) {
return nil, nil
}
func (ds *DataService) GetInsertBinlogPaths(context.Context, *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) {
func (ds *Server) GetInsertBinlogPaths(context.Context, *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) {
return nil, nil
}
func (ds *DataService) GetInsertChannels(context.Context, *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
func (ds *Server) GetInsertChannels(context.Context, *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
return nil, nil
}

View File

@ -29,15 +29,17 @@ type (
collID2Info map[UniqueID]*collectionInfo // collection id to collection info
segID2Info map[UniqueID]*datapb.SegmentInfo // segment id to segment info
ddLock sync.RWMutex
allocator allocator
ddLock sync.RWMutex
}
)
func NewMetaTable(kv kv.TxnBase) (*meta, error) {
func newMetaTable(kv kv.TxnBase, allocator allocator) (*meta, error) {
mt := &meta{
client: kv,
collID2Info: make(map[UniqueID]*collectionInfo),
segID2Info: make(map[UniqueID]*datapb.SegmentInfo),
allocator: allocator,
}
err := mt.reloadFromKV()
if err != nil {
@ -46,8 +48,8 @@ func NewMetaTable(kv kv.TxnBase) (*meta, error) {
return mt, nil
}
func (mt *meta) reloadFromKV() error {
_, values, err := mt.client.LoadWithPrefix("segment")
func (meta *meta) reloadFromKV() error {
_, values, err := meta.client.LoadWithPrefix("segment")
if err != nil {
return err
}
@ -58,123 +60,145 @@ func (mt *meta) reloadFromKV() error {
if err != nil {
return err
}
mt.segID2Info[segmentInfo.SegmentID] = segmentInfo
meta.segID2Info[segmentInfo.SegmentID] = segmentInfo
}
return nil
}
func (mt *meta) AddCollection(collectionInfo *collectionInfo) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if _, ok := mt.collID2Info[collectionInfo.ID]; ok {
func (meta *meta) AddCollection(collectionInfo *collectionInfo) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
if _, ok := meta.collID2Info[collectionInfo.ID]; ok {
return fmt.Errorf("collection %s with id %d already exist", collectionInfo.Schema.Name, collectionInfo.ID)
}
mt.collID2Info[collectionInfo.ID] = collectionInfo
meta.collID2Info[collectionInfo.ID] = collectionInfo
return nil
}
func (mt *meta) DropCollection(collID UniqueID) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
func (meta *meta) DropCollection(collID UniqueID) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
if _, ok := mt.collID2Info[collID]; !ok {
if _, ok := meta.collID2Info[collID]; !ok {
return errors.Errorf("can't find collection. id = " + strconv.FormatInt(collID, 10))
}
delete(mt.collID2Info, collID)
for id, segment := range mt.segID2Info {
delete(meta.collID2Info, collID)
for id, segment := range meta.segID2Info {
if segment.CollectionID != collID {
continue
}
delete(mt.segID2Info, id)
if err := mt.removeSegmentInfo(id); err != nil {
delete(meta.segID2Info, id)
if err := meta.removeSegmentInfo(id); err != nil {
log.Printf("remove segment info failed, %s", err.Error())
_ = mt.reloadFromKV()
_ = meta.reloadFromKV()
}
}
return nil
}
func (mt *meta) HasCollection(collID UniqueID) bool {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
_, ok := mt.collID2Info[collID]
func (meta *meta) HasCollection(collID UniqueID) bool {
meta.ddLock.RLock()
defer meta.ddLock.RUnlock()
_, ok := meta.collID2Info[collID]
return ok
}
func (mt *meta) AddSegment(segmentInfo *datapb.SegmentInfo) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if _, ok := mt.segID2Info[segmentInfo.SegmentID]; !ok {
func (meta *meta) BuildSegment(collectionID UniqueID, partitionID UniqueID, channelRange []string) (*datapb.SegmentInfo, error) {
id, err := meta.allocator.allocID()
if err != nil {
return nil, err
}
ts, err := meta.allocator.allocTimestamp()
if err != nil {
return nil, err
}
return &datapb.SegmentInfo{
SegmentID: id,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannels: channelRange,
OpenTime: ts,
CloseTime: 0,
NumRows: 0,
MemSize: 0,
}, nil
}
func (meta *meta) AddSegment(segmentInfo *datapb.SegmentInfo) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
if _, ok := meta.segID2Info[segmentInfo.SegmentID]; !ok {
return fmt.Errorf("segment %d already exist", segmentInfo.SegmentID)
}
mt.segID2Info[segmentInfo.SegmentID] = segmentInfo
if err := mt.saveSegmentInfo(segmentInfo); err != nil {
_ = mt.reloadFromKV()
meta.segID2Info[segmentInfo.SegmentID] = segmentInfo
if err := meta.saveSegmentInfo(segmentInfo); err != nil {
_ = meta.reloadFromKV()
return err
}
return nil
}
func (mt *meta) UpdateSegment(segmentInfo *datapb.SegmentInfo) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
func (meta *meta) UpdateSegment(segmentInfo *datapb.SegmentInfo) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
mt.segID2Info[segmentInfo.SegmentID] = segmentInfo
if err := mt.saveSegmentInfo(segmentInfo); err != nil {
_ = mt.reloadFromKV()
meta.segID2Info[segmentInfo.SegmentID] = segmentInfo
if err := meta.saveSegmentInfo(segmentInfo); err != nil {
_ = meta.reloadFromKV()
return err
}
return nil
}
func (mt *meta) GetSegmentByID(segID UniqueID) (*datapb.SegmentInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
func (meta *meta) GetSegment(segID UniqueID) (*datapb.SegmentInfo, error) {
meta.ddLock.RLock()
defer meta.ddLock.RUnlock()
segmentInfo, ok := mt.segID2Info[segID]
segmentInfo, ok := meta.segID2Info[segID]
if !ok {
return nil, errors.Errorf("GetSegmentByID:can't find segment id = %d", segID)
}
return segmentInfo, nil
}
func (mt *meta) CloseSegment(segID UniqueID, closeTs Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
func (meta *meta) CloseSegment(segID UniqueID, closeTs Timestamp) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
segInfo, ok := mt.segID2Info[segID]
segInfo, ok := meta.segID2Info[segID]
if !ok {
return errors.Errorf("DropSegment:can't find segment id = " + strconv.FormatInt(segID, 10))
}
segInfo.CloseTime = closeTs
err := mt.saveSegmentInfo(segInfo)
err := meta.saveSegmentInfo(segInfo)
if err != nil {
_ = mt.reloadFromKV()
_ = meta.reloadFromKV()
return err
}
return nil
}
func (mt *meta) GetCollection(collectionID UniqueID) (*collectionInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
func (meta *meta) GetCollection(collectionID UniqueID) (*collectionInfo, error) {
meta.ddLock.RLock()
defer meta.ddLock.RUnlock()
collectionInfo, ok := mt.collID2Info[collectionID]
collectionInfo, ok := meta.collID2Info[collectionID]
if !ok {
return nil, fmt.Errorf("collection %d not found", collectionID)
}
return collectionInfo, nil
}
func (mt *meta) saveSegmentInfo(segmentInfo *datapb.SegmentInfo) error {
func (meta *meta) saveSegmentInfo(segmentInfo *datapb.SegmentInfo) error {
segBytes := proto.MarshalTextString(segmentInfo)
return mt.client.Save("/segment/"+strconv.FormatInt(segmentInfo.SegmentID, 10), segBytes)
return meta.client.Save("/segment/"+strconv.FormatInt(segmentInfo.SegmentID, 10), segBytes)
}
func (mt *meta) removeSegmentInfo(segID UniqueID) error {
return mt.client.Remove("/segment/" + strconv.FormatInt(segID, 10))
func (meta *meta) removeSegmentInfo(segID UniqueID) error {
return meta.client.Remove("/segment/" + strconv.FormatInt(segID, 10))
}

View File

@ -0,0 +1,89 @@
package dataservice
import (
"context"
"log"
"github.com/zilliztech/milvus-distributed/internal/errors"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
)
//type ddRequestScheduler interface {}
//type ddReqFIFOScheduler struct {}
type ddRequestScheduler struct {
ctx context.Context
cancel context.CancelFunc
globalIDAllocator func() (UniqueID, error)
reqQueue chan task
scheduleTimeStamp Timestamp
ddMsgStream ms.MsgStream
}
func NewDDRequestScheduler(ctx context.Context) *ddRequestScheduler {
const channelSize = 1024
ctx2, cancel := context.WithCancel(ctx)
rs := ddRequestScheduler{
ctx: ctx2,
cancel: cancel,
reqQueue: make(chan task, channelSize),
}
return &rs
}
func (rs *ddRequestScheduler) Enqueue(task task) error {
rs.reqQueue <- task
return nil
}
func (rs *ddRequestScheduler) SetIDAllocator(allocGlobalID func() (UniqueID, error)) {
rs.globalIDAllocator = allocGlobalID
}
func (rs *ddRequestScheduler) SetDDMsgStream(ddStream ms.MsgStream) {
rs.ddMsgStream = ddStream
}
func (rs *ddRequestScheduler) scheduleLoop() {
for {
select {
case task := <-rs.reqQueue:
err := rs.schedule(task)
if err != nil {
log.Println(err)
}
case <-rs.ctx.Done():
log.Print("server is closed, exit task execution loop")
return
}
}
}
func (rs *ddRequestScheduler) schedule(t task) error {
timeStamp, err := t.Ts()
if err != nil {
log.Println(err)
return err
}
if timeStamp < rs.scheduleTimeStamp {
t.Notify(errors.Errorf("input timestamp = %d, schduler timestamp = %d", timeStamp, rs.scheduleTimeStamp))
} else {
rs.scheduleTimeStamp = timeStamp
err = t.Execute()
t.Notify(err)
}
return nil
}
func (rs *ddRequestScheduler) Start() error {
go rs.scheduleLoop()
return nil
}
func (rs *ddRequestScheduler) Close() {
rs.cancel()
}

View File

@ -6,8 +6,6 @@ import (
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
@ -17,18 +15,18 @@ type errRemainInSufficient struct {
requestRows int
}
func newErrRemainInSufficient(requestRows int) *errRemainInSufficient {
return &errRemainInSufficient{requestRows: requestRows}
func newErrRemainInSufficient(requestRows int) errRemainInSufficient {
return errRemainInSufficient{requestRows: requestRows}
}
func (err *errRemainInSufficient) Error() string {
func (err errRemainInSufficient) Error() string {
return "segment remaining is insufficient for" + strconv.Itoa(err.requestRows)
}
// segmentAllocator is used to allocate rows for segments and record the allocations.
type segmentAllocator interface {
// OpenSegment add the segment to allocator and set it allocatable
OpenSegment(segmentInfo *datapb.SegmentInfo) error
OpenSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, cRange channelRange) error
// AllocSegment allocate rows and record the allocation.
AllocSegment(collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int) (UniqueID, int, Timestamp, error)
// GetSealedSegments get all sealed segment.
@ -61,52 +59,42 @@ type (
segmentAllocatorImpl struct {
mt *meta
segments map[UniqueID]*segmentStatus //segment id -> status
cMapper *insertChannelMapper
segmentExpireDuration int64
defaultSizePerRecord int64
segmentThreshold float64
segmentThresholdFactor float64
numOfChannels int
numOfQueryNodes int
mu sync.RWMutex
globalIDAllocator func() (UniqueID, error)
globalTSOAllocator func() (Timestamp, error)
allocator allocator
}
)
func newSegmentAssigner(metaTable *meta, globalIDAllocator func() (UniqueID, error),
globalTSOAllocator func() (Timestamp, error)) (*segmentAllocatorImpl, error) {
func newSegmentAssigner(metaTable *meta, allocator allocator) (*segmentAllocatorImpl, error) {
segmentAllocator := &segmentAllocatorImpl{
mt: metaTable,
segments: make(map[UniqueID]*segmentStatus),
segmentExpireDuration: Params.SegIDAssignExpiration,
defaultSizePerRecord: Params.DefaultRecordSize,
segmentThreshold: Params.SegmentSize * 1024 * 1024,
segmentThresholdFactor: Params.SegmentSizeFactor,
numOfChannels: Params.TopicNum,
numOfQueryNodes: Params.QueryNodeNum,
globalIDAllocator: globalIDAllocator,
globalTSOAllocator: globalTSOAllocator,
allocator: allocator,
}
return segmentAllocator, nil
}
func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentInfo) error {
if _, ok := allocator.segments[segmentInfo.SegmentID]; ok {
return fmt.Errorf("segment %d already exist", segmentInfo.SegmentID)
func (allocator *segmentAllocatorImpl) OpenSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, cRange channelRange) error {
if _, ok := allocator.segments[segmentID]; ok {
return fmt.Errorf("segment %d already exist", segmentID)
}
totalRows, err := allocator.estimateTotalRows(segmentInfo.CollectionID)
totalRows, err := allocator.estimateTotalRows(collectionID)
if err != nil {
return err
}
allocator.segments[segmentInfo.SegmentID] = &segmentStatus{
id: segmentInfo.SegmentID,
collectionID: segmentInfo.CollectionID,
partitionID: segmentInfo.PartitionID,
allocator.segments[segmentID] = &segmentStatus{
id: segmentID,
collectionID: collectionID,
partitionID: partitionID,
total: totalRows,
sealed: false,
lastExpireTime: 0,
cRange: segmentInfo.InsertChannels,
cRange: cRange,
}
return nil
}
@ -143,7 +131,7 @@ func (allocator *segmentAllocatorImpl) alloc(segStatus *segmentStatus, numRows i
for _, allocation := range segStatus.allocations {
totalOfAllocations += allocation.rowNums
}
segMeta, err := allocator.mt.GetSegmentByID(segStatus.id)
segMeta, err := allocator.mt.GetSegment(segStatus.id)
if err != nil {
return false, err
}
@ -152,7 +140,7 @@ func (allocator *segmentAllocatorImpl) alloc(segStatus *segmentStatus, numRows i
return false, nil
}
ts, err := allocator.globalTSOAllocator()
ts, err := allocator.allocator.allocTimestamp()
if err != nil {
return false, err
}
@ -162,7 +150,7 @@ func (allocator *segmentAllocatorImpl) alloc(segStatus *segmentStatus, numRows i
segStatus.lastExpireTime = expireTs
segStatus.allocations = append(segStatus.allocations, &allocation{
numRows,
ts,
expireTs,
})
return true, nil
@ -200,7 +188,7 @@ func (allocator *segmentAllocatorImpl) GetSealedSegments() ([]UniqueID, error) {
}
func (allocator *segmentAllocatorImpl) checkSegmentSealed(segStatus *segmentStatus) (bool, error) {
segMeta, err := allocator.mt.GetSegmentByID(segStatus.id)
segMeta, err := allocator.mt.GetSegment(segStatus.id)
if err != nil {
return false, err
}

View File

@ -0,0 +1,8 @@
package dataservice
type Server struct {
segAllocator segmentAllocator
meta *meta
insertCMapper insertChannelMapper
scheduler *ddRequestScheduler
}

View File

@ -0,0 +1,55 @@
package dataservice
import (
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
type statsProcessor struct {
meta *meta
segmentThreshold float64
segmentThresholdFactor float64
}
func newStatsProcessor(meta *meta) *statsProcessor {
return &statsProcessor{
meta: meta,
segmentThreshold: Params.SegmentSize * 1024 * 1024,
segmentThresholdFactor: Params.SegmentSizeFactor,
}
}
func (processor *statsProcessor) ProcessQueryNodeStats(msgPack *msgstream.MsgPack) error {
for _, msg := range msgPack.Msgs {
statsMsg, ok := msg.(*msgstream.QueryNodeStatsMsg)
if !ok {
return errors.Errorf("Type of message is not QueryNodeSegStatsMsg")
}
for _, segStat := range statsMsg.GetSegStats() {
if err := processor.processSegmentStat(segStat); err != nil {
return err
}
}
}
return nil
}
func (processor *statsProcessor) processSegmentStat(segStats *internalpb2.SegmentStats) error {
if !segStats.GetRecentlyModified() {
return nil
}
segID := segStats.GetSegmentID()
segMeta, err := processor.meta.GetSegment(segID)
if err != nil {
return err
}
segMeta.NumRows = segStats.NumRows
segMeta.MemSize = segStats.MemorySize
return processor.meta.UpdateSegment(segMeta)
}

View File

@ -0,0 +1,126 @@
package dataservice
import (
"context"
"fmt"
"log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/errors"
)
// TODO: get timestamp from timestampOracle
type task interface {
Type() commonpb.MsgType
Ts() (Timestamp, error)
Execute() error
WaitToFinish(ctx context.Context) error
Notify(err error)
}
type baseTask struct {
sch *ddRequestScheduler
meta *meta
cv chan error
}
func (bt *baseTask) Notify(err error) {
bt.cv <- err
}
func (bt *baseTask) WaitToFinish(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return errors.Errorf("context done")
case err, ok := <-bt.cv:
if !ok {
return errors.Errorf("notify chan closed")
}
return err
}
}
}
type allocateTask struct {
baseTask
req *datapb.AssignSegIDRequest
resp *datapb.AssignSegIDResponse
segAllocator segmentAllocator
insertCMapper insertChannelMapper
}
func (task *allocateTask) Type() commonpb.MsgType {
return commonpb.MsgType_kAllocateSegment
}
func (task *allocateTask) Ts() (Timestamp, error) {
return task.req.Timestamp, nil
}
func (task *allocateTask) Execute() error {
for _, req := range task.req.SegIDRequests {
result := &datapb.SegIDAssignment{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
},
}
segmentID, retCount, expireTs, err := task.segAllocator.AllocSegment(req.CollectionID, req.PartitionID, req.ChannelName, int(req.Count))
if err != nil {
if _, ok := err.(errRemainInSufficient); !ok {
result.Status.Reason = fmt.Sprintf("allocation of Collection %d, Partition %d, Channel %s, Count %d error: %s",
req.CollectionID, req.PartitionID, req.ChannelName, req.Count, err.Error())
task.resp.SegIDAssignments = append(task.resp.SegIDAssignments, result)
continue
}
log.Printf("no enough space for allocation of Collection %d, Partition %d, Channel %s, Count %d",
req.CollectionID, req.PartitionID, req.ChannelName, req.Count)
if err = task.openNewSegment(req.CollectionID, req.PartitionID, req.ChannelName); err != nil {
result.Status.Reason = fmt.Sprintf("open new segment of Collection %d, Partition %d, Channel %s, Count %d error: %s",
req.CollectionID, req.PartitionID, req.ChannelName, req.Count, err.Error())
task.resp.SegIDAssignments = append(task.resp.SegIDAssignments, result)
continue
}
segmentID, retCount, expireTs, err = task.segAllocator.AllocSegment(req.CollectionID, req.PartitionID, req.ChannelName, int(req.Count))
if err != nil {
result.Status.Reason = fmt.Sprintf("retry allocation of Collection %d, Partition %d, Channel %s, Count %d error: %s",
req.CollectionID, req.PartitionID, req.ChannelName, req.Count, err.Error())
task.resp.SegIDAssignments = append(task.resp.SegIDAssignments, result)
continue
}
}
result.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
result.CollectionID = req.CollectionID
result.SegID = segmentID
result.PartitionID = req.PartitionID
result.Count = uint32(retCount)
result.ExpireTime = expireTs
result.ChannelName = req.ChannelName
task.resp.SegIDAssignments = append(task.resp.SegIDAssignments, result)
}
return nil
}
func (task *allocateTask) openNewSegment(collectionID UniqueID, partitionID UniqueID, channelName string) error {
cRange, err := task.insertCMapper.GetChannelRange(channelName)
if err != nil {
return err
}
segmentInfo, err := task.meta.BuildSegment(collectionID, partitionID, cRange)
if err != nil {
return err
}
if err = task.meta.AddSegment(segmentInfo); err != nil {
return err
}
if err = task.segAllocator.OpenSegment(collectionID, partitionID, segmentInfo.SegmentID, segmentInfo.InsertChannels); err != nil {
return err
}
return nil
}

View File

@ -1147,7 +1147,7 @@ func TestMaster(t *testing.T) {
NodeID: 1,
PeerRole: "ProxyNode",
SegIDRequests: []*datapb.SegIDRequest{
{Count: 10000, ChannelID: "0", CollName: collName, PartitionName: partitionName},
{Count: 10000, ChannelName: "0", CollName: collName, PartitionName: partitionName},
},
})
assert.Nil(t, err)
@ -1156,7 +1156,7 @@ func TestMaster(t *testing.T) {
assert.EqualValues(t, commonpb.ErrorCode_SUCCESS, assignments[0].Status.ErrorCode)
assert.EqualValues(t, collName, assignments[0].CollName)
assert.EqualValues(t, partitionName, assignments[0].PartitionName)
assert.EqualValues(t, int32(0), assignments[0].ChannelID)
assert.EqualValues(t, "0", assignments[0].ChannelName)
assert.EqualValues(t, uint32(10000), assignments[0].Count)
// test stats

View File

@ -77,7 +77,7 @@ func (manager *SegmentManagerImpl) AssignSegment(segIDReq []*datapb.SegIDRequest
collName := req.CollName
paritionName := req.PartitionName
count := req.Count
channelID := req.ChannelID
channelID := req.ChannelName
collMeta, err := manager.metaTable.GetCollectionByName(collName)
if err != nil {
@ -141,7 +141,7 @@ func (manager *SegmentManagerImpl) assignSegment(
return &datapb.SegIDAssignment{
SegID: segStatus.segmentID,
ChannelID: channelID,
ChannelName: strconv.Itoa(int(channelID)),
Count: count,
CollName: collName,
PartitionName: paritionName,
@ -176,7 +176,7 @@ func (manager *SegmentManagerImpl) assignSegment(
}
return &datapb.SegIDAssignment{
SegID: id,
ChannelID: channelID,
ChannelName: strconv.Itoa(int(channelID)),
Count: count,
CollName: collName,
PartitionName: paritionName,

View File

@ -112,8 +112,8 @@ func TestSegmentManager_AssignSegment(t *testing.T) {
for _, c := range cases {
result, _ := segManager.AssignSegment([]*datapb.SegIDRequest{
{Count: c.Count,
ChannelID: strconv.FormatInt(int64(c.ChannelID), 10),
CollName: collName, PartitionName: partitionName},
ChannelName: strconv.FormatInt(int64(c.ChannelID), 10),
CollName: collName, PartitionName: partitionName},
})
results = append(results, result...)
if c.Err {
@ -239,9 +239,9 @@ func TestSegmentManager_SycnWritenode(t *testing.T) {
maxCount := uint32(Params.SegmentSize * 1024 * 1024 / float64(sizePerRecord))
req := []*datapb.SegIDRequest{
{Count: maxCount, ChannelID: "1", CollName: collName, PartitionName: partitionName},
{Count: maxCount, ChannelID: "2", CollName: collName, PartitionName: partitionName},
{Count: maxCount, ChannelID: "3", CollName: collName, PartitionName: partitionName},
{Count: maxCount, ChannelName: "1", CollName: collName, PartitionName: partitionName},
{Count: maxCount, ChannelName: "2", CollName: collName, PartitionName: partitionName},
{Count: maxCount, ChannelName: "3", CollName: collName, PartitionName: partitionName},
}
assignSegment, err := segManager.AssignSegment(req)
assert.Nil(t, err)
@ -249,7 +249,7 @@ func TestSegmentManager_SycnWritenode(t *testing.T) {
assert.Nil(t, err)
for i := 0; i < len(assignSegment); i++ {
assert.EqualValues(t, maxCount, assignSegment[i].Count)
assert.EqualValues(t, i+1, assignSegment[i].ChannelID)
assert.EqualValues(t, strconv.Itoa(i+1), assignSegment[i].ChannelName)
err = mt.UpdateSegment(&pb.SegmentMeta{
SegmentID: assignSegment[i].SegID,

View File

@ -105,7 +105,7 @@ enum MsgType {
kLoadIndex = 1202;
kRequestID = 1203;
kRequestTSO = 1204;
kAllocateSegment = 1205;
}
message MsgBase {

View File

@ -180,11 +180,12 @@ const (
MsgType_kGetCollectionStatistics MsgType = 503
MsgType_kGetPartitionStatistics MsgType = 504
// System Control
MsgType_kTimeTick MsgType = 1200
MsgType_kQueryNodeStats MsgType = 1201
MsgType_kLoadIndex MsgType = 1202
MsgType_kRequestID MsgType = 1203
MsgType_kRequestTSO MsgType = 1204
MsgType_kTimeTick MsgType = 1200
MsgType_kQueryNodeStats MsgType = 1201
MsgType_kLoadIndex MsgType = 1202
MsgType_kRequestID MsgType = 1203
MsgType_kRequestTSO MsgType = 1204
MsgType_kAllocateSegment MsgType = 1205
)
var MsgType_name = map[int32]string{
@ -215,6 +216,7 @@ var MsgType_name = map[int32]string{
1202: "kLoadIndex",
1203: "kRequestID",
1204: "kRequestTSO",
1205: "kAllocateSegment",
}
var MsgType_value = map[string]int32{
@ -245,6 +247,7 @@ var MsgType_value = map[string]int32{
"kLoadIndex": 1202,
"kRequestID": 1203,
"kRequestTSO": 1204,
"kAllocateSegment": 1205,
}
func (x MsgType) String() string {
@ -585,72 +588,73 @@ func init() {
func init() { proto.RegisterFile("common.proto", fileDescriptor_555bd8c177793206) }
var fileDescriptor_555bd8c177793206 = []byte{
// 1062 bytes of a gzipped FileDescriptorProto
// 1077 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x54, 0xcb, 0x6e, 0xe3, 0x36,
0x17, 0x1e, 0x5f, 0x12, 0x47, 0xc7, 0x8e, 0xc3, 0x30, 0x37, 0xff, 0x7f, 0xd3, 0x22, 0xf0, 0x2a,
0x08, 0x30, 0x49, 0xd1, 0x02, 0xed, 0x6a, 0x80, 0x3a, 0x12, 0x9d, 0x10, 0x23, 0x4b, 0x1e, 0x4a,
0x9e, 0x66, 0xba, 0x11, 0x64, 0x8b, 0x63, 0x0b, 0x92, 0x2d, 0x57, 0xa4, 0xa7, 0xf5, 0x3c, 0x45,
0x3b, 0xef, 0xd0, 0x5d, 0x0b, 0xf4, 0xb6, 0xe8, 0x23, 0xf4, 0xf6, 0x20, 0x7d, 0x80, 0xde, 0x96,
0x05, 0x25, 0x2b, 0x36, 0x8a, 0xe9, 0x8e, 0xe7, 0xfb, 0x78, 0x3e, 0x9e, 0xef, 0x90, 0x87, 0xd0,
0x18, 0x25, 0xd3, 0x69, 0x32, 0xbb, 0x9c, 0xa7, 0x89, 0x4c, 0xf0, 0xc1, 0x34, 0x8c, 0x5f, 0x2c,
0x44, 0x1e, 0x5d, 0xe6, 0x54, 0xbb, 0x06, 0x5b, 0x64, 0x3a, 0x97, 0xcb, 0xb6, 0x07, 0xdb, 0x8e,
0xf4, 0xe5, 0x42, 0xe0, 0x47, 0x00, 0x3c, 0x4d, 0x93, 0xd4, 0x1b, 0x25, 0x01, 0x6f, 0x95, 0xce,
0x4a, 0xe7, 0xcd, 0x77, 0xde, 0xba, 0x7c, 0x4d, 0xf2, 0x25, 0x51, 0xdb, 0xf4, 0x24, 0xe0, 0x4c,
0xe3, 0xc5, 0x12, 0x1f, 0xc3, 0x76, 0xca, 0x7d, 0x91, 0xcc, 0x5a, 0xe5, 0xb3, 0xd2, 0xb9, 0xc6,
0x56, 0x51, 0xfb, 0x3d, 0x68, 0x3c, 0xe6, 0xcb, 0xa7, 0x7e, 0xbc, 0xe0, 0x7d, 0x3f, 0x4c, 0x31,
0x82, 0x4a, 0xc4, 0x97, 0x99, 0xbe, 0xc6, 0xd4, 0x12, 0x1f, 0xc2, 0xd6, 0x0b, 0x45, 0xaf, 0x12,
0xf3, 0xa0, 0x7d, 0x0a, 0xd5, 0xeb, 0x38, 0x19, 0xae, 0x59, 0x95, 0xd1, 0x28, 0xd8, 0x87, 0x50,
0xeb, 0x04, 0x41, 0xca, 0x85, 0xc0, 0x4d, 0x28, 0x87, 0xf3, 0x95, 0x5e, 0x39, 0x9c, 0x63, 0x0c,
0xd5, 0x79, 0x92, 0xca, 0x4c, 0xad, 0xc2, 0xb2, 0x75, 0xfb, 0x55, 0x09, 0x6a, 0x3d, 0x31, 0xbe,
0xf6, 0x05, 0xc7, 0xef, 0xc3, 0xce, 0x54, 0x8c, 0x3d, 0xb9, 0x9c, 0x17, 0x2e, 0x4f, 0x5f, 0xeb,
0xb2, 0x27, 0xc6, 0xee, 0x72, 0xce, 0x59, 0x6d, 0x9a, 0x2f, 0x54, 0x25, 0x53, 0x31, 0xa6, 0xc6,
0x4a, 0x39, 0x0f, 0xf0, 0x29, 0x68, 0x32, 0x9c, 0x72, 0x21, 0xfd, 0xe9, 0xbc, 0x55, 0x39, 0x2b,
0x9d, 0x57, 0xd9, 0x1a, 0xc0, 0xff, 0x87, 0x1d, 0x91, 0x2c, 0xd2, 0x11, 0xa7, 0x46, 0xab, 0x9a,
0xa5, 0xdd, 0xc7, 0xed, 0x47, 0xa0, 0xf5, 0xc4, 0xf8, 0x96, 0xfb, 0x01, 0x4f, 0xf1, 0xdb, 0x50,
0x1d, 0xfa, 0x22, 0xaf, 0xa8, 0xfe, 0xdf, 0x15, 0x29, 0x07, 0x2c, 0xdb, 0x79, 0xf1, 0x43, 0x15,
0xb4, 0xfb, 0x9b, 0xc0, 0x75, 0xa8, 0x39, 0x03, 0x5d, 0x27, 0x8e, 0x83, 0x1e, 0xe0, 0x43, 0x40,
0x03, 0x8b, 0xdc, 0xf5, 0x89, 0xee, 0x12, 0xc3, 0x23, 0x8c, 0xd9, 0x0c, 0x95, 0x30, 0x86, 0xa6,
0x6e, 0x5b, 0x16, 0xd1, 0x5d, 0xaf, 0xdb, 0xa1, 0x26, 0x31, 0x50, 0x19, 0x1f, 0xc1, 0x7e, 0x9f,
0xb0, 0x1e, 0x75, 0x1c, 0x6a, 0x5b, 0x9e, 0x41, 0x2c, 0x4a, 0x0c, 0x54, 0xc1, 0xff, 0x83, 0x23,
0xdd, 0x36, 0x4d, 0xa2, 0xbb, 0x0a, 0xb6, 0x6c, 0xd7, 0x23, 0x77, 0xd4, 0x71, 0x1d, 0x54, 0x55,
0xda, 0xd4, 0x34, 0xc9, 0x4d, 0xc7, 0xf4, 0x3a, 0xec, 0x66, 0xd0, 0x23, 0x96, 0x8b, 0xb6, 0x94,
0x4e, 0x81, 0x1a, 0xb4, 0x47, 0x2c, 0x25, 0x87, 0x6a, 0xf8, 0x18, 0x70, 0x01, 0x53, 0xcb, 0x20,
0x77, 0x9e, 0xfb, 0xac, 0x4f, 0xd0, 0x0e, 0x7e, 0x03, 0x4e, 0x0a, 0x7c, 0xf3, 0x9c, 0x4e, 0x8f,
0x20, 0x0d, 0x23, 0x68, 0x14, 0xa4, 0x6b, 0xf7, 0x1f, 0x23, 0xd8, 0x54, 0x67, 0xf6, 0x87, 0x8c,
0xe8, 0x36, 0x33, 0x50, 0x7d, 0x13, 0x7e, 0x4a, 0x74, 0xd7, 0x66, 0x1e, 0x35, 0x50, 0x43, 0x15,
0x5f, 0xc0, 0x0e, 0xe9, 0x30, 0xfd, 0xd6, 0x63, 0xc4, 0x19, 0x98, 0x2e, 0xda, 0x55, 0x2d, 0xe8,
0x52, 0x93, 0x64, 0x8e, 0xba, 0xf6, 0xc0, 0x32, 0x50, 0x13, 0xef, 0x41, 0xbd, 0x47, 0xdc, 0x4e,
0xd1, 0x93, 0x3d, 0x75, 0xbe, 0xde, 0xd1, 0x6f, 0x49, 0x81, 0x20, 0xdc, 0x82, 0x43, 0xbd, 0x63,
0xa9, 0x24, 0x9d, 0x91, 0x8e, 0x4b, 0xbc, 0xae, 0x6d, 0x1a, 0x84, 0xa1, 0x7d, 0x65, 0xf0, 0x5f,
0x0c, 0x35, 0x09, 0xc2, 0x1b, 0x19, 0x06, 0x31, 0xc9, 0x3a, 0xe3, 0x60, 0x23, 0xa3, 0x60, 0x54,
0xc6, 0xa1, 0x32, 0x73, 0x3d, 0xa0, 0xa6, 0xb1, 0x6a, 0x54, 0x7e, 0x69, 0x47, 0x78, 0x1f, 0x76,
0x0b, 0x33, 0x96, 0x49, 0x1d, 0x17, 0x1d, 0xe3, 0x13, 0x38, 0x28, 0xa0, 0x1e, 0x71, 0x19, 0xd5,
0xf3, 0xae, 0x9e, 0xa8, 0xbd, 0xf6, 0xc0, 0xf5, 0xec, 0xae, 0xd7, 0x23, 0x3d, 0x9b, 0x3d, 0x43,
0x2d, 0x7c, 0x08, 0x7b, 0x86, 0xe1, 0x31, 0xf2, 0x64, 0x40, 0x1c, 0xd7, 0x63, 0x1d, 0x9d, 0xa0,
0xdf, 0x6a, 0x17, 0x16, 0x00, 0x9d, 0x05, 0xfc, 0x53, 0x35, 0xf9, 0x1c, 0xef, 0x40, 0xd5, 0xb2,
0x2d, 0x82, 0x1e, 0xe0, 0x06, 0xec, 0x0c, 0x2c, 0xea, 0x38, 0x03, 0x62, 0xa0, 0x12, 0x6e, 0x02,
0x50, 0xab, 0xcf, 0xec, 0x1b, 0xa6, 0x5e, 0x55, 0x59, 0xb1, 0x5d, 0x6a, 0x51, 0xe7, 0x36, 0x7b,
0x22, 0x00, 0xdb, 0xab, 0xfe, 0x54, 0x2f, 0xbe, 0xa8, 0x66, 0xe3, 0x95, 0x4d, 0x89, 0x06, 0x5b,
0x91, 0x95, 0xcc, 0x38, 0x7a, 0xa0, 0x2c, 0x45, 0x7a, 0xca, 0x7d, 0xc9, 0xf5, 0x24, 0x8e, 0xf9,
0x48, 0x86, 0xc9, 0x0c, 0x05, 0xf8, 0x00, 0xf6, 0x22, 0x23, 0x4d, 0xe6, 0x1b, 0x20, 0x57, 0x37,
0x13, 0xdd, 0xfa, 0x62, 0x03, 0x7b, 0xae, 0x8c, 0x46, 0x06, 0x17, 0xa3, 0x34, 0x1c, 0x6e, 0x2a,
0x8c, 0xd5, 0x1b, 0x8c, 0x9c, 0x49, 0xf2, 0xc9, 0x1a, 0x14, 0x68, 0x92, 0x49, 0xdc, 0x70, 0xe9,
0x2c, 0x85, 0x9e, 0xcc, 0x9e, 0x87, 0x63, 0x81, 0x42, 0x7c, 0x04, 0x68, 0x55, 0x42, 0xdf, 0x4f,
0x65, 0x98, 0xe5, 0xff, 0x58, 0xc2, 0x07, 0xd0, 0xcc, 0x4a, 0x58, 0x83, 0x3f, 0xa9, 0xf9, 0xd8,
0x55, 0x25, 0xac, 0xb1, 0x9f, 0x4b, 0xf8, 0x04, 0xf0, 0x7d, 0x09, 0x6b, 0xe2, 0x97, 0x92, 0x6a,
0x6c, 0x56, 0xc2, 0x3d, 0x28, 0xd0, 0xaf, 0x25, 0xbc, 0x0f, 0x8d, 0xd5, 0x71, 0x59, 0x7f, 0xd1,
0x97, 0xe5, 0xfc, 0xa8, 0x95, 0x42, 0x0e, 0x7e, 0xa5, 0x5a, 0x59, 0x8b, 0xe8, 0x4c, 0xf0, 0x54,
0xa2, 0xcf, 0x2a, 0x59, 0x64, 0xf0, 0x98, 0x4b, 0x8e, 0x3e, 0xaf, 0xe0, 0x3a, 0x6c, 0x47, 0xdd,
0x78, 0x21, 0x26, 0xe8, 0x55, 0x4e, 0x39, 0xdc, 0x4f, 0x47, 0x13, 0xf4, 0x7b, 0x25, 0xab, 0x30,
0x8f, 0x18, 0x17, 0x8b, 0x58, 0xa2, 0x3f, 0x2a, 0x99, 0xfe, 0x0d, 0x97, 0xeb, 0xfb, 0x44, 0x7f,
0x56, 0xf0, 0x9b, 0xd0, 0x52, 0xe0, 0xba, 0x3f, 0x8a, 0x09, 0x85, 0x0c, 0x47, 0x02, 0xfd, 0x55,
0xc1, 0xa7, 0x70, 0xa2, 0xe8, 0xfb, 0xda, 0x37, 0xd8, 0xbf, 0x2b, 0xb8, 0x09, 0x5a, 0xe4, 0x86,
0x53, 0xee, 0x86, 0xa3, 0x08, 0x7d, 0xad, 0x65, 0x56, 0x9f, 0x2c, 0x78, 0xba, 0xb4, 0x92, 0x80,
0xab, 0xad, 0x02, 0x7d, 0xa3, 0xe1, 0x3d, 0x80, 0xc8, 0x4c, 0xfc, 0x20, 0xf7, 0xf4, 0x6d, 0x0e,
0x30, 0xfe, 0xf1, 0x82, 0x0b, 0x49, 0x0d, 0xf4, 0x9d, 0x9a, 0xe3, 0x7a, 0x01, 0xb8, 0x8e, 0x8d,
0xbe, 0xd7, 0xae, 0xaf, 0x3f, 0xfa, 0x60, 0x1c, 0xca, 0xc9, 0x62, 0xa8, 0x7e, 0xb4, 0xab, 0x97,
0x61, 0x1c, 0x87, 0x2f, 0x25, 0x1f, 0x4d, 0xae, 0xf2, 0xdf, 0xee, 0x61, 0x10, 0x0a, 0x99, 0x86,
0xc3, 0x85, 0xe4, 0xc1, 0x55, 0x38, 0x93, 0x3c, 0x9d, 0xf9, 0xf1, 0x55, 0xf6, 0x05, 0x5e, 0xe5,
0x5f, 0xe0, 0x7c, 0x38, 0xdc, 0xce, 0xe2, 0x77, 0xff, 0x09, 0x00, 0x00, 0xff, 0xff, 0x31, 0x9c,
0x64, 0xd6, 0xe5, 0x06, 0x00, 0x00,
0x14, 0x1d, 0x3f, 0x12, 0x47, 0xd7, 0x1e, 0x87, 0x61, 0x5e, 0x6e, 0x9b, 0x16, 0x81, 0x57, 0x41,
0x80, 0x49, 0x8a, 0x16, 0x68, 0x57, 0x03, 0x54, 0x91, 0xe8, 0x84, 0x18, 0x59, 0xf2, 0x50, 0xf2,
0x34, 0xd3, 0x8d, 0x20, 0xdb, 0x1c, 0x5b, 0x90, 0x6c, 0xb9, 0x22, 0x3d, 0xad, 0xe7, 0x2b, 0xda,
0xf9, 0x8e, 0x16, 0xe8, 0x13, 0xe8, 0xa2, 0x1f, 0xd0, 0xd7, 0x87, 0xf4, 0x03, 0xfa, 0x5a, 0x16,
0x94, 0xac, 0xd8, 0x28, 0xa6, 0x3b, 0xde, 0x73, 0x78, 0x0f, 0xef, 0xb9, 0xe4, 0x25, 0x34, 0x86,
0xc9, 0x74, 0x9a, 0xcc, 0x2e, 0xe6, 0x69, 0x22, 0x13, 0xbc, 0x3f, 0x0d, 0xe3, 0xe7, 0x0b, 0x91,
0x47, 0x17, 0x39, 0xd5, 0xae, 0xc1, 0x16, 0x99, 0xce, 0xe5, 0xb2, 0xed, 0xc3, 0xb6, 0x2b, 0x03,
0xb9, 0x10, 0xf8, 0x21, 0x00, 0x4f, 0xd3, 0x24, 0xf5, 0x87, 0xc9, 0x88, 0xb7, 0x4a, 0xa7, 0xa5,
0xb3, 0xe6, 0x3b, 0x6f, 0x5d, 0xbc, 0x22, 0xf9, 0x82, 0xa8, 0x6d, 0x46, 0x32, 0xe2, 0x4c, 0xe3,
0xc5, 0x12, 0x1f, 0xc1, 0x76, 0xca, 0x03, 0x91, 0xcc, 0x5a, 0xe5, 0xd3, 0xd2, 0x99, 0xc6, 0x56,
0x51, 0xfb, 0x3d, 0x68, 0x3c, 0xe2, 0xcb, 0x27, 0x41, 0xbc, 0xe0, 0xbd, 0x20, 0x4c, 0x31, 0x82,
0x4a, 0xc4, 0x97, 0x99, 0xbe, 0xc6, 0xd4, 0x12, 0x1f, 0xc0, 0xd6, 0x73, 0x45, 0xaf, 0x12, 0xf3,
0xa0, 0x7d, 0x02, 0xd5, 0xab, 0x38, 0x19, 0xac, 0x59, 0x95, 0xd1, 0x28, 0xd8, 0x07, 0x50, 0xd3,
0x47, 0xa3, 0x94, 0x0b, 0x81, 0x9b, 0x50, 0x0e, 0xe7, 0x2b, 0xbd, 0x72, 0x38, 0xc7, 0x18, 0xaa,
0xf3, 0x24, 0x95, 0x99, 0x5a, 0x85, 0x65, 0xeb, 0xf6, 0xcb, 0x12, 0xd4, 0xba, 0x62, 0x7c, 0x15,
0x08, 0x8e, 0xdf, 0x87, 0x9d, 0xa9, 0x18, 0xfb, 0x72, 0x39, 0x2f, 0x5c, 0x9e, 0xbc, 0xd2, 0x65,
0x57, 0x8c, 0xbd, 0xe5, 0x9c, 0xb3, 0xda, 0x34, 0x5f, 0xa8, 0x4a, 0xa6, 0x62, 0x4c, 0xcd, 0x95,
0x72, 0x1e, 0xe0, 0x13, 0xd0, 0x64, 0x38, 0xe5, 0x42, 0x06, 0xd3, 0x79, 0xab, 0x72, 0x5a, 0x3a,
0xab, 0xb2, 0x35, 0x80, 0x5f, 0x87, 0x1d, 0x91, 0x2c, 0xd2, 0x21, 0xa7, 0x66, 0xab, 0x9a, 0xa5,
0xdd, 0xc5, 0xed, 0x87, 0xa0, 0x75, 0xc5, 0xf8, 0x86, 0x07, 0x23, 0x9e, 0xe2, 0xb7, 0xa1, 0x3a,
0x08, 0x44, 0x5e, 0x51, 0xfd, 0xff, 0x2b, 0x52, 0x0e, 0x58, 0xb6, 0xf3, 0xfc, 0x87, 0x2a, 0x68,
0x77, 0x37, 0x81, 0xeb, 0x50, 0x73, 0xfb, 0x86, 0x41, 0x5c, 0x17, 0xdd, 0xc3, 0x07, 0x80, 0xfa,
0x36, 0xb9, 0xed, 0x11, 0xc3, 0x23, 0xa6, 0x4f, 0x18, 0x73, 0x18, 0x2a, 0x61, 0x0c, 0x4d, 0xc3,
0xb1, 0x6d, 0x62, 0x78, 0x7e, 0x47, 0xa7, 0x16, 0x31, 0x51, 0x19, 0x1f, 0xc2, 0x5e, 0x8f, 0xb0,
0x2e, 0x75, 0x5d, 0xea, 0xd8, 0xbe, 0x49, 0x6c, 0x4a, 0x4c, 0x54, 0xc1, 0xaf, 0xc1, 0xa1, 0xe1,
0x58, 0x16, 0x31, 0x3c, 0x05, 0xdb, 0x8e, 0xe7, 0x93, 0x5b, 0xea, 0x7a, 0x2e, 0xaa, 0x2a, 0x6d,
0x6a, 0x59, 0xe4, 0x5a, 0xb7, 0x7c, 0x9d, 0x5d, 0xf7, 0xbb, 0xc4, 0xf6, 0xd0, 0x96, 0xd2, 0x29,
0x50, 0x93, 0x76, 0x89, 0xad, 0xe4, 0x50, 0x0d, 0x1f, 0x01, 0x2e, 0x60, 0x6a, 0x9b, 0xe4, 0xd6,
0xf7, 0x9e, 0xf6, 0x08, 0xda, 0xc1, 0x6f, 0xc0, 0x71, 0x81, 0x6f, 0x9e, 0xa3, 0x77, 0x09, 0xd2,
0x30, 0x82, 0x46, 0x41, 0x7a, 0x4e, 0xef, 0x11, 0x82, 0x4d, 0x75, 0xe6, 0x7c, 0xc8, 0x88, 0xe1,
0x30, 0x13, 0xd5, 0x37, 0xe1, 0x27, 0xc4, 0xf0, 0x1c, 0xe6, 0x53, 0x13, 0x35, 0x54, 0xf1, 0x05,
0xec, 0x12, 0x9d, 0x19, 0x37, 0x3e, 0x23, 0x6e, 0xdf, 0xf2, 0xd0, 0x7d, 0xd5, 0x82, 0x0e, 0xb5,
0x48, 0xe6, 0xa8, 0xe3, 0xf4, 0x6d, 0x13, 0x35, 0xf1, 0x2e, 0xd4, 0xbb, 0xc4, 0xd3, 0x8b, 0x9e,
0xec, 0xaa, 0xf3, 0x0d, 0xdd, 0xb8, 0x21, 0x05, 0x82, 0x70, 0x0b, 0x0e, 0x0c, 0xdd, 0x56, 0x49,
0x06, 0x23, 0xba, 0x47, 0xfc, 0x8e, 0x63, 0x99, 0x84, 0xa1, 0x3d, 0x65, 0xf0, 0x3f, 0x0c, 0xb5,
0x08, 0xc2, 0x1b, 0x19, 0x26, 0xb1, 0xc8, 0x3a, 0x63, 0x7f, 0x23, 0xa3, 0x60, 0x54, 0xc6, 0x81,
0x32, 0x73, 0xd5, 0xa7, 0x96, 0xb9, 0x6a, 0x54, 0x7e, 0x69, 0x87, 0x78, 0x0f, 0xee, 0x17, 0x66,
0x6c, 0x8b, 0xba, 0x1e, 0x3a, 0xc2, 0xc7, 0xb0, 0x5f, 0x40, 0x5d, 0xe2, 0x31, 0x6a, 0xe4, 0x5d,
0x3d, 0x56, 0x7b, 0x9d, 0xbe, 0xe7, 0x3b, 0x1d, 0xbf, 0x4b, 0xba, 0x0e, 0x7b, 0x8a, 0x5a, 0xf8,
0x00, 0x76, 0x4d, 0xd3, 0x67, 0xe4, 0x71, 0x9f, 0xb8, 0x9e, 0xcf, 0x74, 0x83, 0xa0, 0xdf, 0x6b,
0xe7, 0x36, 0x00, 0x9d, 0x8d, 0xf8, 0xa7, 0x6a, 0xf2, 0x39, 0xde, 0x81, 0xaa, 0xed, 0xd8, 0x04,
0xdd, 0xc3, 0x0d, 0xd8, 0xe9, 0xdb, 0xd4, 0x75, 0xfb, 0xc4, 0x44, 0x25, 0xdc, 0x04, 0xa0, 0x76,
0x8f, 0x39, 0xd7, 0x4c, 0xbd, 0xaa, 0xb2, 0x62, 0x3b, 0xd4, 0xa6, 0xee, 0x4d, 0xf6, 0x44, 0x00,
0xb6, 0x57, 0xfd, 0xa9, 0x9e, 0xff, 0x58, 0xcd, 0xc6, 0x2b, 0x9b, 0x12, 0x0d, 0xb6, 0x22, 0x3b,
0x99, 0x71, 0x74, 0x4f, 0x59, 0x8a, 0x8c, 0x94, 0x07, 0x92, 0x1b, 0x49, 0x1c, 0xf3, 0xa1, 0x0c,
0x93, 0x19, 0x1a, 0xe1, 0x7d, 0xd8, 0x8d, 0xcc, 0x34, 0x99, 0x6f, 0x80, 0x5c, 0xdd, 0x4c, 0x74,
0x13, 0x88, 0x0d, 0xec, 0x99, 0x32, 0x1a, 0x99, 0x5c, 0x0c, 0xd3, 0x70, 0xb0, 0xa9, 0x30, 0x56,
0x6f, 0x30, 0x72, 0x27, 0xc9, 0x27, 0x6b, 0x50, 0xa0, 0x49, 0x26, 0x71, 0xcd, 0xa5, 0xbb, 0x14,
0x46, 0x32, 0x7b, 0x16, 0x8e, 0x05, 0x0a, 0xf1, 0x21, 0xa0, 0x55, 0x09, 0xbd, 0x20, 0x95, 0x61,
0x96, 0xff, 0x53, 0x09, 0xef, 0x43, 0x33, 0x2b, 0x61, 0x0d, 0xfe, 0xac, 0xe6, 0xe3, 0xbe, 0x2a,
0x61, 0x8d, 0xfd, 0x52, 0xc2, 0xc7, 0x80, 0xef, 0x4a, 0x58, 0x13, 0xbf, 0x96, 0x54, 0x63, 0xb3,
0x12, 0xee, 0x40, 0x81, 0x7e, 0x2b, 0xe1, 0x3d, 0x68, 0xac, 0x8e, 0xcb, 0xfa, 0x8b, 0xbe, 0x28,
0xe7, 0x47, 0xad, 0x14, 0x72, 0xf0, 0x4b, 0xd5, 0xca, 0x5a, 0x44, 0x67, 0x82, 0xa7, 0x12, 0x7d,
0x56, 0xc9, 0x22, 0x93, 0xc7, 0x5c, 0x72, 0xf4, 0x79, 0x05, 0xd7, 0x61, 0x3b, 0xea, 0xc4, 0x0b,
0x31, 0x41, 0x2f, 0x73, 0xca, 0xe5, 0x41, 0x3a, 0x9c, 0xa0, 0x3f, 0x2a, 0x59, 0x85, 0x79, 0xc4,
0xb8, 0x58, 0xc4, 0x12, 0xfd, 0x59, 0xc9, 0xf4, 0xaf, 0xb9, 0x5c, 0xdf, 0x27, 0xfa, 0xab, 0x82,
0xdf, 0x84, 0x96, 0x02, 0xd7, 0xfd, 0x51, 0x4c, 0x28, 0x64, 0x38, 0x14, 0xe8, 0xef, 0x0a, 0x3e,
0x81, 0x63, 0x45, 0xdf, 0xd5, 0xbe, 0xc1, 0xfe, 0x53, 0xc1, 0x4d, 0xd0, 0x22, 0x2f, 0x9c, 0x72,
0x2f, 0x1c, 0x46, 0xe8, 0x2b, 0x2d, 0xb3, 0xfa, 0x78, 0xc1, 0xd3, 0xa5, 0x9d, 0x8c, 0xb8, 0xda,
0x2a, 0xd0, 0xd7, 0x1a, 0xde, 0x05, 0x88, 0xac, 0x24, 0x18, 0xe5, 0x9e, 0xbe, 0xc9, 0x01, 0xc6,
0x3f, 0x5e, 0x70, 0x21, 0xa9, 0x89, 0xbe, 0x55, 0x73, 0x5c, 0x2f, 0x00, 0xcf, 0x75, 0xd0, 0x77,
0x5a, 0x76, 0x1b, 0x7a, 0x1c, 0x27, 0xc3, 0x40, 0x72, 0x97, 0x8f, 0xa7, 0x7c, 0x26, 0xd1, 0xf7,
0xda, 0xd5, 0xd5, 0x47, 0x1f, 0x8c, 0x43, 0x39, 0x59, 0x0c, 0xd4, 0x47, 0x77, 0xf9, 0x22, 0x8c,
0xe3, 0xf0, 0x85, 0xe4, 0xc3, 0xc9, 0x65, 0xfe, 0x09, 0x3e, 0x18, 0x85, 0x42, 0xa6, 0xe1, 0x60,
0x21, 0xf9, 0xe8, 0x32, 0x9c, 0x49, 0x9e, 0xce, 0x82, 0xf8, 0x32, 0xfb, 0x19, 0x2f, 0xf3, 0x9f,
0x71, 0x3e, 0x18, 0x6c, 0x67, 0xf1, 0xbb, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0x8e, 0xe1, 0x76,
0x9b, 0xfc, 0x06, 0x00, 0x00,
}

View File

@ -20,7 +20,7 @@ message RegisterNodeResponse {
message SegIDRequest {
uint32 count = 1;
string channelID = 2;
string channelName = 2;
int64 collectionID = 3;
int64 partitionID = 4;
string coll_name = 5;// todo remove
@ -30,13 +30,14 @@ message SegIDRequest {
message AssignSegIDRequest {
int64 nodeID = 1;
string peer_role = 2;
repeated SegIDRequest segIDRequests = 3;
uint64 timestamp = 3;
repeated SegIDRequest segIDRequests = 4;
}
message SegIDAssignment {
int64 segID = 1;
int32 channelID = 2;
string channelName = 2;
uint32 count = 3;
int64 collectionID = 4;
int64 partitionID = 5;
@ -48,6 +49,7 @@ message SegIDAssignment {
message AssignSegIDResponse {
repeated SegIDAssignment segIDAssignments = 1;
common.Status status = 2;
}
message FlushRequest {
@ -107,7 +109,7 @@ message InsertChannelRequest {
message WatchDmChannelRequest {
common.MsgBase base = 1;
repeated string channelIDs = 2;
repeated string channelNames = 2;
}

View File

@ -145,7 +145,7 @@ func (m *RegisterNodeResponse) GetInitParams() *internalpb2.InitParams {
type SegIDRequest struct {
Count uint32 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"`
ChannelID string `protobuf:"bytes,2,opt,name=channelID,proto3" json:"channelID,omitempty"`
ChannelName string `protobuf:"bytes,2,opt,name=channelName,proto3" json:"channelName,omitempty"`
CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
PartitionID int64 `protobuf:"varint,4,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
CollName string `protobuf:"bytes,5,opt,name=coll_name,json=collName,proto3" json:"coll_name,omitempty"`
@ -187,9 +187,9 @@ func (m *SegIDRequest) GetCount() uint32 {
return 0
}
func (m *SegIDRequest) GetChannelID() string {
func (m *SegIDRequest) GetChannelName() string {
if m != nil {
return m.ChannelID
return m.ChannelName
}
return ""
}
@ -225,7 +225,8 @@ func (m *SegIDRequest) GetPartitionName() string {
type AssignSegIDRequest struct {
NodeID int64 `protobuf:"varint,1,opt,name=nodeID,proto3" json:"nodeID,omitempty"`
PeerRole string `protobuf:"bytes,2,opt,name=peer_role,json=peerRole,proto3" json:"peer_role,omitempty"`
SegIDRequests []*SegIDRequest `protobuf:"bytes,3,rep,name=segIDRequests,proto3" json:"segIDRequests,omitempty"`
Timestamp uint64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
SegIDRequests []*SegIDRequest `protobuf:"bytes,4,rep,name=segIDRequests,proto3" json:"segIDRequests,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -270,6 +271,13 @@ func (m *AssignSegIDRequest) GetPeerRole() string {
return ""
}
func (m *AssignSegIDRequest) GetTimestamp() uint64 {
if m != nil {
return m.Timestamp
}
return 0
}
func (m *AssignSegIDRequest) GetSegIDRequests() []*SegIDRequest {
if m != nil {
return m.SegIDRequests
@ -279,7 +287,7 @@ func (m *AssignSegIDRequest) GetSegIDRequests() []*SegIDRequest {
type SegIDAssignment struct {
SegID int64 `protobuf:"varint,1,opt,name=segID,proto3" json:"segID,omitempty"`
ChannelID int32 `protobuf:"varint,2,opt,name=channelID,proto3" json:"channelID,omitempty"`
ChannelName string `protobuf:"bytes,2,opt,name=channelName,proto3" json:"channelName,omitempty"`
Count uint32 `protobuf:"varint,3,opt,name=count,proto3" json:"count,omitempty"`
CollectionID int64 `protobuf:"varint,4,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
PartitionID int64 `protobuf:"varint,5,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
@ -324,11 +332,11 @@ func (m *SegIDAssignment) GetSegID() int64 {
return 0
}
func (m *SegIDAssignment) GetChannelID() int32 {
func (m *SegIDAssignment) GetChannelName() string {
if m != nil {
return m.ChannelID
return m.ChannelName
}
return 0
return ""
}
func (m *SegIDAssignment) GetCount() uint32 {
@ -382,6 +390,7 @@ func (m *SegIDAssignment) GetPartitionName() string {
type AssignSegIDResponse struct {
SegIDAssignments []*SegIDAssignment `protobuf:"bytes,1,rep,name=segIDAssignments,proto3" json:"segIDAssignments,omitempty"`
Status *commonpb.Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -419,6 +428,13 @@ func (m *AssignSegIDResponse) GetSegIDAssignments() []*SegIDAssignment {
return nil
}
func (m *AssignSegIDResponse) GetStatus() *commonpb.Status {
if m != nil {
return m.Status
}
return nil
}
type FlushRequest struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"`
@ -837,7 +853,7 @@ func (m *InsertChannelRequest) GetCollectionID() int64 {
type WatchDmChannelRequest struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
ChannelIDs []string `protobuf:"bytes,2,rep,name=channelIDs,proto3" json:"channelIDs,omitempty"`
ChannelNames []string `protobuf:"bytes,2,rep,name=channelNames,proto3" json:"channelNames,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -875,9 +891,9 @@ func (m *WatchDmChannelRequest) GetBase() *commonpb.MsgBase {
return nil
}
func (m *WatchDmChannelRequest) GetChannelIDs() []string {
func (m *WatchDmChannelRequest) GetChannelNames() []string {
if m != nil {
return m.ChannelIDs
return m.ChannelNames
}
return nil
}
@ -1064,78 +1080,79 @@ func init() {
func init() { proto.RegisterFile("data_service.proto", fileDescriptor_3385cd32ad6cfe64) }
var fileDescriptor_3385cd32ad6cfe64 = []byte{
// 1127 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x56, 0xcf, 0x6e, 0xdb, 0x46,
0x13, 0xb7, 0x44, 0xc9, 0x96, 0x46, 0xb2, 0xa4, 0xac, 0x9d, 0xef, 0x53, 0x94, 0x34, 0x76, 0x09,
0x24, 0x36, 0x82, 0xd6, 0x2e, 0x1c, 0xa4, 0xbd, 0x15, 0x8d, 0xab, 0xd4, 0x10, 0xda, 0x18, 0xc6,
0xaa, 0x40, 0x81, 0xf4, 0x20, 0x50, 0xe2, 0x98, 0x5a, 0x80, 0xdc, 0x65, 0xb9, 0xab, 0xd8, 0xf0,
0xa5, 0x7d, 0x80, 0x1c, 0x7a, 0xef, 0xbd, 0x2f, 0x51, 0xa0, 0x4f, 0xd0, 0x87, 0x2a, 0xb8, 0x4b,
0xd1, 0xa4, 0x44, 0x47, 0x86, 0xdb, 0xf4, 0xc6, 0x1d, 0xfe, 0x76, 0xfe, 0xfe, 0x66, 0x66, 0x81,
0xb8, 0x8e, 0x72, 0x46, 0x12, 0xa3, 0xb7, 0x6c, 0x82, 0x07, 0x61, 0x24, 0x94, 0x20, 0xf7, 0x02,
0xe6, 0xbf, 0x9d, 0x49, 0x73, 0x3a, 0x88, 0x01, 0xbd, 0xe6, 0x44, 0x04, 0x81, 0xe0, 0x46, 0xd4,
0x6b, 0x31, 0xae, 0x30, 0xe2, 0x8e, 0x6f, 0xce, 0xf6, 0xcf, 0xb0, 0x45, 0xd1, 0x63, 0x52, 0x61,
0x74, 0x2a, 0x5c, 0xa4, 0xf8, 0xd3, 0x0c, 0xa5, 0x22, 0x9f, 0x41, 0x65, 0xec, 0x48, 0xec, 0x96,
0x76, 0x4b, 0xfb, 0x8d, 0xa3, 0x47, 0x07, 0x39, 0xb5, 0x89, 0xc2, 0xd7, 0xd2, 0x3b, 0x76, 0x24,
0x52, 0x8d, 0x24, 0x9f, 0xc3, 0x86, 0xe3, 0xba, 0x11, 0x4a, 0xd9, 0x2d, 0xbf, 0xe7, 0xd2, 0x4b,
0x83, 0xa1, 0x73, 0xb0, 0xfd, 0x06, 0xb6, 0xf3, 0x0e, 0xc8, 0x50, 0x70, 0x89, 0xe4, 0x18, 0x1a,
0x8c, 0x33, 0x35, 0x0a, 0x9d, 0xc8, 0x09, 0x64, 0xe2, 0xc8, 0xc7, 0x79, 0x9d, 0x69, 0x2c, 0x03,
0xce, 0xd4, 0x99, 0x06, 0x52, 0x60, 0xe9, 0xb7, 0xfd, 0x57, 0x09, 0x9a, 0x43, 0xf4, 0x06, 0xfd,
0x79, 0x58, 0xdb, 0x50, 0x9d, 0x88, 0x19, 0x57, 0x5a, 0xdd, 0x26, 0x35, 0x07, 0xf2, 0x08, 0xea,
0x93, 0xa9, 0xc3, 0x39, 0xfa, 0x83, 0xbe, 0x76, 0xbe, 0x4e, 0xaf, 0x05, 0xc4, 0x86, 0xe6, 0x44,
0xf8, 0x3e, 0x4e, 0x14, 0x13, 0x7c, 0xd0, 0xef, 0x5a, 0xbb, 0xa5, 0x7d, 0x8b, 0xe6, 0x64, 0x64,
0x17, 0x1a, 0xa1, 0x13, 0x29, 0x96, 0x40, 0x2a, 0x1a, 0x92, 0x15, 0x91, 0x87, 0x50, 0x8f, 0x6f,
0x8c, 0xb8, 0x13, 0x60, 0xb7, 0xaa, 0x6d, 0xd4, 0x62, 0xc1, 0xa9, 0x13, 0x20, 0x79, 0x02, 0xad,
0x14, 0x6b, 0x10, 0xeb, 0x1a, 0xb1, 0x99, 0x4a, 0x63, 0x98, 0xfd, 0x6b, 0x09, 0xc8, 0x4b, 0x29,
0x99, 0xc7, 0x73, 0x41, 0xfd, 0x0f, 0xd6, 0xb9, 0x70, 0x71, 0xd0, 0xd7, 0x51, 0x59, 0x34, 0x39,
0xc5, 0x26, 0x43, 0xc4, 0x68, 0x14, 0x09, 0x1f, 0x93, 0xb0, 0x6a, 0xb1, 0x80, 0x0a, 0x1f, 0xc9,
0x2b, 0xd8, 0x94, 0x19, 0x25, 0xb2, 0x6b, 0xed, 0x5a, 0xfb, 0x8d, 0xa3, 0x9d, 0x83, 0x25, 0x02,
0x1d, 0x64, 0x8d, 0xd1, 0xfc, 0x2d, 0xfb, 0x8f, 0x32, 0xb4, 0xf5, 0x7f, 0xe3, 0x57, 0x80, 0x5c,
0x27, 0x59, 0x83, 0x12, 0x77, 0xcc, 0x61, 0x39, 0xc9, 0xd5, 0x6c, 0x92, 0xd3, 0xc2, 0x58, 0xd9,
0xc2, 0x2c, 0xa6, 0xbe, 0xb2, 0x3a, 0xf5, 0xd5, 0xe5, 0xd4, 0xef, 0x40, 0x03, 0x2f, 0x43, 0x16,
0xe1, 0x48, 0xb1, 0x24, 0xb5, 0x15, 0x0a, 0x46, 0xf4, 0x3d, 0x0b, 0x90, 0x3c, 0x87, 0x75, 0xa9,
0x1c, 0x35, 0x93, 0xdd, 0x0d, 0xcd, 0xb2, 0x87, 0x85, 0xcc, 0x1d, 0x6a, 0x08, 0x4d, 0xa0, 0xf9,
0x82, 0xd6, 0x56, 0x16, 0xb4, 0x5e, 0x54, 0x50, 0x84, 0xad, 0x5c, 0x3d, 0x13, 0xea, 0x9f, 0x42,
0x47, 0xe6, 0x73, 0x1a, 0xf3, 0x3f, 0x2e, 0x8f, 0x7d, 0x53, 0x79, 0xae, 0xa1, 0x74, 0xe9, 0xae,
0x7d, 0x09, 0xcd, 0x6f, 0xfc, 0x99, 0x9c, 0xde, 0xbd, 0xb9, 0x09, 0x54, 0xdc, 0x71, 0x52, 0x37,
0x8b, 0xea, 0xef, 0xdb, 0x14, 0xc7, 0x7e, 0x57, 0x02, 0x32, 0x9c, 0x8a, 0x8b, 0x21, 0x7a, 0xda,
0xb7, 0x3b, 0x3b, 0xb0, 0x68, 0xac, 0xbc, 0x9a, 0x09, 0xd6, 0x12, 0x13, 0xec, 0x17, 0xb0, 0x95,
0xf3, 0x26, 0xc9, 0xf7, 0x63, 0x00, 0x69, 0x44, 0x83, 0xbe, 0xc9, 0xb4, 0x45, 0x33, 0x12, 0xfb,
0x1c, 0xb6, 0x93, 0x2b, 0x31, 0x07, 0x50, 0xde, 0x3d, 0x8c, 0x47, 0x50, 0x4f, 0xf5, 0x26, 0x31,
0x5c, 0x0b, 0xec, 0xdf, 0xcb, 0x70, 0x7f, 0xc1, 0x50, 0xe2, 0xe1, 0x0b, 0xa8, 0xc6, 0xb4, 0x33,
0xa6, 0x5a, 0x37, 0x75, 0x69, 0x7a, 0x91, 0x1a, 0x74, 0xcc, 0xfc, 0x49, 0x84, 0x8e, 0x4a, 0x98,
0x5f, 0x36, 0xcc, 0x37, 0x22, 0xcd, 0xfc, 0x1d, 0x68, 0x48, 0x74, 0x7c, 0x74, 0x0d, 0xc0, 0x32,
0x00, 0x23, 0xd2, 0x80, 0x6f, 0xa1, 0x2d, 0x95, 0x13, 0xa9, 0x51, 0x28, 0xa4, 0xce, 0xa2, 0xec,
0x56, 0x8a, 0x98, 0x98, 0x4e, 0xe2, 0xd7, 0xd2, 0x3b, 0x4b, 0xa0, 0xb4, 0xa5, 0xaf, 0xce, 0x8f,
0x92, 0x9c, 0xc0, 0x26, 0x72, 0x37, 0xa3, 0xaa, 0x7a, 0x6b, 0x55, 0x4d, 0xe4, 0x6e, 0xaa, 0xc8,
0x66, 0xf0, 0xff, 0x01, 0x97, 0x18, 0xa9, 0x63, 0xc6, 0x7d, 0xe1, 0x9d, 0x39, 0x6a, 0xfa, 0xa1,
0x6a, 0x12, 0xc2, 0x83, 0x45, 0x53, 0xd7, 0x65, 0xe9, 0x41, 0xed, 0x9c, 0xa1, 0xef, 0x5e, 0xd3,
0x26, 0x3d, 0x93, 0x2f, 0xa0, 0x1a, 0xc6, 0xe0, 0x6e, 0x59, 0x07, 0x79, 0xd3, 0xe6, 0x1a, 0xaa,
0x88, 0x71, 0xef, 0x3b, 0x26, 0x15, 0x35, 0x78, 0xfb, 0x97, 0x12, 0x6c, 0x1b, 0x93, 0x5f, 0x9b,
0xf1, 0xf8, 0x61, 0xdb, 0xb6, 0x60, 0x9d, 0xd9, 0x0c, 0xee, 0xff, 0xe0, 0xa8, 0xc9, 0xb4, 0x1f,
0xfc, 0x63, 0x17, 0x1e, 0x03, 0xa4, 0x53, 0xde, 0xe4, 0xa2, 0x4e, 0x33, 0x12, 0xfb, 0xb7, 0x12,
0xb4, 0xf5, 0x70, 0x1a, 0xa2, 0xf7, 0x9f, 0x07, 0xba, 0xd0, 0xf9, 0x95, 0xa5, 0xce, 0x7f, 0x57,
0x86, 0x46, 0xd2, 0x58, 0x03, 0x7e, 0x2e, 0xf2, 0x5c, 0x29, 0x2d, 0x70, 0xe5, 0xdf, 0x19, 0x52,
0x64, 0x0f, 0xda, 0x4c, 0x97, 0x7f, 0x94, 0xa4, 0xc9, 0x38, 0x56, 0xa7, 0x2d, 0x96, 0x65, 0x85,
0xde, 0x40, 0x22, 0x44, 0x6e, 0x5a, 0xb7, 0xaa, 0x5b, 0xb7, 0x16, 0x0b, 0x74, 0xe3, 0x7e, 0x04,
0x30, 0xf1, 0x85, 0xcc, 0xed, 0xbc, 0xba, 0x96, 0xe8, 0xdf, 0x0f, 0xa0, 0xc6, 0x67, 0xc1, 0x28,
0x12, 0x17, 0x66, 0xe9, 0x59, 0x74, 0x83, 0xcf, 0x02, 0x2a, 0x2e, 0x64, 0xfc, 0x2b, 0xc0, 0x60,
0x24, 0xd9, 0x95, 0xd9, 0x6b, 0x16, 0xdd, 0x08, 0x30, 0x18, 0xb2, 0x2b, 0x7c, 0xf6, 0xa3, 0x7e,
0x4e, 0xa5, 0x63, 0x86, 0xb4, 0xd3, 0xec, 0x9c, 0x0a, 0x8e, 0x9d, 0x35, 0xb2, 0xa5, 0x5f, 0x03,
0x46, 0xa0, 0x5e, 0x5d, 0x32, 0xa9, 0x3a, 0x25, 0x42, 0xa0, 0x95, 0x08, 0x4f, 0x22, 0x71, 0xc1,
0xb8, 0xd7, 0x29, 0x93, 0x7b, 0xb0, 0x39, 0xd7, 0xa4, 0x87, 0x4d, 0xc7, 0x3a, 0xfa, 0xb3, 0x0a,
0x8d, 0xbe, 0xa3, 0x9c, 0xa1, 0x79, 0xd0, 0x12, 0x07, 0x9a, 0xd9, 0x87, 0x21, 0x79, 0x5a, 0x30,
0xf4, 0x0a, 0x9e, 0xae, 0xbd, 0xbd, 0x95, 0x38, 0xd3, 0xbd, 0xf6, 0x1a, 0x39, 0x81, 0xaa, 0xe6,
0x1e, 0x29, 0x1a, 0xa8, 0xd9, 0x95, 0xd9, 0x7b, 0xdf, 0x93, 0xc0, 0x5e, 0x23, 0x63, 0x68, 0xa7,
0x8b, 0x3c, 0x21, 0xc3, 0x93, 0x02, 0x95, 0xcb, 0x8f, 0xb7, 0xde, 0xd3, 0x55, 0xb0, 0xd4, 0xd9,
0x11, 0x34, 0x33, 0xcb, 0x4b, 0x16, 0x1a, 0x58, 0xde, 0xb5, 0x85, 0x06, 0x0a, 0x96, 0xa0, 0xbd,
0x46, 0x3c, 0xe8, 0x9c, 0xa0, 0xca, 0x2d, 0x20, 0xb2, 0xb7, 0x62, 0xd3, 0xcc, 0x77, 0x61, 0x6f,
0x7f, 0x35, 0x30, 0x35, 0x14, 0xc1, 0xf6, 0x09, 0xaa, 0xa5, 0xb1, 0x4a, 0x9e, 0x15, 0xe8, 0xb8,
0x61, 0xce, 0xf7, 0x3e, 0xb9, 0x05, 0x36, 0x6b, 0xd3, 0x81, 0x7b, 0xa9, 0xcd, 0xb4, 0x83, 0xf6,
0x6e, 0x54, 0x92, 0x9f, 0x7b, 0xbd, 0xd5, 0xd3, 0xdb, 0x5e, 0x3b, 0xfe, 0xea, 0xcd, 0x97, 0x1e,
0x53, 0xd3, 0xd9, 0x38, 0xa6, 0xc7, 0xe1, 0x15, 0xf3, 0x7d, 0x76, 0xa5, 0x70, 0x32, 0x3d, 0x34,
0x77, 0x3f, 0x75, 0x99, 0x54, 0x11, 0x1b, 0xcf, 0x14, 0xba, 0x87, 0x73, 0x0d, 0x87, 0x5a, 0xe1,
0x61, 0x6c, 0x39, 0x1c, 0x8f, 0xd7, 0xf5, 0xe9, 0xf9, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x0e,
0x0b, 0x09, 0xf1, 0xda, 0x0d, 0x00, 0x00,
// 1144 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x56, 0xdd, 0x6e, 0x1b, 0xc5,
0x17, 0x8f, 0xbf, 0x12, 0xfb, 0xd8, 0xb1, 0xdd, 0x49, 0xfa, 0xff, 0xbb, 0x6e, 0x21, 0x61, 0xa5,
0x36, 0x51, 0x05, 0x09, 0x4a, 0x55, 0xb8, 0x43, 0x34, 0xb8, 0x44, 0x16, 0x34, 0x8a, 0xc6, 0x48,
0x48, 0xe5, 0xc2, 0x5a, 0x7b, 0x4f, 0xd6, 0x23, 0xed, 0xce, 0x2c, 0x3b, 0xe3, 0x26, 0xca, 0x0d,
0x3c, 0x40, 0x9f, 0x00, 0xee, 0xb9, 0xe4, 0x11, 0xe0, 0x15, 0x78, 0x24, 0xb4, 0x33, 0xeb, 0xf5,
0xae, 0xbd, 0xae, 0xa3, 0x40, 0xb9, 0xdb, 0x39, 0xfb, 0x9b, 0xf3, 0xf5, 0x3b, 0x1f, 0x03, 0xc4,
0xb1, 0x95, 0x3d, 0x94, 0x18, 0xbe, 0x61, 0x63, 0x3c, 0x0a, 0x42, 0xa1, 0x04, 0xb9, 0xe7, 0x33,
0xef, 0xcd, 0x54, 0x9a, 0xd3, 0x51, 0x04, 0xe8, 0x36, 0xc6, 0xc2, 0xf7, 0x05, 0x37, 0xa2, 0x6e,
0x93, 0x71, 0x85, 0x21, 0xb7, 0x3d, 0x73, 0xb6, 0x7e, 0x82, 0x1d, 0x8a, 0x2e, 0x93, 0x0a, 0xc3,
0x73, 0xe1, 0x20, 0xc5, 0x1f, 0xa7, 0x28, 0x15, 0xf9, 0x14, 0xca, 0x23, 0x5b, 0x62, 0xa7, 0xb0,
0x5f, 0x38, 0xac, 0x9f, 0x3c, 0x3a, 0xca, 0xa8, 0x8d, 0x15, 0xbe, 0x92, 0xee, 0xa9, 0x2d, 0x91,
0x6a, 0x24, 0xf9, 0x0c, 0xb6, 0x6c, 0xc7, 0x09, 0x51, 0xca, 0x4e, 0xf1, 0x1d, 0x97, 0x5e, 0x18,
0x0c, 0x9d, 0x81, 0xad, 0xd7, 0xb0, 0x9b, 0x75, 0x40, 0x06, 0x82, 0x4b, 0x24, 0xa7, 0x50, 0x67,
0x9c, 0xa9, 0x61, 0x60, 0x87, 0xb6, 0x2f, 0x63, 0x47, 0x3e, 0xca, 0xea, 0x4c, 0x62, 0xe9, 0x73,
0xa6, 0x2e, 0x34, 0x90, 0x02, 0x4b, 0xbe, 0xad, 0xbf, 0x0a, 0xd0, 0x18, 0xa0, 0xdb, 0xef, 0xcd,
0xc2, 0xda, 0x85, 0xca, 0x58, 0x4c, 0xb9, 0xd2, 0xea, 0xb6, 0xa9, 0x39, 0x90, 0x7d, 0xa8, 0x8f,
0x27, 0x36, 0xe7, 0xe8, 0x9d, 0xdb, 0x3e, 0x6a, 0xf7, 0x6b, 0x34, 0x2d, 0x22, 0x16, 0x34, 0xc6,
0xc2, 0xf3, 0x70, 0xac, 0x98, 0xe0, 0xfd, 0x5e, 0xa7, 0xb4, 0x5f, 0x38, 0x2c, 0xd1, 0x8c, 0x2c,
0xd2, 0x12, 0xd8, 0xa1, 0x62, 0x31, 0xa4, 0xac, 0x21, 0x69, 0x11, 0x79, 0x08, 0xb5, 0xe8, 0xc6,
0x90, 0x47, 0x56, 0x2a, 0xda, 0x4a, 0x35, 0x12, 0x68, 0x13, 0x8f, 0xa1, 0x99, 0x60, 0x0d, 0x62,
0x53, 0x23, 0xb6, 0x13, 0x69, 0x04, 0xb3, 0x7e, 0x2f, 0x00, 0x79, 0x21, 0x25, 0x73, 0x79, 0x26,
0xb0, 0xff, 0xc1, 0x26, 0x17, 0x0e, 0xf6, 0x7b, 0x3a, 0xb2, 0x12, 0x8d, 0x4f, 0x91, 0xc9, 0x00,
0x31, 0x1c, 0x86, 0xc2, 0x9b, 0x05, 0x56, 0x8d, 0x04, 0x54, 0x78, 0x48, 0x1e, 0x41, 0x4d, 0x31,
0x1f, 0xa5, 0xb2, 0xfd, 0x40, 0x87, 0x54, 0xa6, 0x73, 0x01, 0x79, 0x09, 0xdb, 0x32, 0x65, 0x42,
0x76, 0xca, 0xfb, 0xa5, 0xc3, 0xfa, 0xc9, 0xde, 0xd1, 0x52, 0x89, 0x1d, 0xa5, 0x5d, 0xa1, 0xd9,
0x5b, 0xd6, 0x9f, 0x45, 0x68, 0xe9, 0xff, 0xc6, 0x6b, 0x1f, 0xb9, 0xa6, 0x41, 0x83, 0x62, 0x67,
0xcd, 0xe1, 0x16, 0x34, 0x24, 0xf4, 0x95, 0xd2, 0xf4, 0x2d, 0x92, 0x53, 0x5e, 0x4f, 0x4e, 0x65,
0x99, 0x9c, 0x3d, 0xa8, 0xe3, 0x75, 0xc0, 0x42, 0x1c, 0x46, 0x29, 0xd0, 0xc9, 0x2f, 0x53, 0x30,
0xa2, 0xef, 0x98, 0x8f, 0xe4, 0x19, 0x6c, 0x4a, 0x65, 0xab, 0xa9, 0xec, 0x6c, 0xe9, 0x5a, 0x7c,
0x98, 0x5b, 0xdf, 0x03, 0x0d, 0xa1, 0x31, 0x34, 0x4b, 0x79, 0x75, 0x2d, 0xe5, 0xb5, 0x3c, 0xca,
0x7f, 0x29, 0xc0, 0x4e, 0x86, 0xf2, 0xb8, 0x43, 0xce, 0xa1, 0x2d, 0xb3, 0x89, 0x8d, 0xda, 0x24,
0xe2, 0xc8, 0x5a, 0xc5, 0xd1, 0x1c, 0x4a, 0x97, 0xee, 0xa6, 0x02, 0x2c, 0xde, 0x3a, 0x40, 0xeb,
0x1a, 0x1a, 0x5f, 0x7b, 0x53, 0x39, 0xb9, 0xfb, 0xe0, 0x20, 0x50, 0x76, 0x46, 0xfd, 0x9e, 0x36,
0x5a, 0xa2, 0xfa, 0xfb, 0x36, 0x94, 0x5a, 0x6f, 0x0b, 0x40, 0x06, 0x13, 0x71, 0x35, 0x40, 0x57,
0x07, 0x74, 0x67, 0x07, 0x16, 0x8d, 0x15, 0xd7, 0xd7, 0x4f, 0x69, 0xa9, 0x7e, 0xac, 0xe7, 0xb0,
0x93, 0xf1, 0x26, 0x26, 0xe9, 0x43, 0x00, 0x69, 0x44, 0xfd, 0x9e, 0xa1, 0xa7, 0x44, 0x53, 0x12,
0xeb, 0x12, 0x76, 0xe3, 0x2b, 0x51, 0x62, 0x51, 0xde, 0x3d, 0x8c, 0x47, 0x50, 0x4b, 0xf4, 0xc6,
0x31, 0xcc, 0x05, 0xd6, 0x6f, 0x45, 0xb8, 0xbf, 0x60, 0x28, 0xf6, 0xf0, 0x39, 0x54, 0x22, 0x2e,
0x8d, 0xa9, 0xe6, 0xaa, 0xfe, 0x4e, 0x2e, 0x52, 0x83, 0x8e, 0xfa, 0x65, 0x1c, 0xa2, 0xad, 0xe2,
0x7e, 0x29, 0x9a, 0x7e, 0x31, 0x22, 0xdd, 0x2f, 0x7b, 0x50, 0x97, 0x68, 0x7b, 0xe8, 0x18, 0x80,
0x99, 0x2f, 0x60, 0x44, 0x1a, 0xf0, 0x0d, 0xb4, 0xa4, 0xb2, 0x43, 0x35, 0x0c, 0x84, 0xd4, 0x59,
0x9c, 0x8d, 0x18, 0x6b, 0xc5, 0x94, 0x7f, 0x25, 0xdd, 0x8b, 0x18, 0x4a, 0x9b, 0xfa, 0xea, 0xec,
0x28, 0xc9, 0x19, 0x6c, 0x23, 0x77, 0x52, 0xaa, 0x2a, 0xb7, 0x56, 0xd5, 0x40, 0xee, 0x24, 0x8a,
0x2c, 0x06, 0xff, 0xef, 0x73, 0x89, 0xa1, 0x3a, 0x65, 0xdc, 0x13, 0xee, 0x85, 0xad, 0x26, 0xef,
0x8b, 0x93, 0x00, 0x1e, 0x2c, 0x9a, 0x9a, 0xd3, 0xd2, 0x85, 0xea, 0x25, 0x43, 0xcf, 0x99, 0x97,
0x4d, 0x72, 0x26, 0x9f, 0x43, 0x25, 0x88, 0xc0, 0x9d, 0xa2, 0x0e, 0x72, 0xd5, 0x56, 0x1c, 0xa8,
0x90, 0x71, 0xf7, 0x5b, 0x26, 0x15, 0x35, 0x78, 0xeb, 0xe7, 0x02, 0xec, 0x1a, 0x93, 0x5f, 0x99,
0xb1, 0xfa, 0x7e, 0xdb, 0x36, 0x67, 0x4d, 0x5a, 0x3e, 0xdc, 0xff, 0xde, 0x56, 0xe3, 0x49, 0xcf,
0xff, 0xc7, 0x2e, 0x44, 0xe6, 0xe6, 0xdb, 0xc1, 0x64, 0xa3, 0x46, 0x33, 0x32, 0xeb, 0xd7, 0x02,
0xb4, 0xf4, 0x80, 0x1a, 0xa0, 0xfb, 0x9f, 0x07, 0xbb, 0xd0, 0xfd, 0xe5, 0xa5, 0xee, 0x7f, 0x5b,
0x84, 0x7a, 0xdc, 0x5c, 0x7d, 0x7e, 0x29, 0xb2, 0xf5, 0x52, 0x58, 0xa8, 0x97, 0x7f, 0x67, 0x50,
0x91, 0x03, 0x68, 0x31, 0x5d, 0x02, 0xc3, 0x38, 0x51, 0xc6, 0xb1, 0x1a, 0x6d, 0xb2, 0x74, 0x65,
0xe8, 0xdd, 0x25, 0x02, 0xe4, 0xa6, 0x7d, 0x2b, 0xba, 0x7d, 0xab, 0x91, 0x40, 0x37, 0xef, 0x07,
0x00, 0x63, 0x4f, 0xc8, 0xcc, 0xb6, 0xac, 0x69, 0x89, 0xfe, 0xfd, 0x00, 0xaa, 0x7c, 0xea, 0x0f,
0x43, 0x71, 0x65, 0xd6, 0x65, 0x89, 0x6e, 0xf1, 0xa9, 0x4f, 0xc5, 0x95, 0x8c, 0x7e, 0xf9, 0xe8,
0x0f, 0x25, 0xbb, 0x31, 0x1b, 0xb1, 0x44, 0xb7, 0x7c, 0xf4, 0x07, 0xec, 0x06, 0x9f, 0xfe, 0xa0,
0x9f, 0x6b, 0xc9, 0xa8, 0x21, 0xad, 0x24, 0x3b, 0xe7, 0x82, 0x63, 0x7b, 0x83, 0xec, 0xe8, 0xb7,
0x84, 0x11, 0xa8, 0x97, 0xd7, 0x4c, 0xaa, 0x76, 0x81, 0x10, 0x68, 0xc6, 0xc2, 0xb3, 0x50, 0x5c,
0x31, 0xee, 0xb6, 0x8b, 0xe4, 0x1e, 0x6c, 0xcf, 0x34, 0xe9, 0x81, 0xd3, 0x2e, 0x9d, 0xfc, 0x51,
0x81, 0x7a, 0xcf, 0x56, 0xf6, 0xc0, 0x3c, 0x98, 0x89, 0x0d, 0x8d, 0xf4, 0xc3, 0x93, 0x3c, 0xc9,
0x19, 0x7c, 0x39, 0x4f, 0xe3, 0xee, 0xc1, 0x5a, 0x9c, 0xe9, 0x60, 0x6b, 0x83, 0x9c, 0x41, 0x45,
0xd7, 0x1e, 0xc9, 0x1b, 0xaa, 0xe9, 0xb5, 0xd9, 0x7d, 0xd7, 0xae, 0xb5, 0x36, 0xc8, 0x08, 0x5a,
0xc9, 0x0b, 0x20, 0x2e, 0x86, 0xc7, 0x39, 0x2a, 0x97, 0x1f, 0x86, 0xdd, 0x27, 0xeb, 0x60, 0x89,
0xb3, 0x43, 0x68, 0xa4, 0x16, 0x98, 0xcc, 0x35, 0xb0, 0xbc, 0x6f, 0x73, 0x0d, 0xe4, 0x2c, 0x42,
0x6b, 0x83, 0xb8, 0xd0, 0x3e, 0x43, 0x95, 0x59, 0x42, 0xe4, 0x60, 0xcd, 0xb6, 0x99, 0xed, 0xc3,
0xee, 0xe1, 0x7a, 0x60, 0x62, 0x28, 0x84, 0xdd, 0x33, 0x54, 0x4b, 0xa3, 0x95, 0x3c, 0xcd, 0xd1,
0xb1, 0x62, 0xd6, 0x77, 0x3f, 0xbe, 0x05, 0x36, 0x6d, 0xd3, 0x86, 0x7b, 0x89, 0xcd, 0xa4, 0x83,
0x0e, 0x56, 0x2a, 0xc9, 0xce, 0xbe, 0xee, 0xfa, 0x09, 0x6e, 0x6d, 0x9c, 0x7e, 0xf9, 0xfa, 0x0b,
0x97, 0xa9, 0xc9, 0x74, 0x14, 0x95, 0xc7, 0xf1, 0x0d, 0xf3, 0x3c, 0x76, 0xa3, 0x70, 0x3c, 0x39,
0x36, 0x77, 0x3f, 0x71, 0x98, 0x54, 0x21, 0x1b, 0x4d, 0x15, 0x3a, 0xc7, 0x33, 0x0d, 0xc7, 0x5a,
0xe1, 0x71, 0x64, 0x39, 0x18, 0x8d, 0x36, 0xf5, 0xe9, 0xd9, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff,
0x92, 0x26, 0x37, 0x78, 0x3a, 0x0e, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.

View File

@ -4,85 +4,50 @@ import (
"context"
"log"
"github.com/zilliztech/milvus-distributed/internal/errors"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
type timeSyncMsgProducer struct {
//softTimeTickBarrier
proxyTtBarrier TimeTickBarrier
//hardTimeTickBarrier
writeNodeTtBarrier TimeTickBarrier
ddSyncStream ms.MsgStream // insert & delete
dmSyncStream ms.MsgStream
k2sSyncStream ms.MsgStream
type MsgProducer struct {
ttBarrier TimeTickBarrier
ctx context.Context
cancel context.CancelFunc
proxyWatchers []TimeTickWatcher
writeNodeWatchers []TimeTickWatcher
watchers []TimeTickWatcher
}
func NewTimeSyncMsgProducer(ctx context.Context) (*timeSyncMsgProducer, error) {
ctx2, cancel := context.WithCancel(ctx)
return &timeSyncMsgProducer{ctx: ctx2, cancel: cancel}, nil
func NewTimeSyncMsgProducer(ctx context.Context, ttBarrier TimeTickBarrier, watchers ...TimeTickWatcher) (*MsgProducer, error) {
childCtx, cancelFunc := context.WithCancel(ctx)
return &MsgProducer{
ctx: childCtx,
cancel: cancelFunc,
ttBarrier: ttBarrier,
watchers: watchers,
}, nil
}
func (syncMsgProducer *timeSyncMsgProducer) SetProxyTtBarrier(proxyTtBarrier TimeTickBarrier) {
syncMsgProducer.proxyTtBarrier = proxyTtBarrier
}
func (syncMsgProducer *timeSyncMsgProducer) SetWriteNodeTtBarrier(writeNodeTtBarrier TimeTickBarrier) {
syncMsgProducer.writeNodeTtBarrier = writeNodeTtBarrier
}
func (syncMsgProducer *timeSyncMsgProducer) SetDDSyncStream(ddSync ms.MsgStream) {
syncMsgProducer.ddSyncStream = ddSync
}
func (syncMsgProducer *timeSyncMsgProducer) SetDMSyncStream(dmSync ms.MsgStream) {
syncMsgProducer.dmSyncStream = dmSync
}
func (syncMsgProducer *timeSyncMsgProducer) SetK2sSyncStream(k2sSync ms.MsgStream) {
syncMsgProducer.k2sSyncStream = k2sSync
}
func (syncMsgProducer *timeSyncMsgProducer) WatchProxyTtBarrier(watcher TimeTickWatcher) {
syncMsgProducer.proxyWatchers = append(syncMsgProducer.proxyWatchers, watcher)
}
func (syncMsgProducer *timeSyncMsgProducer) WatchWriteNodeTtBarrier(watcher TimeTickWatcher) {
syncMsgProducer.writeNodeWatchers = append(syncMsgProducer.writeNodeWatchers, watcher)
}
func (syncMsgProducer *timeSyncMsgProducer) broadcastMsg(barrier TimeTickBarrier, streams []ms.MsgStream, watchers []TimeTickWatcher) error {
func (producer *MsgProducer) broadcastMsg() {
for {
select {
case <-syncMsgProducer.ctx.Done():
{
log.Printf("broadcast context done, exit")
return errors.Errorf("broadcast done exit")
}
case <-producer.ctx.Done():
log.Printf("broadcast context done, exit")
default:
timetick, err := barrier.GetTimeTick()
tt, err := producer.ttBarrier.GetTimeTick()
if err != nil {
log.Printf("broadcast get time tick error")
}
msgPack := ms.MsgPack{}
baseMsg := ms.BaseMsg{
BeginTimestamp: timetick,
EndTimestamp: timetick,
BeginTimestamp: tt,
EndTimestamp: tt,
HashValues: []uint32{0},
}
timeTickResult := internalpb2.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kTimeTick,
MsgID: 0,
Timestamp: timetick,
Timestamp: tt,
SourceID: 0,
},
}
@ -90,56 +55,32 @@ func (syncMsgProducer *timeSyncMsgProducer) broadcastMsg(barrier TimeTickBarrier
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
for _, stream := range streams {
err = stream.Broadcast(&msgPack)
}
for _, watcher := range watchers {
for _, watcher := range producer.watchers {
watcher.Watch(timeTickMsg)
}
if err != nil {
return err
}
}
}
}
func (syncMsgProducer *timeSyncMsgProducer) Start() error {
err := syncMsgProducer.proxyTtBarrier.Start()
func (producer *MsgProducer) Start() error {
err := producer.ttBarrier.Start()
if err != nil {
return err
}
err = syncMsgProducer.writeNodeTtBarrier.Start()
if err != nil {
return err
}
for _, watcher := range syncMsgProducer.proxyWatchers {
watcher.Start()
}
for _, watcher := range syncMsgProducer.writeNodeWatchers {
for _, watcher := range producer.watchers {
watcher.Start()
}
go syncMsgProducer.broadcastMsg(syncMsgProducer.proxyTtBarrier, []ms.MsgStream{syncMsgProducer.dmSyncStream, syncMsgProducer.ddSyncStream}, syncMsgProducer.proxyWatchers)
go syncMsgProducer.broadcastMsg(syncMsgProducer.writeNodeTtBarrier, []ms.MsgStream{syncMsgProducer.k2sSyncStream}, syncMsgProducer.writeNodeWatchers)
go producer.broadcastMsg()
return nil
}
func (syncMsgProducer *timeSyncMsgProducer) Close() {
syncMsgProducer.ddSyncStream.Close()
syncMsgProducer.dmSyncStream.Close()
syncMsgProducer.k2sSyncStream.Close()
syncMsgProducer.cancel()
syncMsgProducer.proxyTtBarrier.Close()
syncMsgProducer.writeNodeTtBarrier.Close()
for _, watcher := range syncMsgProducer.proxyWatchers {
watcher.Close()
}
for _, watcher := range syncMsgProducer.writeNodeWatchers {
func (producer *MsgProducer) Close() {
producer.cancel()
producer.ttBarrier.Close()
for _, watcher := range producer.watchers {
watcher.Close()
}
}

View File

@ -1,6 +1,8 @@
package timesync
import (
"log"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
)
@ -9,3 +11,24 @@ type TimeTickWatcher interface {
Start()
Close()
}
type MsgTimeTickWatcher struct {
streams []ms.MsgStream
}
func (watcher *MsgTimeTickWatcher) Watch(msg *ms.TimeTickMsg) {
msgPack := &ms.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, msg)
for _, stream := range watcher.streams {
if err := stream.Broadcast(msgPack); err != nil {
log.Printf("stream broadcast failed %s", err.Error())
}
}
}
func (watcher *MsgTimeTickWatcher) Start() {
}
func (watcher *MsgTimeTickWatcher) Close() {
}