diff --git a/internal/allocator/segment.go b/internal/allocator/segment.go index 97c6372819..8510fa384f 100644 --- a/internal/allocator/segment.go +++ b/internal/allocator/segment.go @@ -176,7 +176,7 @@ func (sa *SegIDAssigner) pickCanDoFunc() { if assign == nil || assign.Capacity(segRequest.timestamp) < records[colName][partitionName][channelID] { partitionID, _ := typeutil.Hash32String(segRequest.colName) sa.segReqs = append(sa.segReqs, &datapb.SegIDRequest{ - ChannelID: strconv.FormatUint(uint64(segRequest.channelID), 10), + ChannelName: strconv.FormatUint(uint64(segRequest.channelID), 10), Count: segRequest.count, CollName: segRequest.colName, PartitionName: segRequest.partitionName, @@ -220,7 +220,7 @@ func (sa *SegIDAssigner) checkSegReqEqual(req1, req2 *datapb.SegIDRequest) bool if req1 == req2 { return true } - return req1.CollName == req2.CollName && req1.PartitionName == req2.PartitionName && req1.ChannelID == req2.ChannelID + return req1.CollName == req2.CollName && req1.PartitionName == req2.PartitionName && req1.ChannelName == req2.ChannelName } func (sa *SegIDAssigner) reduceSegReqs() { @@ -281,7 +281,12 @@ func (sa *SegIDAssigner) syncSegments() bool { log.Println("SyncSegment Error:", info.Status.Reason) continue } - assign := sa.getAssign(info.CollName, info.PartitionName, info.ChannelID) + // FIXME: use channelName + channel, err := strconv.Atoi(info.ChannelName) + if err != nil { + return false + } + assign := sa.getAssign(info.CollName, info.PartitionName, int32(channel)) segInfo := &segInfo{ segID: info.SegID, count: info.Count, @@ -298,7 +303,7 @@ func (sa *SegIDAssigner) syncSegments() bool { assign = &assignInfo{ collID: info.CollectionID, partitionID: info.PartitionID, - channelID: info.ChannelID, + channelID: int32(channel), segInfos: segInfos, partitionName: info.PartitionName, collName: info.CollName, diff --git a/internal/core/src/pb/common.pb.cc b/internal/core/src/pb/common.pb.cc index 4a03946492..26a5543370 100644 --- a/internal/core/src/pb/common.pb.cc +++ b/internal/core/src/pb/common.pb.cc @@ -249,7 +249,7 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE( "PE\020\027\022\021\n\rOUT_OF_MEMORY\020\030\022\024\n\017DD_REQUEST_RA" "CE\020\350\007*N\n\nIndexState\022\010\n\004NONE\020\000\022\014\n\010UNISSUE" "D\020\001\022\016\n\nINPROGRESS\020\002\022\014\n\010FINISHED\020\003\022\n\n\006FAI" - "LED\020\004*\245\004\n\007MsgType\022\t\n\005kNone\020\000\022\025\n\021kCreateC" + "LED\020\004*\274\004\n\007MsgType\022\t\n\005kNone\020\000\022\025\n\021kCreateC" "ollection\020d\022\023\n\017kDropCollection\020e\022\022\n\016kHas" "Collection\020f\022\027\n\023kDescribeCollection\020g\022\024\n" "\020kShowCollections\020h\022\022\n\016kGetSysConfigs\020i\022" @@ -262,9 +262,10 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE( "ate\020\366\003\022\035\n\030kGetCollectionStatistics\020\367\003\022\034\n" "\027kGetPartitionStatistics\020\370\003\022\016\n\tkTimeTick" "\020\260\t\022\024\n\017kQueryNodeStats\020\261\t\022\017\n\nkLoadIndex\020" - "\262\t\022\017\n\nkRequestID\020\263\t\022\020\n\013kRequestTSO\020\264\tBBZ" - "@github.com/zilliztech/milvus-distribute" - "d/internal/proto/commonpbb\006proto3" + "\262\t\022\017\n\nkRequestID\020\263\t\022\020\n\013kRequestTSO\020\264\t\022\025\n" + "\020kAllocateSegment\020\265\tBBZ@github.com/zilli" + "ztech/milvus-distributed/internal/proto/" + "commonpbb\006proto3" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = { }; @@ -280,7 +281,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once; static bool descriptor_table_common_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = { - &descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 1673, + &descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 1696, &descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 7, 0, schemas, file_default_instances, TableStruct_common_2eproto::offsets, file_level_metadata_common_2eproto, 7, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto, @@ -378,6 +379,7 @@ bool MsgType_IsValid(int value) { case 1202: case 1203: case 1204: + case 1205: return true; default: return false; diff --git a/internal/core/src/pb/common.pb.h b/internal/core/src/pb/common.pb.h index fdd360d447..343f38dde7 100644 --- a/internal/core/src/pb/common.pb.h +++ b/internal/core/src/pb/common.pb.h @@ -199,12 +199,13 @@ enum MsgType : int { kLoadIndex = 1202, kRequestID = 1203, kRequestTSO = 1204, + kAllocateSegment = 1205, MsgType_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(), MsgType_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max() }; bool MsgType_IsValid(int value); constexpr MsgType MsgType_MIN = kNone; -constexpr MsgType MsgType_MAX = kRequestTSO; +constexpr MsgType MsgType_MAX = kAllocateSegment; constexpr int MsgType_ARRAYSIZE = MsgType_MAX + 1; const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* MsgType_descriptor(); diff --git a/internal/dataservice/allocator.go b/internal/dataservice/allocator.go new file mode 100644 index 0000000000..08a7106fbd --- /dev/null +++ b/internal/dataservice/allocator.go @@ -0,0 +1,23 @@ +package dataservice + +type allocator interface { + allocTimestamp() (Timestamp, error) + allocID() (UniqueID, error) +} + +type allocatorImpl struct { + // TODO call allocate functions in client.go in master service +} + +// TODO implements +func newAllocatorImpl() *allocatorImpl { + return nil +} + +func (allocator *allocatorImpl) allocTimestamp() (Timestamp, error) { + return 0, nil +} + +func (allocator *allocatorImpl) allocID() (UniqueID, error) { + return 0, nil +} diff --git a/internal/dataservice/data_service.go b/internal/dataservice/data_service.go deleted file mode 100644 index 32ebc5d9bb..0000000000 --- a/internal/dataservice/data_service.go +++ /dev/null @@ -1,5 +0,0 @@ -package dataservice - -type DataService struct { - segAllocator segmentAllocator -} diff --git a/internal/dataservice/grpc_service.go b/internal/dataservice/grpc_service.go index e4b9184a7d..3ace918e2f 100644 --- a/internal/dataservice/grpc_service.go +++ b/internal/dataservice/grpc_service.go @@ -1,40 +1,63 @@ package dataservice import ( + "fmt" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "golang.org/x/net/context" ) -func (ds *DataService) RegisterNode(context.Context, *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) { +func (ds *Server) RegisterNode(context.Context, *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) { return nil, nil } -func (ds *DataService) Flush(context.Context, *datapb.FlushRequest) (*commonpb.Status, error) { +func (ds *Server) Flush(context.Context, *datapb.FlushRequest) (*commonpb.Status, error) { return nil, nil } -func (ds *DataService) AssignSegmentID(ctx context.Context, request *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error) { - //for _, req := range request.SegIDRequests { - // segmentID, retCount, expireTs, err := ds.segAllocator.AllocSegment(req.CollectionID, req.PartitionID, req.ChannelID, int(req.Count)) - // if err != nil { - // log.Printf() - // } - //} - return nil, nil +func (ds *Server) AssignSegmentID(ctx context.Context, request *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error) { + resp := &datapb.AssignSegIDResponse{ + SegIDAssignments: make([]*datapb.SegIDAssignment, 0), + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + }, + } + task := &allocateTask{ + baseTask: baseTask{ + sch: ds.scheduler, + meta: ds.meta, + cv: make(chan error), + }, + req: request, + resp: resp, + segAllocator: ds.segAllocator, + insertCMapper: ds.insertCMapper, + } + + if err := ds.scheduler.Enqueue(task); err != nil { + resp.Status.Reason = fmt.Sprintf("enqueue error: %s", err.Error()) + return resp, nil + } + + if err := task.WaitToFinish(ctx); err != nil { + resp.Status.Reason = fmt.Sprintf("wait to finish error: %s", err.Error()) + return resp, nil + } + return resp, nil } -func (ds *DataService) ShowSegments(context.Context, *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) { +func (ds *Server) ShowSegments(context.Context, *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) { return nil, nil } -func (ds *DataService) GetSegmentStates(context.Context, *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) { +func (ds *Server) GetSegmentStates(context.Context, *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) { return nil, nil } -func (ds *DataService) GetInsertBinlogPaths(context.Context, *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) { +func (ds *Server) GetInsertBinlogPaths(context.Context, *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) { return nil, nil } -func (ds *DataService) GetInsertChannels(context.Context, *datapb.InsertChannelRequest) (*internalpb2.StringList, error) { +func (ds *Server) GetInsertChannels(context.Context, *datapb.InsertChannelRequest) (*internalpb2.StringList, error) { return nil, nil } diff --git a/internal/dataservice/meta.go b/internal/dataservice/meta.go index 88b058e4b0..ac1b1986b4 100644 --- a/internal/dataservice/meta.go +++ b/internal/dataservice/meta.go @@ -29,15 +29,17 @@ type ( collID2Info map[UniqueID]*collectionInfo // collection id to collection info segID2Info map[UniqueID]*datapb.SegmentInfo // segment id to segment info - ddLock sync.RWMutex + allocator allocator + ddLock sync.RWMutex } ) -func NewMetaTable(kv kv.TxnBase) (*meta, error) { +func newMetaTable(kv kv.TxnBase, allocator allocator) (*meta, error) { mt := &meta{ client: kv, collID2Info: make(map[UniqueID]*collectionInfo), segID2Info: make(map[UniqueID]*datapb.SegmentInfo), + allocator: allocator, } err := mt.reloadFromKV() if err != nil { @@ -46,8 +48,8 @@ func NewMetaTable(kv kv.TxnBase) (*meta, error) { return mt, nil } -func (mt *meta) reloadFromKV() error { - _, values, err := mt.client.LoadWithPrefix("segment") +func (meta *meta) reloadFromKV() error { + _, values, err := meta.client.LoadWithPrefix("segment") if err != nil { return err } @@ -58,123 +60,145 @@ func (mt *meta) reloadFromKV() error { if err != nil { return err } - mt.segID2Info[segmentInfo.SegmentID] = segmentInfo + meta.segID2Info[segmentInfo.SegmentID] = segmentInfo } return nil } -func (mt *meta) AddCollection(collectionInfo *collectionInfo) error { - mt.ddLock.Lock() - defer mt.ddLock.Unlock() - if _, ok := mt.collID2Info[collectionInfo.ID]; ok { +func (meta *meta) AddCollection(collectionInfo *collectionInfo) error { + meta.ddLock.Lock() + defer meta.ddLock.Unlock() + if _, ok := meta.collID2Info[collectionInfo.ID]; ok { return fmt.Errorf("collection %s with id %d already exist", collectionInfo.Schema.Name, collectionInfo.ID) } - mt.collID2Info[collectionInfo.ID] = collectionInfo + meta.collID2Info[collectionInfo.ID] = collectionInfo return nil } -func (mt *meta) DropCollection(collID UniqueID) error { - mt.ddLock.Lock() - defer mt.ddLock.Unlock() +func (meta *meta) DropCollection(collID UniqueID) error { + meta.ddLock.Lock() + defer meta.ddLock.Unlock() - if _, ok := mt.collID2Info[collID]; !ok { + if _, ok := meta.collID2Info[collID]; !ok { return errors.Errorf("can't find collection. id = " + strconv.FormatInt(collID, 10)) } - delete(mt.collID2Info, collID) - for id, segment := range mt.segID2Info { + delete(meta.collID2Info, collID) + for id, segment := range meta.segID2Info { if segment.CollectionID != collID { continue } - delete(mt.segID2Info, id) - if err := mt.removeSegmentInfo(id); err != nil { + delete(meta.segID2Info, id) + if err := meta.removeSegmentInfo(id); err != nil { log.Printf("remove segment info failed, %s", err.Error()) - _ = mt.reloadFromKV() + _ = meta.reloadFromKV() } } return nil } -func (mt *meta) HasCollection(collID UniqueID) bool { - mt.ddLock.RLock() - defer mt.ddLock.RUnlock() - _, ok := mt.collID2Info[collID] +func (meta *meta) HasCollection(collID UniqueID) bool { + meta.ddLock.RLock() + defer meta.ddLock.RUnlock() + _, ok := meta.collID2Info[collID] return ok } -func (mt *meta) AddSegment(segmentInfo *datapb.SegmentInfo) error { - mt.ddLock.Lock() - defer mt.ddLock.Unlock() - if _, ok := mt.segID2Info[segmentInfo.SegmentID]; !ok { +func (meta *meta) BuildSegment(collectionID UniqueID, partitionID UniqueID, channelRange []string) (*datapb.SegmentInfo, error) { + id, err := meta.allocator.allocID() + if err != nil { + return nil, err + } + ts, err := meta.allocator.allocTimestamp() + if err != nil { + return nil, err + } + + return &datapb.SegmentInfo{ + SegmentID: id, + CollectionID: collectionID, + PartitionID: partitionID, + InsertChannels: channelRange, + OpenTime: ts, + CloseTime: 0, + NumRows: 0, + MemSize: 0, + }, nil +} + +func (meta *meta) AddSegment(segmentInfo *datapb.SegmentInfo) error { + meta.ddLock.Lock() + defer meta.ddLock.Unlock() + if _, ok := meta.segID2Info[segmentInfo.SegmentID]; !ok { return fmt.Errorf("segment %d already exist", segmentInfo.SegmentID) } - mt.segID2Info[segmentInfo.SegmentID] = segmentInfo - if err := mt.saveSegmentInfo(segmentInfo); err != nil { - _ = mt.reloadFromKV() + meta.segID2Info[segmentInfo.SegmentID] = segmentInfo + if err := meta.saveSegmentInfo(segmentInfo); err != nil { + _ = meta.reloadFromKV() return err } return nil } -func (mt *meta) UpdateSegment(segmentInfo *datapb.SegmentInfo) error { - mt.ddLock.Lock() - defer mt.ddLock.Unlock() +func (meta *meta) UpdateSegment(segmentInfo *datapb.SegmentInfo) error { + meta.ddLock.Lock() + defer meta.ddLock.Unlock() - mt.segID2Info[segmentInfo.SegmentID] = segmentInfo - if err := mt.saveSegmentInfo(segmentInfo); err != nil { - _ = mt.reloadFromKV() + meta.segID2Info[segmentInfo.SegmentID] = segmentInfo + if err := meta.saveSegmentInfo(segmentInfo); err != nil { + _ = meta.reloadFromKV() return err } return nil } -func (mt *meta) GetSegmentByID(segID UniqueID) (*datapb.SegmentInfo, error) { - mt.ddLock.RLock() - defer mt.ddLock.RUnlock() +func (meta *meta) GetSegment(segID UniqueID) (*datapb.SegmentInfo, error) { + meta.ddLock.RLock() + defer meta.ddLock.RUnlock() - segmentInfo, ok := mt.segID2Info[segID] + segmentInfo, ok := meta.segID2Info[segID] if !ok { return nil, errors.Errorf("GetSegmentByID:can't find segment id = %d", segID) } return segmentInfo, nil } -func (mt *meta) CloseSegment(segID UniqueID, closeTs Timestamp) error { - mt.ddLock.Lock() - defer mt.ddLock.Unlock() +func (meta *meta) CloseSegment(segID UniqueID, closeTs Timestamp) error { + meta.ddLock.Lock() + defer meta.ddLock.Unlock() - segInfo, ok := mt.segID2Info[segID] + segInfo, ok := meta.segID2Info[segID] if !ok { return errors.Errorf("DropSegment:can't find segment id = " + strconv.FormatInt(segID, 10)) } segInfo.CloseTime = closeTs - err := mt.saveSegmentInfo(segInfo) + err := meta.saveSegmentInfo(segInfo) if err != nil { - _ = mt.reloadFromKV() + _ = meta.reloadFromKV() return err } return nil } -func (mt *meta) GetCollection(collectionID UniqueID) (*collectionInfo, error) { - mt.ddLock.RLock() - defer mt.ddLock.RUnlock() +func (meta *meta) GetCollection(collectionID UniqueID) (*collectionInfo, error) { + meta.ddLock.RLock() + defer meta.ddLock.RUnlock() - collectionInfo, ok := mt.collID2Info[collectionID] + collectionInfo, ok := meta.collID2Info[collectionID] if !ok { return nil, fmt.Errorf("collection %d not found", collectionID) } return collectionInfo, nil } -func (mt *meta) saveSegmentInfo(segmentInfo *datapb.SegmentInfo) error { +func (meta *meta) saveSegmentInfo(segmentInfo *datapb.SegmentInfo) error { segBytes := proto.MarshalTextString(segmentInfo) - return mt.client.Save("/segment/"+strconv.FormatInt(segmentInfo.SegmentID, 10), segBytes) + return meta.client.Save("/segment/"+strconv.FormatInt(segmentInfo.SegmentID, 10), segBytes) } -func (mt *meta) removeSegmentInfo(segID UniqueID) error { - return mt.client.Remove("/segment/" + strconv.FormatInt(segID, 10)) +func (meta *meta) removeSegmentInfo(segID UniqueID) error { + return meta.client.Remove("/segment/" + strconv.FormatInt(segID, 10)) } diff --git a/internal/dataservice/scheduler.go b/internal/dataservice/scheduler.go new file mode 100644 index 0000000000..3699b38ec8 --- /dev/null +++ b/internal/dataservice/scheduler.go @@ -0,0 +1,89 @@ +package dataservice + +import ( + "context" + "log" + + "github.com/zilliztech/milvus-distributed/internal/errors" + ms "github.com/zilliztech/milvus-distributed/internal/msgstream" +) + +//type ddRequestScheduler interface {} + +//type ddReqFIFOScheduler struct {} + +type ddRequestScheduler struct { + ctx context.Context + cancel context.CancelFunc + + globalIDAllocator func() (UniqueID, error) + reqQueue chan task + scheduleTimeStamp Timestamp + ddMsgStream ms.MsgStream +} + +func NewDDRequestScheduler(ctx context.Context) *ddRequestScheduler { + const channelSize = 1024 + + ctx2, cancel := context.WithCancel(ctx) + + rs := ddRequestScheduler{ + ctx: ctx2, + cancel: cancel, + reqQueue: make(chan task, channelSize), + } + return &rs +} + +func (rs *ddRequestScheduler) Enqueue(task task) error { + rs.reqQueue <- task + return nil +} + +func (rs *ddRequestScheduler) SetIDAllocator(allocGlobalID func() (UniqueID, error)) { + rs.globalIDAllocator = allocGlobalID +} + +func (rs *ddRequestScheduler) SetDDMsgStream(ddStream ms.MsgStream) { + rs.ddMsgStream = ddStream +} + +func (rs *ddRequestScheduler) scheduleLoop() { + for { + select { + case task := <-rs.reqQueue: + err := rs.schedule(task) + if err != nil { + log.Println(err) + } + case <-rs.ctx.Done(): + log.Print("server is closed, exit task execution loop") + return + } + } +} + +func (rs *ddRequestScheduler) schedule(t task) error { + timeStamp, err := t.Ts() + if err != nil { + log.Println(err) + return err + } + if timeStamp < rs.scheduleTimeStamp { + t.Notify(errors.Errorf("input timestamp = %d, schduler timestamp = %d", timeStamp, rs.scheduleTimeStamp)) + } else { + rs.scheduleTimeStamp = timeStamp + err = t.Execute() + t.Notify(err) + } + return nil +} + +func (rs *ddRequestScheduler) Start() error { + go rs.scheduleLoop() + return nil +} + +func (rs *ddRequestScheduler) Close() { + rs.cancel() +} diff --git a/internal/dataservice/segment_allocator.go b/internal/dataservice/segment_allocator.go index 6c1ac9dea7..7dfeff72af 100644 --- a/internal/dataservice/segment_allocator.go +++ b/internal/dataservice/segment_allocator.go @@ -6,8 +6,6 @@ import ( "sync" "time" - "github.com/zilliztech/milvus-distributed/internal/proto/datapb" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" @@ -17,18 +15,18 @@ type errRemainInSufficient struct { requestRows int } -func newErrRemainInSufficient(requestRows int) *errRemainInSufficient { - return &errRemainInSufficient{requestRows: requestRows} +func newErrRemainInSufficient(requestRows int) errRemainInSufficient { + return errRemainInSufficient{requestRows: requestRows} } -func (err *errRemainInSufficient) Error() string { +func (err errRemainInSufficient) Error() string { return "segment remaining is insufficient for" + strconv.Itoa(err.requestRows) } // segmentAllocator is used to allocate rows for segments and record the allocations. type segmentAllocator interface { // OpenSegment add the segment to allocator and set it allocatable - OpenSegment(segmentInfo *datapb.SegmentInfo) error + OpenSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, cRange channelRange) error // AllocSegment allocate rows and record the allocation. AllocSegment(collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int) (UniqueID, int, Timestamp, error) // GetSealedSegments get all sealed segment. @@ -61,52 +59,42 @@ type ( segmentAllocatorImpl struct { mt *meta segments map[UniqueID]*segmentStatus //segment id -> status - cMapper *insertChannelMapper segmentExpireDuration int64 - defaultSizePerRecord int64 segmentThreshold float64 segmentThresholdFactor float64 - numOfChannels int - numOfQueryNodes int mu sync.RWMutex - globalIDAllocator func() (UniqueID, error) - globalTSOAllocator func() (Timestamp, error) + allocator allocator } ) -func newSegmentAssigner(metaTable *meta, globalIDAllocator func() (UniqueID, error), - globalTSOAllocator func() (Timestamp, error)) (*segmentAllocatorImpl, error) { +func newSegmentAssigner(metaTable *meta, allocator allocator) (*segmentAllocatorImpl, error) { segmentAllocator := &segmentAllocatorImpl{ mt: metaTable, segments: make(map[UniqueID]*segmentStatus), segmentExpireDuration: Params.SegIDAssignExpiration, - defaultSizePerRecord: Params.DefaultRecordSize, segmentThreshold: Params.SegmentSize * 1024 * 1024, segmentThresholdFactor: Params.SegmentSizeFactor, - numOfChannels: Params.TopicNum, - numOfQueryNodes: Params.QueryNodeNum, - globalIDAllocator: globalIDAllocator, - globalTSOAllocator: globalTSOAllocator, + allocator: allocator, } return segmentAllocator, nil } -func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentInfo) error { - if _, ok := allocator.segments[segmentInfo.SegmentID]; ok { - return fmt.Errorf("segment %d already exist", segmentInfo.SegmentID) +func (allocator *segmentAllocatorImpl) OpenSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, cRange channelRange) error { + if _, ok := allocator.segments[segmentID]; ok { + return fmt.Errorf("segment %d already exist", segmentID) } - totalRows, err := allocator.estimateTotalRows(segmentInfo.CollectionID) + totalRows, err := allocator.estimateTotalRows(collectionID) if err != nil { return err } - allocator.segments[segmentInfo.SegmentID] = &segmentStatus{ - id: segmentInfo.SegmentID, - collectionID: segmentInfo.CollectionID, - partitionID: segmentInfo.PartitionID, + allocator.segments[segmentID] = &segmentStatus{ + id: segmentID, + collectionID: collectionID, + partitionID: partitionID, total: totalRows, sealed: false, lastExpireTime: 0, - cRange: segmentInfo.InsertChannels, + cRange: cRange, } return nil } @@ -143,7 +131,7 @@ func (allocator *segmentAllocatorImpl) alloc(segStatus *segmentStatus, numRows i for _, allocation := range segStatus.allocations { totalOfAllocations += allocation.rowNums } - segMeta, err := allocator.mt.GetSegmentByID(segStatus.id) + segMeta, err := allocator.mt.GetSegment(segStatus.id) if err != nil { return false, err } @@ -152,7 +140,7 @@ func (allocator *segmentAllocatorImpl) alloc(segStatus *segmentStatus, numRows i return false, nil } - ts, err := allocator.globalTSOAllocator() + ts, err := allocator.allocator.allocTimestamp() if err != nil { return false, err } @@ -162,7 +150,7 @@ func (allocator *segmentAllocatorImpl) alloc(segStatus *segmentStatus, numRows i segStatus.lastExpireTime = expireTs segStatus.allocations = append(segStatus.allocations, &allocation{ numRows, - ts, + expireTs, }) return true, nil @@ -200,7 +188,7 @@ func (allocator *segmentAllocatorImpl) GetSealedSegments() ([]UniqueID, error) { } func (allocator *segmentAllocatorImpl) checkSegmentSealed(segStatus *segmentStatus) (bool, error) { - segMeta, err := allocator.mt.GetSegmentByID(segStatus.id) + segMeta, err := allocator.mt.GetSegment(segStatus.id) if err != nil { return false, err } diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go new file mode 100644 index 0000000000..89dc76c206 --- /dev/null +++ b/internal/dataservice/server.go @@ -0,0 +1,8 @@ +package dataservice + +type Server struct { + segAllocator segmentAllocator + meta *meta + insertCMapper insertChannelMapper + scheduler *ddRequestScheduler +} diff --git a/internal/dataservice/stats_processor.go b/internal/dataservice/stats_processor.go new file mode 100644 index 0000000000..5636f1ca52 --- /dev/null +++ b/internal/dataservice/stats_processor.go @@ -0,0 +1,55 @@ +package dataservice + +import ( + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" +) + +type statsProcessor struct { + meta *meta + segmentThreshold float64 + segmentThresholdFactor float64 +} + +func newStatsProcessor(meta *meta) *statsProcessor { + return &statsProcessor{ + meta: meta, + segmentThreshold: Params.SegmentSize * 1024 * 1024, + segmentThresholdFactor: Params.SegmentSizeFactor, + } +} + +func (processor *statsProcessor) ProcessQueryNodeStats(msgPack *msgstream.MsgPack) error { + for _, msg := range msgPack.Msgs { + statsMsg, ok := msg.(*msgstream.QueryNodeStatsMsg) + if !ok { + return errors.Errorf("Type of message is not QueryNodeSegStatsMsg") + } + + for _, segStat := range statsMsg.GetSegStats() { + if err := processor.processSegmentStat(segStat); err != nil { + return err + } + } + } + + return nil +} + +func (processor *statsProcessor) processSegmentStat(segStats *internalpb2.SegmentStats) error { + if !segStats.GetRecentlyModified() { + return nil + } + + segID := segStats.GetSegmentID() + segMeta, err := processor.meta.GetSegment(segID) + if err != nil { + return err + } + + segMeta.NumRows = segStats.NumRows + segMeta.MemSize = segStats.MemorySize + + return processor.meta.UpdateSegment(segMeta) +} diff --git a/internal/dataservice/task.go b/internal/dataservice/task.go new file mode 100644 index 0000000000..1af5db8adf --- /dev/null +++ b/internal/dataservice/task.go @@ -0,0 +1,126 @@ +package dataservice + +import ( + "context" + "fmt" + "log" + + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" + + "github.com/zilliztech/milvus-distributed/internal/errors" +) + +// TODO: get timestamp from timestampOracle + +type task interface { + Type() commonpb.MsgType + Ts() (Timestamp, error) + Execute() error + WaitToFinish(ctx context.Context) error + Notify(err error) +} +type baseTask struct { + sch *ddRequestScheduler + meta *meta + cv chan error +} + +func (bt *baseTask) Notify(err error) { + bt.cv <- err +} + +func (bt *baseTask) WaitToFinish(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return errors.Errorf("context done") + case err, ok := <-bt.cv: + if !ok { + return errors.Errorf("notify chan closed") + } + return err + } + } +} + +type allocateTask struct { + baseTask + req *datapb.AssignSegIDRequest + resp *datapb.AssignSegIDResponse + segAllocator segmentAllocator + insertCMapper insertChannelMapper +} + +func (task *allocateTask) Type() commonpb.MsgType { + return commonpb.MsgType_kAllocateSegment +} + +func (task *allocateTask) Ts() (Timestamp, error) { + return task.req.Timestamp, nil +} + +func (task *allocateTask) Execute() error { + for _, req := range task.req.SegIDRequests { + result := &datapb.SegIDAssignment{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + }, + } + segmentID, retCount, expireTs, err := task.segAllocator.AllocSegment(req.CollectionID, req.PartitionID, req.ChannelName, int(req.Count)) + if err != nil { + if _, ok := err.(errRemainInSufficient); !ok { + result.Status.Reason = fmt.Sprintf("allocation of Collection %d, Partition %d, Channel %s, Count %d error: %s", + req.CollectionID, req.PartitionID, req.ChannelName, req.Count, err.Error()) + task.resp.SegIDAssignments = append(task.resp.SegIDAssignments, result) + continue + } + + log.Printf("no enough space for allocation of Collection %d, Partition %d, Channel %s, Count %d", + req.CollectionID, req.PartitionID, req.ChannelName, req.Count) + if err = task.openNewSegment(req.CollectionID, req.PartitionID, req.ChannelName); err != nil { + result.Status.Reason = fmt.Sprintf("open new segment of Collection %d, Partition %d, Channel %s, Count %d error: %s", + req.CollectionID, req.PartitionID, req.ChannelName, req.Count, err.Error()) + task.resp.SegIDAssignments = append(task.resp.SegIDAssignments, result) + continue + } + segmentID, retCount, expireTs, err = task.segAllocator.AllocSegment(req.CollectionID, req.PartitionID, req.ChannelName, int(req.Count)) + if err != nil { + result.Status.Reason = fmt.Sprintf("retry allocation of Collection %d, Partition %d, Channel %s, Count %d error: %s", + req.CollectionID, req.PartitionID, req.ChannelName, req.Count, err.Error()) + task.resp.SegIDAssignments = append(task.resp.SegIDAssignments, result) + continue + } + } + result.Status.ErrorCode = commonpb.ErrorCode_SUCCESS + result.CollectionID = req.CollectionID + result.SegID = segmentID + result.PartitionID = req.PartitionID + result.Count = uint32(retCount) + result.ExpireTime = expireTs + result.ChannelName = req.ChannelName + task.resp.SegIDAssignments = append(task.resp.SegIDAssignments, result) + } + return nil +} + +func (task *allocateTask) openNewSegment(collectionID UniqueID, partitionID UniqueID, channelName string) error { + cRange, err := task.insertCMapper.GetChannelRange(channelName) + if err != nil { + return err + } + + segmentInfo, err := task.meta.BuildSegment(collectionID, partitionID, cRange) + if err != nil { + return err + } + + if err = task.meta.AddSegment(segmentInfo); err != nil { + return err + } + if err = task.segAllocator.OpenSegment(collectionID, partitionID, segmentInfo.SegmentID, segmentInfo.InsertChannels); err != nil { + return err + } + return nil +} diff --git a/internal/master/master_test.go b/internal/master/master_test.go index 214da649ad..f2de1b7201 100644 --- a/internal/master/master_test.go +++ b/internal/master/master_test.go @@ -1147,7 +1147,7 @@ func TestMaster(t *testing.T) { NodeID: 1, PeerRole: "ProxyNode", SegIDRequests: []*datapb.SegIDRequest{ - {Count: 10000, ChannelID: "0", CollName: collName, PartitionName: partitionName}, + {Count: 10000, ChannelName: "0", CollName: collName, PartitionName: partitionName}, }, }) assert.Nil(t, err) @@ -1156,7 +1156,7 @@ func TestMaster(t *testing.T) { assert.EqualValues(t, commonpb.ErrorCode_SUCCESS, assignments[0].Status.ErrorCode) assert.EqualValues(t, collName, assignments[0].CollName) assert.EqualValues(t, partitionName, assignments[0].PartitionName) - assert.EqualValues(t, int32(0), assignments[0].ChannelID) + assert.EqualValues(t, "0", assignments[0].ChannelName) assert.EqualValues(t, uint32(10000), assignments[0].Count) // test stats diff --git a/internal/master/segment_manager.go b/internal/master/segment_manager.go index d7e66927ba..9e438e0853 100644 --- a/internal/master/segment_manager.go +++ b/internal/master/segment_manager.go @@ -77,7 +77,7 @@ func (manager *SegmentManagerImpl) AssignSegment(segIDReq []*datapb.SegIDRequest collName := req.CollName paritionName := req.PartitionName count := req.Count - channelID := req.ChannelID + channelID := req.ChannelName collMeta, err := manager.metaTable.GetCollectionByName(collName) if err != nil { @@ -141,7 +141,7 @@ func (manager *SegmentManagerImpl) assignSegment( return &datapb.SegIDAssignment{ SegID: segStatus.segmentID, - ChannelID: channelID, + ChannelName: strconv.Itoa(int(channelID)), Count: count, CollName: collName, PartitionName: paritionName, @@ -176,7 +176,7 @@ func (manager *SegmentManagerImpl) assignSegment( } return &datapb.SegIDAssignment{ SegID: id, - ChannelID: channelID, + ChannelName: strconv.Itoa(int(channelID)), Count: count, CollName: collName, PartitionName: paritionName, diff --git a/internal/master/segment_manager_test.go b/internal/master/segment_manager_test.go index 9f6bc4187b..fb5a4d6656 100644 --- a/internal/master/segment_manager_test.go +++ b/internal/master/segment_manager_test.go @@ -112,8 +112,8 @@ func TestSegmentManager_AssignSegment(t *testing.T) { for _, c := range cases { result, _ := segManager.AssignSegment([]*datapb.SegIDRequest{ {Count: c.Count, - ChannelID: strconv.FormatInt(int64(c.ChannelID), 10), - CollName: collName, PartitionName: partitionName}, + ChannelName: strconv.FormatInt(int64(c.ChannelID), 10), + CollName: collName, PartitionName: partitionName}, }) results = append(results, result...) if c.Err { @@ -239,9 +239,9 @@ func TestSegmentManager_SycnWritenode(t *testing.T) { maxCount := uint32(Params.SegmentSize * 1024 * 1024 / float64(sizePerRecord)) req := []*datapb.SegIDRequest{ - {Count: maxCount, ChannelID: "1", CollName: collName, PartitionName: partitionName}, - {Count: maxCount, ChannelID: "2", CollName: collName, PartitionName: partitionName}, - {Count: maxCount, ChannelID: "3", CollName: collName, PartitionName: partitionName}, + {Count: maxCount, ChannelName: "1", CollName: collName, PartitionName: partitionName}, + {Count: maxCount, ChannelName: "2", CollName: collName, PartitionName: partitionName}, + {Count: maxCount, ChannelName: "3", CollName: collName, PartitionName: partitionName}, } assignSegment, err := segManager.AssignSegment(req) assert.Nil(t, err) @@ -249,7 +249,7 @@ func TestSegmentManager_SycnWritenode(t *testing.T) { assert.Nil(t, err) for i := 0; i < len(assignSegment); i++ { assert.EqualValues(t, maxCount, assignSegment[i].Count) - assert.EqualValues(t, i+1, assignSegment[i].ChannelID) + assert.EqualValues(t, strconv.Itoa(i+1), assignSegment[i].ChannelName) err = mt.UpdateSegment(&pb.SegmentMeta{ SegmentID: assignSegment[i].SegID, diff --git a/internal/proto/common.proto b/internal/proto/common.proto index 02dbacaf3d..3db4727528 100644 --- a/internal/proto/common.proto +++ b/internal/proto/common.proto @@ -105,7 +105,7 @@ enum MsgType { kLoadIndex = 1202; kRequestID = 1203; kRequestTSO = 1204; - + kAllocateSegment = 1205; } message MsgBase { diff --git a/internal/proto/commonpb/common.pb.go b/internal/proto/commonpb/common.pb.go index 53956d73b5..b4c4d34065 100644 --- a/internal/proto/commonpb/common.pb.go +++ b/internal/proto/commonpb/common.pb.go @@ -180,11 +180,12 @@ const ( MsgType_kGetCollectionStatistics MsgType = 503 MsgType_kGetPartitionStatistics MsgType = 504 // System Control - MsgType_kTimeTick MsgType = 1200 - MsgType_kQueryNodeStats MsgType = 1201 - MsgType_kLoadIndex MsgType = 1202 - MsgType_kRequestID MsgType = 1203 - MsgType_kRequestTSO MsgType = 1204 + MsgType_kTimeTick MsgType = 1200 + MsgType_kQueryNodeStats MsgType = 1201 + MsgType_kLoadIndex MsgType = 1202 + MsgType_kRequestID MsgType = 1203 + MsgType_kRequestTSO MsgType = 1204 + MsgType_kAllocateSegment MsgType = 1205 ) var MsgType_name = map[int32]string{ @@ -215,6 +216,7 @@ var MsgType_name = map[int32]string{ 1202: "kLoadIndex", 1203: "kRequestID", 1204: "kRequestTSO", + 1205: "kAllocateSegment", } var MsgType_value = map[string]int32{ @@ -245,6 +247,7 @@ var MsgType_value = map[string]int32{ "kLoadIndex": 1202, "kRequestID": 1203, "kRequestTSO": 1204, + "kAllocateSegment": 1205, } func (x MsgType) String() string { @@ -585,72 +588,73 @@ func init() { func init() { proto.RegisterFile("common.proto", fileDescriptor_555bd8c177793206) } var fileDescriptor_555bd8c177793206 = []byte{ - // 1062 bytes of a gzipped FileDescriptorProto + // 1077 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x54, 0xcb, 0x6e, 0xe3, 0x36, - 0x17, 0x1e, 0x5f, 0x12, 0x47, 0xc7, 0x8e, 0xc3, 0x30, 0x37, 0xff, 0x7f, 0xd3, 0x22, 0xf0, 0x2a, - 0x08, 0x30, 0x49, 0xd1, 0x02, 0xed, 0x6a, 0x80, 0x3a, 0x12, 0x9d, 0x10, 0x23, 0x4b, 0x1e, 0x4a, - 0x9e, 0x66, 0xba, 0x11, 0x64, 0x8b, 0x63, 0x0b, 0x92, 0x2d, 0x57, 0xa4, 0xa7, 0xf5, 0x3c, 0x45, - 0x3b, 0xef, 0xd0, 0x5d, 0x0b, 0xf4, 0xb6, 0xe8, 0x23, 0xf4, 0xf6, 0x20, 0x7d, 0x80, 0xde, 0x96, - 0x05, 0x25, 0x2b, 0x36, 0x8a, 0xe9, 0x8e, 0xe7, 0xfb, 0x78, 0x3e, 0x9e, 0xef, 0x90, 0x87, 0xd0, - 0x18, 0x25, 0xd3, 0x69, 0x32, 0xbb, 0x9c, 0xa7, 0x89, 0x4c, 0xf0, 0xc1, 0x34, 0x8c, 0x5f, 0x2c, - 0x44, 0x1e, 0x5d, 0xe6, 0x54, 0xbb, 0x06, 0x5b, 0x64, 0x3a, 0x97, 0xcb, 0xb6, 0x07, 0xdb, 0x8e, - 0xf4, 0xe5, 0x42, 0xe0, 0x47, 0x00, 0x3c, 0x4d, 0x93, 0xd4, 0x1b, 0x25, 0x01, 0x6f, 0x95, 0xce, - 0x4a, 0xe7, 0xcd, 0x77, 0xde, 0xba, 0x7c, 0x4d, 0xf2, 0x25, 0x51, 0xdb, 0xf4, 0x24, 0xe0, 0x4c, - 0xe3, 0xc5, 0x12, 0x1f, 0xc3, 0x76, 0xca, 0x7d, 0x91, 0xcc, 0x5a, 0xe5, 0xb3, 0xd2, 0xb9, 0xc6, - 0x56, 0x51, 0xfb, 0x3d, 0x68, 0x3c, 0xe6, 0xcb, 0xa7, 0x7e, 0xbc, 0xe0, 0x7d, 0x3f, 0x4c, 0x31, - 0x82, 0x4a, 0xc4, 0x97, 0x99, 0xbe, 0xc6, 0xd4, 0x12, 0x1f, 0xc2, 0xd6, 0x0b, 0x45, 0xaf, 0x12, - 0xf3, 0xa0, 0x7d, 0x0a, 0xd5, 0xeb, 0x38, 0x19, 0xae, 0x59, 0x95, 0xd1, 0x28, 0xd8, 0x87, 0x50, - 0xeb, 0x04, 0x41, 0xca, 0x85, 0xc0, 0x4d, 0x28, 0x87, 0xf3, 0x95, 0x5e, 0x39, 0x9c, 0x63, 0x0c, - 0xd5, 0x79, 0x92, 0xca, 0x4c, 0xad, 0xc2, 0xb2, 0x75, 0xfb, 0x55, 0x09, 0x6a, 0x3d, 0x31, 0xbe, - 0xf6, 0x05, 0xc7, 0xef, 0xc3, 0xce, 0x54, 0x8c, 0x3d, 0xb9, 0x9c, 0x17, 0x2e, 0x4f, 0x5f, 0xeb, - 0xb2, 0x27, 0xc6, 0xee, 0x72, 0xce, 0x59, 0x6d, 0x9a, 0x2f, 0x54, 0x25, 0x53, 0x31, 0xa6, 0xc6, - 0x4a, 0x39, 0x0f, 0xf0, 0x29, 0x68, 0x32, 0x9c, 0x72, 0x21, 0xfd, 0xe9, 0xbc, 0x55, 0x39, 0x2b, - 0x9d, 0x57, 0xd9, 0x1a, 0xc0, 0xff, 0x87, 0x1d, 0x91, 0x2c, 0xd2, 0x11, 0xa7, 0x46, 0xab, 0x9a, - 0xa5, 0xdd, 0xc7, 0xed, 0x47, 0xa0, 0xf5, 0xc4, 0xf8, 0x96, 0xfb, 0x01, 0x4f, 0xf1, 0xdb, 0x50, - 0x1d, 0xfa, 0x22, 0xaf, 0xa8, 0xfe, 0xdf, 0x15, 0x29, 0x07, 0x2c, 0xdb, 0x79, 0xf1, 0x43, 0x15, - 0xb4, 0xfb, 0x9b, 0xc0, 0x75, 0xa8, 0x39, 0x03, 0x5d, 0x27, 0x8e, 0x83, 0x1e, 0xe0, 0x43, 0x40, - 0x03, 0x8b, 0xdc, 0xf5, 0x89, 0xee, 0x12, 0xc3, 0x23, 0x8c, 0xd9, 0x0c, 0x95, 0x30, 0x86, 0xa6, - 0x6e, 0x5b, 0x16, 0xd1, 0x5d, 0xaf, 0xdb, 0xa1, 0x26, 0x31, 0x50, 0x19, 0x1f, 0xc1, 0x7e, 0x9f, - 0xb0, 0x1e, 0x75, 0x1c, 0x6a, 0x5b, 0x9e, 0x41, 0x2c, 0x4a, 0x0c, 0x54, 0xc1, 0xff, 0x83, 0x23, - 0xdd, 0x36, 0x4d, 0xa2, 0xbb, 0x0a, 0xb6, 0x6c, 0xd7, 0x23, 0x77, 0xd4, 0x71, 0x1d, 0x54, 0x55, - 0xda, 0xd4, 0x34, 0xc9, 0x4d, 0xc7, 0xf4, 0x3a, 0xec, 0x66, 0xd0, 0x23, 0x96, 0x8b, 0xb6, 0x94, - 0x4e, 0x81, 0x1a, 0xb4, 0x47, 0x2c, 0x25, 0x87, 0x6a, 0xf8, 0x18, 0x70, 0x01, 0x53, 0xcb, 0x20, - 0x77, 0x9e, 0xfb, 0xac, 0x4f, 0xd0, 0x0e, 0x7e, 0x03, 0x4e, 0x0a, 0x7c, 0xf3, 0x9c, 0x4e, 0x8f, - 0x20, 0x0d, 0x23, 0x68, 0x14, 0xa4, 0x6b, 0xf7, 0x1f, 0x23, 0xd8, 0x54, 0x67, 0xf6, 0x87, 0x8c, - 0xe8, 0x36, 0x33, 0x50, 0x7d, 0x13, 0x7e, 0x4a, 0x74, 0xd7, 0x66, 0x1e, 0x35, 0x50, 0x43, 0x15, - 0x5f, 0xc0, 0x0e, 0xe9, 0x30, 0xfd, 0xd6, 0x63, 0xc4, 0x19, 0x98, 0x2e, 0xda, 0x55, 0x2d, 0xe8, - 0x52, 0x93, 0x64, 0x8e, 0xba, 0xf6, 0xc0, 0x32, 0x50, 0x13, 0xef, 0x41, 0xbd, 0x47, 0xdc, 0x4e, - 0xd1, 0x93, 0x3d, 0x75, 0xbe, 0xde, 0xd1, 0x6f, 0x49, 0x81, 0x20, 0xdc, 0x82, 0x43, 0xbd, 0x63, - 0xa9, 0x24, 0x9d, 0x91, 0x8e, 0x4b, 0xbc, 0xae, 0x6d, 0x1a, 0x84, 0xa1, 0x7d, 0x65, 0xf0, 0x5f, - 0x0c, 0x35, 0x09, 0xc2, 0x1b, 0x19, 0x06, 0x31, 0xc9, 0x3a, 0xe3, 0x60, 0x23, 0xa3, 0x60, 0x54, - 0xc6, 0xa1, 0x32, 0x73, 0x3d, 0xa0, 0xa6, 0xb1, 0x6a, 0x54, 0x7e, 0x69, 0x47, 0x78, 0x1f, 0x76, - 0x0b, 0x33, 0x96, 0x49, 0x1d, 0x17, 0x1d, 0xe3, 0x13, 0x38, 0x28, 0xa0, 0x1e, 0x71, 0x19, 0xd5, - 0xf3, 0xae, 0x9e, 0xa8, 0xbd, 0xf6, 0xc0, 0xf5, 0xec, 0xae, 0xd7, 0x23, 0x3d, 0x9b, 0x3d, 0x43, - 0x2d, 0x7c, 0x08, 0x7b, 0x86, 0xe1, 0x31, 0xf2, 0x64, 0x40, 0x1c, 0xd7, 0x63, 0x1d, 0x9d, 0xa0, - 0xdf, 0x6a, 0x17, 0x16, 0x00, 0x9d, 0x05, 0xfc, 0x53, 0x35, 0xf9, 0x1c, 0xef, 0x40, 0xd5, 0xb2, - 0x2d, 0x82, 0x1e, 0xe0, 0x06, 0xec, 0x0c, 0x2c, 0xea, 0x38, 0x03, 0x62, 0xa0, 0x12, 0x6e, 0x02, - 0x50, 0xab, 0xcf, 0xec, 0x1b, 0xa6, 0x5e, 0x55, 0x59, 0xb1, 0x5d, 0x6a, 0x51, 0xe7, 0x36, 0x7b, - 0x22, 0x00, 0xdb, 0xab, 0xfe, 0x54, 0x2f, 0xbe, 0xa8, 0x66, 0xe3, 0x95, 0x4d, 0x89, 0x06, 0x5b, - 0x91, 0x95, 0xcc, 0x38, 0x7a, 0xa0, 0x2c, 0x45, 0x7a, 0xca, 0x7d, 0xc9, 0xf5, 0x24, 0x8e, 0xf9, - 0x48, 0x86, 0xc9, 0x0c, 0x05, 0xf8, 0x00, 0xf6, 0x22, 0x23, 0x4d, 0xe6, 0x1b, 0x20, 0x57, 0x37, - 0x13, 0xdd, 0xfa, 0x62, 0x03, 0x7b, 0xae, 0x8c, 0x46, 0x06, 0x17, 0xa3, 0x34, 0x1c, 0x6e, 0x2a, - 0x8c, 0xd5, 0x1b, 0x8c, 0x9c, 0x49, 0xf2, 0xc9, 0x1a, 0x14, 0x68, 0x92, 0x49, 0xdc, 0x70, 0xe9, - 0x2c, 0x85, 0x9e, 0xcc, 0x9e, 0x87, 0x63, 0x81, 0x42, 0x7c, 0x04, 0x68, 0x55, 0x42, 0xdf, 0x4f, - 0x65, 0x98, 0xe5, 0xff, 0x58, 0xc2, 0x07, 0xd0, 0xcc, 0x4a, 0x58, 0x83, 0x3f, 0xa9, 0xf9, 0xd8, - 0x55, 0x25, 0xac, 0xb1, 0x9f, 0x4b, 0xf8, 0x04, 0xf0, 0x7d, 0x09, 0x6b, 0xe2, 0x97, 0x92, 0x6a, - 0x6c, 0x56, 0xc2, 0x3d, 0x28, 0xd0, 0xaf, 0x25, 0xbc, 0x0f, 0x8d, 0xd5, 0x71, 0x59, 0x7f, 0xd1, - 0x97, 0xe5, 0xfc, 0xa8, 0x95, 0x42, 0x0e, 0x7e, 0xa5, 0x5a, 0x59, 0x8b, 0xe8, 0x4c, 0xf0, 0x54, - 0xa2, 0xcf, 0x2a, 0x59, 0x64, 0xf0, 0x98, 0x4b, 0x8e, 0x3e, 0xaf, 0xe0, 0x3a, 0x6c, 0x47, 0xdd, - 0x78, 0x21, 0x26, 0xe8, 0x55, 0x4e, 0x39, 0xdc, 0x4f, 0x47, 0x13, 0xf4, 0x7b, 0x25, 0xab, 0x30, - 0x8f, 0x18, 0x17, 0x8b, 0x58, 0xa2, 0x3f, 0x2a, 0x99, 0xfe, 0x0d, 0x97, 0xeb, 0xfb, 0x44, 0x7f, - 0x56, 0xf0, 0x9b, 0xd0, 0x52, 0xe0, 0xba, 0x3f, 0x8a, 0x09, 0x85, 0x0c, 0x47, 0x02, 0xfd, 0x55, - 0xc1, 0xa7, 0x70, 0xa2, 0xe8, 0xfb, 0xda, 0x37, 0xd8, 0xbf, 0x2b, 0xb8, 0x09, 0x5a, 0xe4, 0x86, - 0x53, 0xee, 0x86, 0xa3, 0x08, 0x7d, 0xad, 0x65, 0x56, 0x9f, 0x2c, 0x78, 0xba, 0xb4, 0x92, 0x80, - 0xab, 0xad, 0x02, 0x7d, 0xa3, 0xe1, 0x3d, 0x80, 0xc8, 0x4c, 0xfc, 0x20, 0xf7, 0xf4, 0x6d, 0x0e, - 0x30, 0xfe, 0xf1, 0x82, 0x0b, 0x49, 0x0d, 0xf4, 0x9d, 0x9a, 0xe3, 0x7a, 0x01, 0xb8, 0x8e, 0x8d, - 0xbe, 0xd7, 0xae, 0xaf, 0x3f, 0xfa, 0x60, 0x1c, 0xca, 0xc9, 0x62, 0xa8, 0x7e, 0xb4, 0xab, 0x97, - 0x61, 0x1c, 0x87, 0x2f, 0x25, 0x1f, 0x4d, 0xae, 0xf2, 0xdf, 0xee, 0x61, 0x10, 0x0a, 0x99, 0x86, - 0xc3, 0x85, 0xe4, 0xc1, 0x55, 0x38, 0x93, 0x3c, 0x9d, 0xf9, 0xf1, 0x55, 0xf6, 0x05, 0x5e, 0xe5, - 0x5f, 0xe0, 0x7c, 0x38, 0xdc, 0xce, 0xe2, 0x77, 0xff, 0x09, 0x00, 0x00, 0xff, 0xff, 0x31, 0x9c, - 0x64, 0xd6, 0xe5, 0x06, 0x00, 0x00, + 0x14, 0x1d, 0x3f, 0x12, 0x47, 0xd7, 0x1e, 0x87, 0x61, 0x5e, 0x6e, 0x9b, 0x16, 0x81, 0x57, 0x41, + 0x80, 0x49, 0x8a, 0x16, 0x68, 0x57, 0x03, 0x54, 0x91, 0xe8, 0x84, 0x18, 0x59, 0xf2, 0x50, 0xf2, + 0x34, 0xd3, 0x8d, 0x20, 0xdb, 0x1c, 0x5b, 0x90, 0x6c, 0xb9, 0x22, 0x3d, 0xad, 0xe7, 0x2b, 0xda, + 0xf9, 0x8e, 0x16, 0xe8, 0x13, 0xe8, 0xa2, 0x1f, 0xd0, 0xd7, 0x87, 0xf4, 0x03, 0xfa, 0x5a, 0x16, + 0x94, 0xac, 0xd8, 0x28, 0xa6, 0x3b, 0xde, 0x73, 0x78, 0x0f, 0xef, 0xb9, 0xe4, 0x25, 0x34, 0x86, + 0xc9, 0x74, 0x9a, 0xcc, 0x2e, 0xe6, 0x69, 0x22, 0x13, 0xbc, 0x3f, 0x0d, 0xe3, 0xe7, 0x0b, 0x91, + 0x47, 0x17, 0x39, 0xd5, 0xae, 0xc1, 0x16, 0x99, 0xce, 0xe5, 0xb2, 0xed, 0xc3, 0xb6, 0x2b, 0x03, + 0xb9, 0x10, 0xf8, 0x21, 0x00, 0x4f, 0xd3, 0x24, 0xf5, 0x87, 0xc9, 0x88, 0xb7, 0x4a, 0xa7, 0xa5, + 0xb3, 0xe6, 0x3b, 0x6f, 0x5d, 0xbc, 0x22, 0xf9, 0x82, 0xa8, 0x6d, 0x46, 0x32, 0xe2, 0x4c, 0xe3, + 0xc5, 0x12, 0x1f, 0xc1, 0x76, 0xca, 0x03, 0x91, 0xcc, 0x5a, 0xe5, 0xd3, 0xd2, 0x99, 0xc6, 0x56, + 0x51, 0xfb, 0x3d, 0x68, 0x3c, 0xe2, 0xcb, 0x27, 0x41, 0xbc, 0xe0, 0xbd, 0x20, 0x4c, 0x31, 0x82, + 0x4a, 0xc4, 0x97, 0x99, 0xbe, 0xc6, 0xd4, 0x12, 0x1f, 0xc0, 0xd6, 0x73, 0x45, 0xaf, 0x12, 0xf3, + 0xa0, 0x7d, 0x02, 0xd5, 0xab, 0x38, 0x19, 0xac, 0x59, 0x95, 0xd1, 0x28, 0xd8, 0x07, 0x50, 0xd3, + 0x47, 0xa3, 0x94, 0x0b, 0x81, 0x9b, 0x50, 0x0e, 0xe7, 0x2b, 0xbd, 0x72, 0x38, 0xc7, 0x18, 0xaa, + 0xf3, 0x24, 0x95, 0x99, 0x5a, 0x85, 0x65, 0xeb, 0xf6, 0xcb, 0x12, 0xd4, 0xba, 0x62, 0x7c, 0x15, + 0x08, 0x8e, 0xdf, 0x87, 0x9d, 0xa9, 0x18, 0xfb, 0x72, 0x39, 0x2f, 0x5c, 0x9e, 0xbc, 0xd2, 0x65, + 0x57, 0x8c, 0xbd, 0xe5, 0x9c, 0xb3, 0xda, 0x34, 0x5f, 0xa8, 0x4a, 0xa6, 0x62, 0x4c, 0xcd, 0x95, + 0x72, 0x1e, 0xe0, 0x13, 0xd0, 0x64, 0x38, 0xe5, 0x42, 0x06, 0xd3, 0x79, 0xab, 0x72, 0x5a, 0x3a, + 0xab, 0xb2, 0x35, 0x80, 0x5f, 0x87, 0x1d, 0x91, 0x2c, 0xd2, 0x21, 0xa7, 0x66, 0xab, 0x9a, 0xa5, + 0xdd, 0xc5, 0xed, 0x87, 0xa0, 0x75, 0xc5, 0xf8, 0x86, 0x07, 0x23, 0x9e, 0xe2, 0xb7, 0xa1, 0x3a, + 0x08, 0x44, 0x5e, 0x51, 0xfd, 0xff, 0x2b, 0x52, 0x0e, 0x58, 0xb6, 0xf3, 0xfc, 0x87, 0x2a, 0x68, + 0x77, 0x37, 0x81, 0xeb, 0x50, 0x73, 0xfb, 0x86, 0x41, 0x5c, 0x17, 0xdd, 0xc3, 0x07, 0x80, 0xfa, + 0x36, 0xb9, 0xed, 0x11, 0xc3, 0x23, 0xa6, 0x4f, 0x18, 0x73, 0x18, 0x2a, 0x61, 0x0c, 0x4d, 0xc3, + 0xb1, 0x6d, 0x62, 0x78, 0x7e, 0x47, 0xa7, 0x16, 0x31, 0x51, 0x19, 0x1f, 0xc2, 0x5e, 0x8f, 0xb0, + 0x2e, 0x75, 0x5d, 0xea, 0xd8, 0xbe, 0x49, 0x6c, 0x4a, 0x4c, 0x54, 0xc1, 0xaf, 0xc1, 0xa1, 0xe1, + 0x58, 0x16, 0x31, 0x3c, 0x05, 0xdb, 0x8e, 0xe7, 0x93, 0x5b, 0xea, 0x7a, 0x2e, 0xaa, 0x2a, 0x6d, + 0x6a, 0x59, 0xe4, 0x5a, 0xb7, 0x7c, 0x9d, 0x5d, 0xf7, 0xbb, 0xc4, 0xf6, 0xd0, 0x96, 0xd2, 0x29, + 0x50, 0x93, 0x76, 0x89, 0xad, 0xe4, 0x50, 0x0d, 0x1f, 0x01, 0x2e, 0x60, 0x6a, 0x9b, 0xe4, 0xd6, + 0xf7, 0x9e, 0xf6, 0x08, 0xda, 0xc1, 0x6f, 0xc0, 0x71, 0x81, 0x6f, 0x9e, 0xa3, 0x77, 0x09, 0xd2, + 0x30, 0x82, 0x46, 0x41, 0x7a, 0x4e, 0xef, 0x11, 0x82, 0x4d, 0x75, 0xe6, 0x7c, 0xc8, 0x88, 0xe1, + 0x30, 0x13, 0xd5, 0x37, 0xe1, 0x27, 0xc4, 0xf0, 0x1c, 0xe6, 0x53, 0x13, 0x35, 0x54, 0xf1, 0x05, + 0xec, 0x12, 0x9d, 0x19, 0x37, 0x3e, 0x23, 0x6e, 0xdf, 0xf2, 0xd0, 0x7d, 0xd5, 0x82, 0x0e, 0xb5, + 0x48, 0xe6, 0xa8, 0xe3, 0xf4, 0x6d, 0x13, 0x35, 0xf1, 0x2e, 0xd4, 0xbb, 0xc4, 0xd3, 0x8b, 0x9e, + 0xec, 0xaa, 0xf3, 0x0d, 0xdd, 0xb8, 0x21, 0x05, 0x82, 0x70, 0x0b, 0x0e, 0x0c, 0xdd, 0x56, 0x49, + 0x06, 0x23, 0xba, 0x47, 0xfc, 0x8e, 0x63, 0x99, 0x84, 0xa1, 0x3d, 0x65, 0xf0, 0x3f, 0x0c, 0xb5, + 0x08, 0xc2, 0x1b, 0x19, 0x26, 0xb1, 0xc8, 0x3a, 0x63, 0x7f, 0x23, 0xa3, 0x60, 0x54, 0xc6, 0x81, + 0x32, 0x73, 0xd5, 0xa7, 0x96, 0xb9, 0x6a, 0x54, 0x7e, 0x69, 0x87, 0x78, 0x0f, 0xee, 0x17, 0x66, + 0x6c, 0x8b, 0xba, 0x1e, 0x3a, 0xc2, 0xc7, 0xb0, 0x5f, 0x40, 0x5d, 0xe2, 0x31, 0x6a, 0xe4, 0x5d, + 0x3d, 0x56, 0x7b, 0x9d, 0xbe, 0xe7, 0x3b, 0x1d, 0xbf, 0x4b, 0xba, 0x0e, 0x7b, 0x8a, 0x5a, 0xf8, + 0x00, 0x76, 0x4d, 0xd3, 0x67, 0xe4, 0x71, 0x9f, 0xb8, 0x9e, 0xcf, 0x74, 0x83, 0xa0, 0xdf, 0x6b, + 0xe7, 0x36, 0x00, 0x9d, 0x8d, 0xf8, 0xa7, 0x6a, 0xf2, 0x39, 0xde, 0x81, 0xaa, 0xed, 0xd8, 0x04, + 0xdd, 0xc3, 0x0d, 0xd8, 0xe9, 0xdb, 0xd4, 0x75, 0xfb, 0xc4, 0x44, 0x25, 0xdc, 0x04, 0xa0, 0x76, + 0x8f, 0x39, 0xd7, 0x4c, 0xbd, 0xaa, 0xb2, 0x62, 0x3b, 0xd4, 0xa6, 0xee, 0x4d, 0xf6, 0x44, 0x00, + 0xb6, 0x57, 0xfd, 0xa9, 0x9e, 0xff, 0x58, 0xcd, 0xc6, 0x2b, 0x9b, 0x12, 0x0d, 0xb6, 0x22, 0x3b, + 0x99, 0x71, 0x74, 0x4f, 0x59, 0x8a, 0x8c, 0x94, 0x07, 0x92, 0x1b, 0x49, 0x1c, 0xf3, 0xa1, 0x0c, + 0x93, 0x19, 0x1a, 0xe1, 0x7d, 0xd8, 0x8d, 0xcc, 0x34, 0x99, 0x6f, 0x80, 0x5c, 0xdd, 0x4c, 0x74, + 0x13, 0x88, 0x0d, 0xec, 0x99, 0x32, 0x1a, 0x99, 0x5c, 0x0c, 0xd3, 0x70, 0xb0, 0xa9, 0x30, 0x56, + 0x6f, 0x30, 0x72, 0x27, 0xc9, 0x27, 0x6b, 0x50, 0xa0, 0x49, 0x26, 0x71, 0xcd, 0xa5, 0xbb, 0x14, + 0x46, 0x32, 0x7b, 0x16, 0x8e, 0x05, 0x0a, 0xf1, 0x21, 0xa0, 0x55, 0x09, 0xbd, 0x20, 0x95, 0x61, + 0x96, 0xff, 0x53, 0x09, 0xef, 0x43, 0x33, 0x2b, 0x61, 0x0d, 0xfe, 0xac, 0xe6, 0xe3, 0xbe, 0x2a, + 0x61, 0x8d, 0xfd, 0x52, 0xc2, 0xc7, 0x80, 0xef, 0x4a, 0x58, 0x13, 0xbf, 0x96, 0x54, 0x63, 0xb3, + 0x12, 0xee, 0x40, 0x81, 0x7e, 0x2b, 0xe1, 0x3d, 0x68, 0xac, 0x8e, 0xcb, 0xfa, 0x8b, 0xbe, 0x28, + 0xe7, 0x47, 0xad, 0x14, 0x72, 0xf0, 0x4b, 0xd5, 0xca, 0x5a, 0x44, 0x67, 0x82, 0xa7, 0x12, 0x7d, + 0x56, 0xc9, 0x22, 0x93, 0xc7, 0x5c, 0x72, 0xf4, 0x79, 0x05, 0xd7, 0x61, 0x3b, 0xea, 0xc4, 0x0b, + 0x31, 0x41, 0x2f, 0x73, 0xca, 0xe5, 0x41, 0x3a, 0x9c, 0xa0, 0x3f, 0x2a, 0x59, 0x85, 0x79, 0xc4, + 0xb8, 0x58, 0xc4, 0x12, 0xfd, 0x59, 0xc9, 0xf4, 0xaf, 0xb9, 0x5c, 0xdf, 0x27, 0xfa, 0xab, 0x82, + 0xdf, 0x84, 0x96, 0x02, 0xd7, 0xfd, 0x51, 0x4c, 0x28, 0x64, 0x38, 0x14, 0xe8, 0xef, 0x0a, 0x3e, + 0x81, 0x63, 0x45, 0xdf, 0xd5, 0xbe, 0xc1, 0xfe, 0x53, 0xc1, 0x4d, 0xd0, 0x22, 0x2f, 0x9c, 0x72, + 0x2f, 0x1c, 0x46, 0xe8, 0x2b, 0x2d, 0xb3, 0xfa, 0x78, 0xc1, 0xd3, 0xa5, 0x9d, 0x8c, 0xb8, 0xda, + 0x2a, 0xd0, 0xd7, 0x1a, 0xde, 0x05, 0x88, 0xac, 0x24, 0x18, 0xe5, 0x9e, 0xbe, 0xc9, 0x01, 0xc6, + 0x3f, 0x5e, 0x70, 0x21, 0xa9, 0x89, 0xbe, 0x55, 0x73, 0x5c, 0x2f, 0x00, 0xcf, 0x75, 0xd0, 0x77, + 0x5a, 0x76, 0x1b, 0x7a, 0x1c, 0x27, 0xc3, 0x40, 0x72, 0x97, 0x8f, 0xa7, 0x7c, 0x26, 0xd1, 0xf7, + 0xda, 0xd5, 0xd5, 0x47, 0x1f, 0x8c, 0x43, 0x39, 0x59, 0x0c, 0xd4, 0x47, 0x77, 0xf9, 0x22, 0x8c, + 0xe3, 0xf0, 0x85, 0xe4, 0xc3, 0xc9, 0x65, 0xfe, 0x09, 0x3e, 0x18, 0x85, 0x42, 0xa6, 0xe1, 0x60, + 0x21, 0xf9, 0xe8, 0x32, 0x9c, 0x49, 0x9e, 0xce, 0x82, 0xf8, 0x32, 0xfb, 0x19, 0x2f, 0xf3, 0x9f, + 0x71, 0x3e, 0x18, 0x6c, 0x67, 0xf1, 0xbb, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0x8e, 0xe1, 0x76, + 0x9b, 0xfc, 0x06, 0x00, 0x00, } diff --git a/internal/proto/data_service.proto b/internal/proto/data_service.proto index a3482c37d9..b6caa190e5 100644 --- a/internal/proto/data_service.proto +++ b/internal/proto/data_service.proto @@ -20,7 +20,7 @@ message RegisterNodeResponse { message SegIDRequest { uint32 count = 1; - string channelID = 2; + string channelName = 2; int64 collectionID = 3; int64 partitionID = 4; string coll_name = 5;// todo remove @@ -30,13 +30,14 @@ message SegIDRequest { message AssignSegIDRequest { int64 nodeID = 1; string peer_role = 2; - repeated SegIDRequest segIDRequests = 3; + uint64 timestamp = 3; + repeated SegIDRequest segIDRequests = 4; } message SegIDAssignment { int64 segID = 1; - int32 channelID = 2; + string channelName = 2; uint32 count = 3; int64 collectionID = 4; int64 partitionID = 5; @@ -48,6 +49,7 @@ message SegIDAssignment { message AssignSegIDResponse { repeated SegIDAssignment segIDAssignments = 1; + common.Status status = 2; } message FlushRequest { @@ -107,7 +109,7 @@ message InsertChannelRequest { message WatchDmChannelRequest { common.MsgBase base = 1; - repeated string channelIDs = 2; + repeated string channelNames = 2; } diff --git a/internal/proto/datapb/data_service.pb.go b/internal/proto/datapb/data_service.pb.go index 14d7dd4ab5..39f9f81ab3 100644 --- a/internal/proto/datapb/data_service.pb.go +++ b/internal/proto/datapb/data_service.pb.go @@ -145,7 +145,7 @@ func (m *RegisterNodeResponse) GetInitParams() *internalpb2.InitParams { type SegIDRequest struct { Count uint32 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"` - ChannelID string `protobuf:"bytes,2,opt,name=channelID,proto3" json:"channelID,omitempty"` + ChannelName string `protobuf:"bytes,2,opt,name=channelName,proto3" json:"channelName,omitempty"` CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"` PartitionID int64 `protobuf:"varint,4,opt,name=partitionID,proto3" json:"partitionID,omitempty"` CollName string `protobuf:"bytes,5,opt,name=coll_name,json=collName,proto3" json:"coll_name,omitempty"` @@ -187,9 +187,9 @@ func (m *SegIDRequest) GetCount() uint32 { return 0 } -func (m *SegIDRequest) GetChannelID() string { +func (m *SegIDRequest) GetChannelName() string { if m != nil { - return m.ChannelID + return m.ChannelName } return "" } @@ -225,7 +225,8 @@ func (m *SegIDRequest) GetPartitionName() string { type AssignSegIDRequest struct { NodeID int64 `protobuf:"varint,1,opt,name=nodeID,proto3" json:"nodeID,omitempty"` PeerRole string `protobuf:"bytes,2,opt,name=peer_role,json=peerRole,proto3" json:"peer_role,omitempty"` - SegIDRequests []*SegIDRequest `protobuf:"bytes,3,rep,name=segIDRequests,proto3" json:"segIDRequests,omitempty"` + Timestamp uint64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + SegIDRequests []*SegIDRequest `protobuf:"bytes,4,rep,name=segIDRequests,proto3" json:"segIDRequests,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -270,6 +271,13 @@ func (m *AssignSegIDRequest) GetPeerRole() string { return "" } +func (m *AssignSegIDRequest) GetTimestamp() uint64 { + if m != nil { + return m.Timestamp + } + return 0 +} + func (m *AssignSegIDRequest) GetSegIDRequests() []*SegIDRequest { if m != nil { return m.SegIDRequests @@ -279,7 +287,7 @@ func (m *AssignSegIDRequest) GetSegIDRequests() []*SegIDRequest { type SegIDAssignment struct { SegID int64 `protobuf:"varint,1,opt,name=segID,proto3" json:"segID,omitempty"` - ChannelID int32 `protobuf:"varint,2,opt,name=channelID,proto3" json:"channelID,omitempty"` + ChannelName string `protobuf:"bytes,2,opt,name=channelName,proto3" json:"channelName,omitempty"` Count uint32 `protobuf:"varint,3,opt,name=count,proto3" json:"count,omitempty"` CollectionID int64 `protobuf:"varint,4,opt,name=collectionID,proto3" json:"collectionID,omitempty"` PartitionID int64 `protobuf:"varint,5,opt,name=partitionID,proto3" json:"partitionID,omitempty"` @@ -324,11 +332,11 @@ func (m *SegIDAssignment) GetSegID() int64 { return 0 } -func (m *SegIDAssignment) GetChannelID() int32 { +func (m *SegIDAssignment) GetChannelName() string { if m != nil { - return m.ChannelID + return m.ChannelName } - return 0 + return "" } func (m *SegIDAssignment) GetCount() uint32 { @@ -382,6 +390,7 @@ func (m *SegIDAssignment) GetPartitionName() string { type AssignSegIDResponse struct { SegIDAssignments []*SegIDAssignment `protobuf:"bytes,1,rep,name=segIDAssignments,proto3" json:"segIDAssignments,omitempty"` + Status *commonpb.Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -419,6 +428,13 @@ func (m *AssignSegIDResponse) GetSegIDAssignments() []*SegIDAssignment { return nil } +func (m *AssignSegIDResponse) GetStatus() *commonpb.Status { + if m != nil { + return m.Status + } + return nil +} + type FlushRequest struct { Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"` @@ -837,7 +853,7 @@ func (m *InsertChannelRequest) GetCollectionID() int64 { type WatchDmChannelRequest struct { Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` - ChannelIDs []string `protobuf:"bytes,2,rep,name=channelIDs,proto3" json:"channelIDs,omitempty"` + ChannelNames []string `protobuf:"bytes,2,rep,name=channelNames,proto3" json:"channelNames,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -875,9 +891,9 @@ func (m *WatchDmChannelRequest) GetBase() *commonpb.MsgBase { return nil } -func (m *WatchDmChannelRequest) GetChannelIDs() []string { +func (m *WatchDmChannelRequest) GetChannelNames() []string { if m != nil { - return m.ChannelIDs + return m.ChannelNames } return nil } @@ -1064,78 +1080,79 @@ func init() { func init() { proto.RegisterFile("data_service.proto", fileDescriptor_3385cd32ad6cfe64) } var fileDescriptor_3385cd32ad6cfe64 = []byte{ - // 1127 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x56, 0xcf, 0x6e, 0xdb, 0x46, - 0x13, 0xb7, 0x44, 0xc9, 0x96, 0x46, 0xb2, 0xa4, 0xac, 0x9d, 0xef, 0x53, 0x94, 0x34, 0x76, 0x09, - 0x24, 0x36, 0x82, 0xd6, 0x2e, 0x1c, 0xa4, 0xbd, 0x15, 0x8d, 0xab, 0xd4, 0x10, 0xda, 0x18, 0xc6, - 0xaa, 0x40, 0x81, 0xf4, 0x20, 0x50, 0xe2, 0x98, 0x5a, 0x80, 0xdc, 0x65, 0xb9, 0xab, 0xd8, 0xf0, - 0xa5, 0x7d, 0x80, 0x1c, 0x7a, 0xef, 0xbd, 0x2f, 0x51, 0xa0, 0x4f, 0xd0, 0x87, 0x2a, 0xb8, 0x4b, - 0xd1, 0xa4, 0x44, 0x47, 0x86, 0xdb, 0xf4, 0xc6, 0x1d, 0xfe, 0x76, 0xfe, 0xfe, 0x66, 0x66, 0x81, - 0xb8, 0x8e, 0x72, 0x46, 0x12, 0xa3, 0xb7, 0x6c, 0x82, 0x07, 0x61, 0x24, 0x94, 0x20, 0xf7, 0x02, - 0xe6, 0xbf, 0x9d, 0x49, 0x73, 0x3a, 0x88, 0x01, 0xbd, 0xe6, 0x44, 0x04, 0x81, 0xe0, 0x46, 0xd4, - 0x6b, 0x31, 0xae, 0x30, 0xe2, 0x8e, 0x6f, 0xce, 0xf6, 0xcf, 0xb0, 0x45, 0xd1, 0x63, 0x52, 0x61, - 0x74, 0x2a, 0x5c, 0xa4, 0xf8, 0xd3, 0x0c, 0xa5, 0x22, 0x9f, 0x41, 0x65, 0xec, 0x48, 0xec, 0x96, - 0x76, 0x4b, 0xfb, 0x8d, 0xa3, 0x47, 0x07, 0x39, 0xb5, 0x89, 0xc2, 0xd7, 0xd2, 0x3b, 0x76, 0x24, - 0x52, 0x8d, 0x24, 0x9f, 0xc3, 0x86, 0xe3, 0xba, 0x11, 0x4a, 0xd9, 0x2d, 0xbf, 0xe7, 0xd2, 0x4b, - 0x83, 0xa1, 0x73, 0xb0, 0xfd, 0x06, 0xb6, 0xf3, 0x0e, 0xc8, 0x50, 0x70, 0x89, 0xe4, 0x18, 0x1a, - 0x8c, 0x33, 0x35, 0x0a, 0x9d, 0xc8, 0x09, 0x64, 0xe2, 0xc8, 0xc7, 0x79, 0x9d, 0x69, 0x2c, 0x03, - 0xce, 0xd4, 0x99, 0x06, 0x52, 0x60, 0xe9, 0xb7, 0xfd, 0x57, 0x09, 0x9a, 0x43, 0xf4, 0x06, 0xfd, - 0x79, 0x58, 0xdb, 0x50, 0x9d, 0x88, 0x19, 0x57, 0x5a, 0xdd, 0x26, 0x35, 0x07, 0xf2, 0x08, 0xea, - 0x93, 0xa9, 0xc3, 0x39, 0xfa, 0x83, 0xbe, 0x76, 0xbe, 0x4e, 0xaf, 0x05, 0xc4, 0x86, 0xe6, 0x44, - 0xf8, 0x3e, 0x4e, 0x14, 0x13, 0x7c, 0xd0, 0xef, 0x5a, 0xbb, 0xa5, 0x7d, 0x8b, 0xe6, 0x64, 0x64, - 0x17, 0x1a, 0xa1, 0x13, 0x29, 0x96, 0x40, 0x2a, 0x1a, 0x92, 0x15, 0x91, 0x87, 0x50, 0x8f, 0x6f, - 0x8c, 0xb8, 0x13, 0x60, 0xb7, 0xaa, 0x6d, 0xd4, 0x62, 0xc1, 0xa9, 0x13, 0x20, 0x79, 0x02, 0xad, - 0x14, 0x6b, 0x10, 0xeb, 0x1a, 0xb1, 0x99, 0x4a, 0x63, 0x98, 0xfd, 0x6b, 0x09, 0xc8, 0x4b, 0x29, - 0x99, 0xc7, 0x73, 0x41, 0xfd, 0x0f, 0xd6, 0xb9, 0x70, 0x71, 0xd0, 0xd7, 0x51, 0x59, 0x34, 0x39, - 0xc5, 0x26, 0x43, 0xc4, 0x68, 0x14, 0x09, 0x1f, 0x93, 0xb0, 0x6a, 0xb1, 0x80, 0x0a, 0x1f, 0xc9, - 0x2b, 0xd8, 0x94, 0x19, 0x25, 0xb2, 0x6b, 0xed, 0x5a, 0xfb, 0x8d, 0xa3, 0x9d, 0x83, 0x25, 0x02, - 0x1d, 0x64, 0x8d, 0xd1, 0xfc, 0x2d, 0xfb, 0x8f, 0x32, 0xb4, 0xf5, 0x7f, 0xe3, 0x57, 0x80, 0x5c, - 0x27, 0x59, 0x83, 0x12, 0x77, 0xcc, 0x61, 0x39, 0xc9, 0xd5, 0x6c, 0x92, 0xd3, 0xc2, 0x58, 0xd9, - 0xc2, 0x2c, 0xa6, 0xbe, 0xb2, 0x3a, 0xf5, 0xd5, 0xe5, 0xd4, 0xef, 0x40, 0x03, 0x2f, 0x43, 0x16, - 0xe1, 0x48, 0xb1, 0x24, 0xb5, 0x15, 0x0a, 0x46, 0xf4, 0x3d, 0x0b, 0x90, 0x3c, 0x87, 0x75, 0xa9, - 0x1c, 0x35, 0x93, 0xdd, 0x0d, 0xcd, 0xb2, 0x87, 0x85, 0xcc, 0x1d, 0x6a, 0x08, 0x4d, 0xa0, 0xf9, - 0x82, 0xd6, 0x56, 0x16, 0xb4, 0x5e, 0x54, 0x50, 0x84, 0xad, 0x5c, 0x3d, 0x13, 0xea, 0x9f, 0x42, - 0x47, 0xe6, 0x73, 0x1a, 0xf3, 0x3f, 0x2e, 0x8f, 0x7d, 0x53, 0x79, 0xae, 0xa1, 0x74, 0xe9, 0xae, - 0x7d, 0x09, 0xcd, 0x6f, 0xfc, 0x99, 0x9c, 0xde, 0xbd, 0xb9, 0x09, 0x54, 0xdc, 0x71, 0x52, 0x37, - 0x8b, 0xea, 0xef, 0xdb, 0x14, 0xc7, 0x7e, 0x57, 0x02, 0x32, 0x9c, 0x8a, 0x8b, 0x21, 0x7a, 0xda, - 0xb7, 0x3b, 0x3b, 0xb0, 0x68, 0xac, 0xbc, 0x9a, 0x09, 0xd6, 0x12, 0x13, 0xec, 0x17, 0xb0, 0x95, - 0xf3, 0x26, 0xc9, 0xf7, 0x63, 0x00, 0x69, 0x44, 0x83, 0xbe, 0xc9, 0xb4, 0x45, 0x33, 0x12, 0xfb, - 0x1c, 0xb6, 0x93, 0x2b, 0x31, 0x07, 0x50, 0xde, 0x3d, 0x8c, 0x47, 0x50, 0x4f, 0xf5, 0x26, 0x31, - 0x5c, 0x0b, 0xec, 0xdf, 0xcb, 0x70, 0x7f, 0xc1, 0x50, 0xe2, 0xe1, 0x0b, 0xa8, 0xc6, 0xb4, 0x33, - 0xa6, 0x5a, 0x37, 0x75, 0x69, 0x7a, 0x91, 0x1a, 0x74, 0xcc, 0xfc, 0x49, 0x84, 0x8e, 0x4a, 0x98, - 0x5f, 0x36, 0xcc, 0x37, 0x22, 0xcd, 0xfc, 0x1d, 0x68, 0x48, 0x74, 0x7c, 0x74, 0x0d, 0xc0, 0x32, - 0x00, 0x23, 0xd2, 0x80, 0x6f, 0xa1, 0x2d, 0x95, 0x13, 0xa9, 0x51, 0x28, 0xa4, 0xce, 0xa2, 0xec, - 0x56, 0x8a, 0x98, 0x98, 0x4e, 0xe2, 0xd7, 0xd2, 0x3b, 0x4b, 0xa0, 0xb4, 0xa5, 0xaf, 0xce, 0x8f, - 0x92, 0x9c, 0xc0, 0x26, 0x72, 0x37, 0xa3, 0xaa, 0x7a, 0x6b, 0x55, 0x4d, 0xe4, 0x6e, 0xaa, 0xc8, - 0x66, 0xf0, 0xff, 0x01, 0x97, 0x18, 0xa9, 0x63, 0xc6, 0x7d, 0xe1, 0x9d, 0x39, 0x6a, 0xfa, 0xa1, - 0x6a, 0x12, 0xc2, 0x83, 0x45, 0x53, 0xd7, 0x65, 0xe9, 0x41, 0xed, 0x9c, 0xa1, 0xef, 0x5e, 0xd3, - 0x26, 0x3d, 0x93, 0x2f, 0xa0, 0x1a, 0xc6, 0xe0, 0x6e, 0x59, 0x07, 0x79, 0xd3, 0xe6, 0x1a, 0xaa, - 0x88, 0x71, 0xef, 0x3b, 0x26, 0x15, 0x35, 0x78, 0xfb, 0x97, 0x12, 0x6c, 0x1b, 0x93, 0x5f, 0x9b, - 0xf1, 0xf8, 0x61, 0xdb, 0xb6, 0x60, 0x9d, 0xd9, 0x0c, 0xee, 0xff, 0xe0, 0xa8, 0xc9, 0xb4, 0x1f, - 0xfc, 0x63, 0x17, 0x1e, 0x03, 0xa4, 0x53, 0xde, 0xe4, 0xa2, 0x4e, 0x33, 0x12, 0xfb, 0xb7, 0x12, - 0xb4, 0xf5, 0x70, 0x1a, 0xa2, 0xf7, 0x9f, 0x07, 0xba, 0xd0, 0xf9, 0x95, 0xa5, 0xce, 0x7f, 0x57, - 0x86, 0x46, 0xd2, 0x58, 0x03, 0x7e, 0x2e, 0xf2, 0x5c, 0x29, 0x2d, 0x70, 0xe5, 0xdf, 0x19, 0x52, - 0x64, 0x0f, 0xda, 0x4c, 0x97, 0x7f, 0x94, 0xa4, 0xc9, 0x38, 0x56, 0xa7, 0x2d, 0x96, 0x65, 0x85, - 0xde, 0x40, 0x22, 0x44, 0x6e, 0x5a, 0xb7, 0xaa, 0x5b, 0xb7, 0x16, 0x0b, 0x74, 0xe3, 0x7e, 0x04, - 0x30, 0xf1, 0x85, 0xcc, 0xed, 0xbc, 0xba, 0x96, 0xe8, 0xdf, 0x0f, 0xa0, 0xc6, 0x67, 0xc1, 0x28, - 0x12, 0x17, 0x66, 0xe9, 0x59, 0x74, 0x83, 0xcf, 0x02, 0x2a, 0x2e, 0x64, 0xfc, 0x2b, 0xc0, 0x60, - 0x24, 0xd9, 0x95, 0xd9, 0x6b, 0x16, 0xdd, 0x08, 0x30, 0x18, 0xb2, 0x2b, 0x7c, 0xf6, 0xa3, 0x7e, - 0x4e, 0xa5, 0x63, 0x86, 0xb4, 0xd3, 0xec, 0x9c, 0x0a, 0x8e, 0x9d, 0x35, 0xb2, 0xa5, 0x5f, 0x03, - 0x46, 0xa0, 0x5e, 0x5d, 0x32, 0xa9, 0x3a, 0x25, 0x42, 0xa0, 0x95, 0x08, 0x4f, 0x22, 0x71, 0xc1, - 0xb8, 0xd7, 0x29, 0x93, 0x7b, 0xb0, 0x39, 0xd7, 0xa4, 0x87, 0x4d, 0xc7, 0x3a, 0xfa, 0xb3, 0x0a, - 0x8d, 0xbe, 0xa3, 0x9c, 0xa1, 0x79, 0xd0, 0x12, 0x07, 0x9a, 0xd9, 0x87, 0x21, 0x79, 0x5a, 0x30, - 0xf4, 0x0a, 0x9e, 0xae, 0xbd, 0xbd, 0x95, 0x38, 0xd3, 0xbd, 0xf6, 0x1a, 0x39, 0x81, 0xaa, 0xe6, - 0x1e, 0x29, 0x1a, 0xa8, 0xd9, 0x95, 0xd9, 0x7b, 0xdf, 0x93, 0xc0, 0x5e, 0x23, 0x63, 0x68, 0xa7, - 0x8b, 0x3c, 0x21, 0xc3, 0x93, 0x02, 0x95, 0xcb, 0x8f, 0xb7, 0xde, 0xd3, 0x55, 0xb0, 0xd4, 0xd9, - 0x11, 0x34, 0x33, 0xcb, 0x4b, 0x16, 0x1a, 0x58, 0xde, 0xb5, 0x85, 0x06, 0x0a, 0x96, 0xa0, 0xbd, - 0x46, 0x3c, 0xe8, 0x9c, 0xa0, 0xca, 0x2d, 0x20, 0xb2, 0xb7, 0x62, 0xd3, 0xcc, 0x77, 0x61, 0x6f, - 0x7f, 0x35, 0x30, 0x35, 0x14, 0xc1, 0xf6, 0x09, 0xaa, 0xa5, 0xb1, 0x4a, 0x9e, 0x15, 0xe8, 0xb8, - 0x61, 0xce, 0xf7, 0x3e, 0xb9, 0x05, 0x36, 0x6b, 0xd3, 0x81, 0x7b, 0xa9, 0xcd, 0xb4, 0x83, 0xf6, - 0x6e, 0x54, 0x92, 0x9f, 0x7b, 0xbd, 0xd5, 0xd3, 0xdb, 0x5e, 0x3b, 0xfe, 0xea, 0xcd, 0x97, 0x1e, - 0x53, 0xd3, 0xd9, 0x38, 0xa6, 0xc7, 0xe1, 0x15, 0xf3, 0x7d, 0x76, 0xa5, 0x70, 0x32, 0x3d, 0x34, - 0x77, 0x3f, 0x75, 0x99, 0x54, 0x11, 0x1b, 0xcf, 0x14, 0xba, 0x87, 0x73, 0x0d, 0x87, 0x5a, 0xe1, - 0x61, 0x6c, 0x39, 0x1c, 0x8f, 0xd7, 0xf5, 0xe9, 0xf9, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x0e, - 0x0b, 0x09, 0xf1, 0xda, 0x0d, 0x00, 0x00, + // 1144 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x56, 0xdd, 0x6e, 0x1b, 0xc5, + 0x17, 0x8f, 0xbf, 0x12, 0xfb, 0xd8, 0xb1, 0xdd, 0x49, 0xfa, 0xff, 0xbb, 0x6e, 0x21, 0x61, 0xa5, + 0x36, 0x51, 0x05, 0x09, 0x4a, 0x55, 0xb8, 0x43, 0x34, 0xb8, 0x44, 0x16, 0x34, 0x8a, 0xc6, 0x48, + 0x48, 0xe5, 0xc2, 0x5a, 0x7b, 0x4f, 0xd6, 0x23, 0xed, 0xce, 0x2c, 0x3b, 0xe3, 0x26, 0xca, 0x0d, + 0x3c, 0x40, 0x9f, 0x00, 0xee, 0xb9, 0xe4, 0x11, 0xe0, 0x15, 0x78, 0x24, 0xb4, 0x33, 0xeb, 0xf5, + 0xae, 0xbd, 0xae, 0xa3, 0x40, 0xb9, 0xdb, 0x39, 0xfb, 0x9b, 0xf3, 0xf5, 0x3b, 0x1f, 0x03, 0xc4, + 0xb1, 0x95, 0x3d, 0x94, 0x18, 0xbe, 0x61, 0x63, 0x3c, 0x0a, 0x42, 0xa1, 0x04, 0xb9, 0xe7, 0x33, + 0xef, 0xcd, 0x54, 0x9a, 0xd3, 0x51, 0x04, 0xe8, 0x36, 0xc6, 0xc2, 0xf7, 0x05, 0x37, 0xa2, 0x6e, + 0x93, 0x71, 0x85, 0x21, 0xb7, 0x3d, 0x73, 0xb6, 0x7e, 0x82, 0x1d, 0x8a, 0x2e, 0x93, 0x0a, 0xc3, + 0x73, 0xe1, 0x20, 0xc5, 0x1f, 0xa7, 0x28, 0x15, 0xf9, 0x14, 0xca, 0x23, 0x5b, 0x62, 0xa7, 0xb0, + 0x5f, 0x38, 0xac, 0x9f, 0x3c, 0x3a, 0xca, 0xa8, 0x8d, 0x15, 0xbe, 0x92, 0xee, 0xa9, 0x2d, 0x91, + 0x6a, 0x24, 0xf9, 0x0c, 0xb6, 0x6c, 0xc7, 0x09, 0x51, 0xca, 0x4e, 0xf1, 0x1d, 0x97, 0x5e, 0x18, + 0x0c, 0x9d, 0x81, 0xad, 0xd7, 0xb0, 0x9b, 0x75, 0x40, 0x06, 0x82, 0x4b, 0x24, 0xa7, 0x50, 0x67, + 0x9c, 0xa9, 0x61, 0x60, 0x87, 0xb6, 0x2f, 0x63, 0x47, 0x3e, 0xca, 0xea, 0x4c, 0x62, 0xe9, 0x73, + 0xa6, 0x2e, 0x34, 0x90, 0x02, 0x4b, 0xbe, 0xad, 0xbf, 0x0a, 0xd0, 0x18, 0xa0, 0xdb, 0xef, 0xcd, + 0xc2, 0xda, 0x85, 0xca, 0x58, 0x4c, 0xb9, 0xd2, 0xea, 0xb6, 0xa9, 0x39, 0x90, 0x7d, 0xa8, 0x8f, + 0x27, 0x36, 0xe7, 0xe8, 0x9d, 0xdb, 0x3e, 0x6a, 0xf7, 0x6b, 0x34, 0x2d, 0x22, 0x16, 0x34, 0xc6, + 0xc2, 0xf3, 0x70, 0xac, 0x98, 0xe0, 0xfd, 0x5e, 0xa7, 0xb4, 0x5f, 0x38, 0x2c, 0xd1, 0x8c, 0x2c, + 0xd2, 0x12, 0xd8, 0xa1, 0x62, 0x31, 0xa4, 0xac, 0x21, 0x69, 0x11, 0x79, 0x08, 0xb5, 0xe8, 0xc6, + 0x90, 0x47, 0x56, 0x2a, 0xda, 0x4a, 0x35, 0x12, 0x68, 0x13, 0x8f, 0xa1, 0x99, 0x60, 0x0d, 0x62, + 0x53, 0x23, 0xb6, 0x13, 0x69, 0x04, 0xb3, 0x7e, 0x2f, 0x00, 0x79, 0x21, 0x25, 0x73, 0x79, 0x26, + 0xb0, 0xff, 0xc1, 0x26, 0x17, 0x0e, 0xf6, 0x7b, 0x3a, 0xb2, 0x12, 0x8d, 0x4f, 0x91, 0xc9, 0x00, + 0x31, 0x1c, 0x86, 0xc2, 0x9b, 0x05, 0x56, 0x8d, 0x04, 0x54, 0x78, 0x48, 0x1e, 0x41, 0x4d, 0x31, + 0x1f, 0xa5, 0xb2, 0xfd, 0x40, 0x87, 0x54, 0xa6, 0x73, 0x01, 0x79, 0x09, 0xdb, 0x32, 0x65, 0x42, + 0x76, 0xca, 0xfb, 0xa5, 0xc3, 0xfa, 0xc9, 0xde, 0xd1, 0x52, 0x89, 0x1d, 0xa5, 0x5d, 0xa1, 0xd9, + 0x5b, 0xd6, 0x9f, 0x45, 0x68, 0xe9, 0xff, 0xc6, 0x6b, 0x1f, 0xb9, 0xa6, 0x41, 0x83, 0x62, 0x67, + 0xcd, 0xe1, 0x16, 0x34, 0x24, 0xf4, 0x95, 0xd2, 0xf4, 0x2d, 0x92, 0x53, 0x5e, 0x4f, 0x4e, 0x65, + 0x99, 0x9c, 0x3d, 0xa8, 0xe3, 0x75, 0xc0, 0x42, 0x1c, 0x46, 0x29, 0xd0, 0xc9, 0x2f, 0x53, 0x30, + 0xa2, 0xef, 0x98, 0x8f, 0xe4, 0x19, 0x6c, 0x4a, 0x65, 0xab, 0xa9, 0xec, 0x6c, 0xe9, 0x5a, 0x7c, + 0x98, 0x5b, 0xdf, 0x03, 0x0d, 0xa1, 0x31, 0x34, 0x4b, 0x79, 0x75, 0x2d, 0xe5, 0xb5, 0x3c, 0xca, + 0x7f, 0x29, 0xc0, 0x4e, 0x86, 0xf2, 0xb8, 0x43, 0xce, 0xa1, 0x2d, 0xb3, 0x89, 0x8d, 0xda, 0x24, + 0xe2, 0xc8, 0x5a, 0xc5, 0xd1, 0x1c, 0x4a, 0x97, 0xee, 0xa6, 0x02, 0x2c, 0xde, 0x3a, 0x40, 0xeb, + 0x1a, 0x1a, 0x5f, 0x7b, 0x53, 0x39, 0xb9, 0xfb, 0xe0, 0x20, 0x50, 0x76, 0x46, 0xfd, 0x9e, 0x36, + 0x5a, 0xa2, 0xfa, 0xfb, 0x36, 0x94, 0x5a, 0x6f, 0x0b, 0x40, 0x06, 0x13, 0x71, 0x35, 0x40, 0x57, + 0x07, 0x74, 0x67, 0x07, 0x16, 0x8d, 0x15, 0xd7, 0xd7, 0x4f, 0x69, 0xa9, 0x7e, 0xac, 0xe7, 0xb0, + 0x93, 0xf1, 0x26, 0x26, 0xe9, 0x43, 0x00, 0x69, 0x44, 0xfd, 0x9e, 0xa1, 0xa7, 0x44, 0x53, 0x12, + 0xeb, 0x12, 0x76, 0xe3, 0x2b, 0x51, 0x62, 0x51, 0xde, 0x3d, 0x8c, 0x47, 0x50, 0x4b, 0xf4, 0xc6, + 0x31, 0xcc, 0x05, 0xd6, 0x6f, 0x45, 0xb8, 0xbf, 0x60, 0x28, 0xf6, 0xf0, 0x39, 0x54, 0x22, 0x2e, + 0x8d, 0xa9, 0xe6, 0xaa, 0xfe, 0x4e, 0x2e, 0x52, 0x83, 0x8e, 0xfa, 0x65, 0x1c, 0xa2, 0xad, 0xe2, + 0x7e, 0x29, 0x9a, 0x7e, 0x31, 0x22, 0xdd, 0x2f, 0x7b, 0x50, 0x97, 0x68, 0x7b, 0xe8, 0x18, 0x80, + 0x99, 0x2f, 0x60, 0x44, 0x1a, 0xf0, 0x0d, 0xb4, 0xa4, 0xb2, 0x43, 0x35, 0x0c, 0x84, 0xd4, 0x59, + 0x9c, 0x8d, 0x18, 0x6b, 0xc5, 0x94, 0x7f, 0x25, 0xdd, 0x8b, 0x18, 0x4a, 0x9b, 0xfa, 0xea, 0xec, + 0x28, 0xc9, 0x19, 0x6c, 0x23, 0x77, 0x52, 0xaa, 0x2a, 0xb7, 0x56, 0xd5, 0x40, 0xee, 0x24, 0x8a, + 0x2c, 0x06, 0xff, 0xef, 0x73, 0x89, 0xa1, 0x3a, 0x65, 0xdc, 0x13, 0xee, 0x85, 0xad, 0x26, 0xef, + 0x8b, 0x93, 0x00, 0x1e, 0x2c, 0x9a, 0x9a, 0xd3, 0xd2, 0x85, 0xea, 0x25, 0x43, 0xcf, 0x99, 0x97, + 0x4d, 0x72, 0x26, 0x9f, 0x43, 0x25, 0x88, 0xc0, 0x9d, 0xa2, 0x0e, 0x72, 0xd5, 0x56, 0x1c, 0xa8, + 0x90, 0x71, 0xf7, 0x5b, 0x26, 0x15, 0x35, 0x78, 0xeb, 0xe7, 0x02, 0xec, 0x1a, 0x93, 0x5f, 0x99, + 0xb1, 0xfa, 0x7e, 0xdb, 0x36, 0x67, 0x4d, 0x5a, 0x3e, 0xdc, 0xff, 0xde, 0x56, 0xe3, 0x49, 0xcf, + 0xff, 0xc7, 0x2e, 0x44, 0xe6, 0xe6, 0xdb, 0xc1, 0x64, 0xa3, 0x46, 0x33, 0x32, 0xeb, 0xd7, 0x02, + 0xb4, 0xf4, 0x80, 0x1a, 0xa0, 0xfb, 0x9f, 0x07, 0xbb, 0xd0, 0xfd, 0xe5, 0xa5, 0xee, 0x7f, 0x5b, + 0x84, 0x7a, 0xdc, 0x5c, 0x7d, 0x7e, 0x29, 0xb2, 0xf5, 0x52, 0x58, 0xa8, 0x97, 0x7f, 0x67, 0x50, + 0x91, 0x03, 0x68, 0x31, 0x5d, 0x02, 0xc3, 0x38, 0x51, 0xc6, 0xb1, 0x1a, 0x6d, 0xb2, 0x74, 0x65, + 0xe8, 0xdd, 0x25, 0x02, 0xe4, 0xa6, 0x7d, 0x2b, 0xba, 0x7d, 0xab, 0x91, 0x40, 0x37, 0xef, 0x07, + 0x00, 0x63, 0x4f, 0xc8, 0xcc, 0xb6, 0xac, 0x69, 0x89, 0xfe, 0xfd, 0x00, 0xaa, 0x7c, 0xea, 0x0f, + 0x43, 0x71, 0x65, 0xd6, 0x65, 0x89, 0x6e, 0xf1, 0xa9, 0x4f, 0xc5, 0x95, 0x8c, 0x7e, 0xf9, 0xe8, + 0x0f, 0x25, 0xbb, 0x31, 0x1b, 0xb1, 0x44, 0xb7, 0x7c, 0xf4, 0x07, 0xec, 0x06, 0x9f, 0xfe, 0xa0, + 0x9f, 0x6b, 0xc9, 0xa8, 0x21, 0xad, 0x24, 0x3b, 0xe7, 0x82, 0x63, 0x7b, 0x83, 0xec, 0xe8, 0xb7, + 0x84, 0x11, 0xa8, 0x97, 0xd7, 0x4c, 0xaa, 0x76, 0x81, 0x10, 0x68, 0xc6, 0xc2, 0xb3, 0x50, 0x5c, + 0x31, 0xee, 0xb6, 0x8b, 0xe4, 0x1e, 0x6c, 0xcf, 0x34, 0xe9, 0x81, 0xd3, 0x2e, 0x9d, 0xfc, 0x51, + 0x81, 0x7a, 0xcf, 0x56, 0xf6, 0xc0, 0x3c, 0x98, 0x89, 0x0d, 0x8d, 0xf4, 0xc3, 0x93, 0x3c, 0xc9, + 0x19, 0x7c, 0x39, 0x4f, 0xe3, 0xee, 0xc1, 0x5a, 0x9c, 0xe9, 0x60, 0x6b, 0x83, 0x9c, 0x41, 0x45, + 0xd7, 0x1e, 0xc9, 0x1b, 0xaa, 0xe9, 0xb5, 0xd9, 0x7d, 0xd7, 0xae, 0xb5, 0x36, 0xc8, 0x08, 0x5a, + 0xc9, 0x0b, 0x20, 0x2e, 0x86, 0xc7, 0x39, 0x2a, 0x97, 0x1f, 0x86, 0xdd, 0x27, 0xeb, 0x60, 0x89, + 0xb3, 0x43, 0x68, 0xa4, 0x16, 0x98, 0xcc, 0x35, 0xb0, 0xbc, 0x6f, 0x73, 0x0d, 0xe4, 0x2c, 0x42, + 0x6b, 0x83, 0xb8, 0xd0, 0x3e, 0x43, 0x95, 0x59, 0x42, 0xe4, 0x60, 0xcd, 0xb6, 0x99, 0xed, 0xc3, + 0xee, 0xe1, 0x7a, 0x60, 0x62, 0x28, 0x84, 0xdd, 0x33, 0x54, 0x4b, 0xa3, 0x95, 0x3c, 0xcd, 0xd1, + 0xb1, 0x62, 0xd6, 0x77, 0x3f, 0xbe, 0x05, 0x36, 0x6d, 0xd3, 0x86, 0x7b, 0x89, 0xcd, 0xa4, 0x83, + 0x0e, 0x56, 0x2a, 0xc9, 0xce, 0xbe, 0xee, 0xfa, 0x09, 0x6e, 0x6d, 0x9c, 0x7e, 0xf9, 0xfa, 0x0b, + 0x97, 0xa9, 0xc9, 0x74, 0x14, 0x95, 0xc7, 0xf1, 0x0d, 0xf3, 0x3c, 0x76, 0xa3, 0x70, 0x3c, 0x39, + 0x36, 0x77, 0x3f, 0x71, 0x98, 0x54, 0x21, 0x1b, 0x4d, 0x15, 0x3a, 0xc7, 0x33, 0x0d, 0xc7, 0x5a, + 0xe1, 0x71, 0x64, 0x39, 0x18, 0x8d, 0x36, 0xf5, 0xe9, 0xd9, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, + 0x92, 0x26, 0x37, 0x78, 0x3a, 0x0e, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/internal/timesync/time_sync_producer.go b/internal/timesync/time_sync_producer.go index 780acffa4a..3e459a9e9d 100644 --- a/internal/timesync/time_sync_producer.go +++ b/internal/timesync/time_sync_producer.go @@ -4,85 +4,50 @@ import ( "context" "log" - "github.com/zilliztech/milvus-distributed/internal/errors" ms "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) -type timeSyncMsgProducer struct { - //softTimeTickBarrier - proxyTtBarrier TimeTickBarrier - //hardTimeTickBarrier - writeNodeTtBarrier TimeTickBarrier - - ddSyncStream ms.MsgStream // insert & delete - dmSyncStream ms.MsgStream - k2sSyncStream ms.MsgStream +type MsgProducer struct { + ttBarrier TimeTickBarrier ctx context.Context cancel context.CancelFunc - proxyWatchers []TimeTickWatcher - writeNodeWatchers []TimeTickWatcher + watchers []TimeTickWatcher } -func NewTimeSyncMsgProducer(ctx context.Context) (*timeSyncMsgProducer, error) { - ctx2, cancel := context.WithCancel(ctx) - return &timeSyncMsgProducer{ctx: ctx2, cancel: cancel}, nil +func NewTimeSyncMsgProducer(ctx context.Context, ttBarrier TimeTickBarrier, watchers ...TimeTickWatcher) (*MsgProducer, error) { + childCtx, cancelFunc := context.WithCancel(ctx) + return &MsgProducer{ + ctx: childCtx, + cancel: cancelFunc, + ttBarrier: ttBarrier, + watchers: watchers, + }, nil } -func (syncMsgProducer *timeSyncMsgProducer) SetProxyTtBarrier(proxyTtBarrier TimeTickBarrier) { - syncMsgProducer.proxyTtBarrier = proxyTtBarrier -} - -func (syncMsgProducer *timeSyncMsgProducer) SetWriteNodeTtBarrier(writeNodeTtBarrier TimeTickBarrier) { - syncMsgProducer.writeNodeTtBarrier = writeNodeTtBarrier -} -func (syncMsgProducer *timeSyncMsgProducer) SetDDSyncStream(ddSync ms.MsgStream) { - syncMsgProducer.ddSyncStream = ddSync -} - -func (syncMsgProducer *timeSyncMsgProducer) SetDMSyncStream(dmSync ms.MsgStream) { - syncMsgProducer.dmSyncStream = dmSync -} - -func (syncMsgProducer *timeSyncMsgProducer) SetK2sSyncStream(k2sSync ms.MsgStream) { - syncMsgProducer.k2sSyncStream = k2sSync -} - -func (syncMsgProducer *timeSyncMsgProducer) WatchProxyTtBarrier(watcher TimeTickWatcher) { - syncMsgProducer.proxyWatchers = append(syncMsgProducer.proxyWatchers, watcher) -} - -func (syncMsgProducer *timeSyncMsgProducer) WatchWriteNodeTtBarrier(watcher TimeTickWatcher) { - syncMsgProducer.writeNodeWatchers = append(syncMsgProducer.writeNodeWatchers, watcher) -} - -func (syncMsgProducer *timeSyncMsgProducer) broadcastMsg(barrier TimeTickBarrier, streams []ms.MsgStream, watchers []TimeTickWatcher) error { +func (producer *MsgProducer) broadcastMsg() { for { select { - case <-syncMsgProducer.ctx.Done(): - { - log.Printf("broadcast context done, exit") - return errors.Errorf("broadcast done exit") - } + case <-producer.ctx.Done(): + log.Printf("broadcast context done, exit") default: - timetick, err := barrier.GetTimeTick() + tt, err := producer.ttBarrier.GetTimeTick() if err != nil { log.Printf("broadcast get time tick error") } - msgPack := ms.MsgPack{} baseMsg := ms.BaseMsg{ - BeginTimestamp: timetick, - EndTimestamp: timetick, + BeginTimestamp: tt, + EndTimestamp: tt, HashValues: []uint32{0}, } timeTickResult := internalpb2.TimeTickMsg{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kTimeTick, MsgID: 0, - Timestamp: timetick, + Timestamp: tt, SourceID: 0, }, } @@ -90,56 +55,32 @@ func (syncMsgProducer *timeSyncMsgProducer) broadcastMsg(barrier TimeTickBarrier BaseMsg: baseMsg, TimeTickMsg: timeTickResult, } - msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) - for _, stream := range streams { - err = stream.Broadcast(&msgPack) - } - - for _, watcher := range watchers { + for _, watcher := range producer.watchers { watcher.Watch(timeTickMsg) } - if err != nil { - return err - } } } } -func (syncMsgProducer *timeSyncMsgProducer) Start() error { - err := syncMsgProducer.proxyTtBarrier.Start() +func (producer *MsgProducer) Start() error { + err := producer.ttBarrier.Start() if err != nil { return err } - err = syncMsgProducer.writeNodeTtBarrier.Start() - if err != nil { - return err - } - - for _, watcher := range syncMsgProducer.proxyWatchers { - watcher.Start() - } - for _, watcher := range syncMsgProducer.writeNodeWatchers { + for _, watcher := range producer.watchers { watcher.Start() } - go syncMsgProducer.broadcastMsg(syncMsgProducer.proxyTtBarrier, []ms.MsgStream{syncMsgProducer.dmSyncStream, syncMsgProducer.ddSyncStream}, syncMsgProducer.proxyWatchers) - go syncMsgProducer.broadcastMsg(syncMsgProducer.writeNodeTtBarrier, []ms.MsgStream{syncMsgProducer.k2sSyncStream}, syncMsgProducer.writeNodeWatchers) + go producer.broadcastMsg() return nil } -func (syncMsgProducer *timeSyncMsgProducer) Close() { - syncMsgProducer.ddSyncStream.Close() - syncMsgProducer.dmSyncStream.Close() - syncMsgProducer.k2sSyncStream.Close() - syncMsgProducer.cancel() - syncMsgProducer.proxyTtBarrier.Close() - syncMsgProducer.writeNodeTtBarrier.Close() - for _, watcher := range syncMsgProducer.proxyWatchers { - watcher.Close() - } - for _, watcher := range syncMsgProducer.writeNodeWatchers { +func (producer *MsgProducer) Close() { + producer.cancel() + producer.ttBarrier.Close() + for _, watcher := range producer.watchers { watcher.Close() } } diff --git a/internal/timesync/timetick_watcher.go b/internal/timesync/timetick_watcher.go index 27eb9e4160..de62690e4b 100644 --- a/internal/timesync/timetick_watcher.go +++ b/internal/timesync/timetick_watcher.go @@ -1,6 +1,8 @@ package timesync import ( + "log" + ms "github.com/zilliztech/milvus-distributed/internal/msgstream" ) @@ -9,3 +11,24 @@ type TimeTickWatcher interface { Start() Close() } + +type MsgTimeTickWatcher struct { + streams []ms.MsgStream +} + +func (watcher *MsgTimeTickWatcher) Watch(msg *ms.TimeTickMsg) { + msgPack := &ms.MsgPack{} + msgPack.Msgs = append(msgPack.Msgs, msg) + for _, stream := range watcher.streams { + if err := stream.Broadcast(msgPack); err != nil { + log.Printf("stream broadcast failed %s", err.Error()) + } + } + +} + +func (watcher *MsgTimeTickWatcher) Start() { +} + +func (watcher *MsgTimeTickWatcher) Close() { +}