From 0dfb247018e82c851e85fd6d82b275d4e5653631 Mon Sep 17 00:00:00 2001 From: ryjiang Date: Sat, 2 Oct 2021 21:28:04 +0800 Subject: [PATCH] [skip ci]add DescribeCollectionRequest in chap05_proxy.md (#9087) Signed-off-by: ruiyi.jiang --- docs/developer_guides/chap05_proxy.md | 220 ++++++++++++-------------- 1 file changed, 104 insertions(+), 116 deletions(-) diff --git a/docs/developer_guides/chap05_proxy.md b/docs/developer_guides/chap05_proxy.md index b63ff562bb..23e6ac54cf 100644 --- a/docs/developer_guides/chap05_proxy.md +++ b/docs/developer_guides/chap05_proxy.md @@ -1,11 +1,7 @@ ## 5. Proxy - - - - #### 5.0 Proxy Service Interface ```go @@ -18,7 +14,7 @@ type ProxyService interface { } ``` -* *MsgBase* +- _MsgBase_ ```go @@ -30,7 +26,7 @@ type MsgBase struct { } ``` -* *RegisterNode* +- _RegisterNode_ ```go type Address struct { @@ -55,7 +51,7 @@ type RegisterNodeResponse struct { } ``` -* *InvalidateCollectionMetaCache* +- _InvalidateCollectionMetaCache_ ```go type InvalidateCollMetaCacheRequest struct { @@ -65,19 +61,17 @@ type InvalidateCollMetaCacheRequest struct { } ``` - - #### 5.1 Proxy Node Interface ```go type Proxy interface { Component - + InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) } ``` -* *InvalidateCollectionMetaCache* +- _InvalidateCollectionMetaCache_ ```go type InvalidateCollMetaCacheRequest struct { @@ -101,7 +95,7 @@ type MilvusService interface { DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) - + CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error) DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error) @@ -113,37 +107,37 @@ type MilvusService interface { ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error) GetPartitionStatistics(ctx context.Context, request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) - + CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) - + Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) - + GetDdChannel(ctx context.Context, request *commonpb.Empty) (*milvuspb.StringResponse, error) - + GetQuerySegmentInfo(ctx context.Context, req *milvuspb.QuerySegmentInfoRequest) (*milvuspb.QuerySegmentInfoResponse, error) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) } } ``` -* *CreateCollection* +- _CreateCollection_ -See *Master API* for detailed definitions. +See _Master API_ for detailed definitions. -* *DropCollection* +- _DropCollection_ -See *Master API* for detailed definitions. +See _Master API_ for detailed definitions. -* *HasCollection* +- _HasCollection_ -See *Master API* for detailed definitions. +See _Master API_ for detailed definitions. -* *LoadCollection* +- _LoadCollection_ ```go type LoadCollectionRequest struct { @@ -153,7 +147,7 @@ type LoadCollectionRequest struct { } ``` -* *ReleaseCollection* +- _ReleaseCollection_ ```go type ReleaseCollectionRequest struct { @@ -163,43 +157,53 @@ type ReleaseCollectionRequest struct { } ``` -* *DescribeCollection* +- _DescribeCollection_ -See *Master API* for detailed definitions. +```go +type DescribeCollectionRequest struct { + Base *commonpb.MsgBase + DbName string + CollectionName string + CollectionID int64 + TimeStamp uint64 +} +``` -* *GetCollectionStatistics* +See _Master API_ for detailed definitions. -See *Master API* for detailed definitions. +- _GetCollectionStatistics_ -* *ShowCollections* +See _Master API_ for detailed definitions. -See *Master API* for detailed definitions. +- _ShowCollections_ -* *CreateAlias* +See _Master API_ for detailed definitions. -See *Master API* for detailed definitions. +- _CreateAlias_ -* *DropAlias* +See _Master API_ for detailed definitions. -See *Master API* for detailed definitions. +- _DropAlias_ -* *AlterAlias* +See _Master API_ for detailed definitions. -See *Master API* for detailed definitions. +- _AlterAlias_ -* *CreatePartition* +See _Master API_ for detailed definitions. -See *Master API* for detailed definitions. +- _CreatePartition_ -* *DropPartition* +See _Master API_ for detailed definitions. -See *Master API* for detailed definitions. +- _DropPartition_ -* *HasPartition* +See _Master API_ for detailed definitions. -See *Master API* for detailed definitions. +- _HasPartition_ -* *LoadPartitions* +See _Master API_ for detailed definitions. + +- _LoadPartitions_ ```go type CollectionSchema struct { @@ -218,7 +222,7 @@ type LoadPartitonRequest struct { } ``` -* *ReleasePartitions* +- _ReleasePartitions_ ```go type ReleasePartitionRequest struct { @@ -229,27 +233,27 @@ type ReleasePartitionRequest struct { } ``` -* *GetPartitionStatistics* +- _GetPartitionStatistics_ -See *Master API* for detailed definitions. +See _Master API_ for detailed definitions. -* *ShowPartitions* +- _ShowPartitions_ -See *Master API* for detailed definitions. +See _Master API_ for detailed definitions. -* *CreateIndex* +- _CreateIndex_ -See *Master API* for detailed definitions. +See _Master API_ for detailed definitions. -* *DescribeIndex* +- _DescribeIndex_ -See *Master API* for detailed definitions. +See _Master API_ for detailed definitions. -* *DropIndex* +- _DropIndex_ -See *Master API* for detailed definitions. +See _Master API_ for detailed definitions. -* *Insert* +- _Insert_ ```go type InsertRequest struct { @@ -268,7 +272,7 @@ type InsertResponse struct { } ``` -* *Search* +- _Search_ ```go type SearchRequest struct { @@ -286,7 +290,7 @@ type SearchResults struct { } ``` -* *Flush* +- _Flush_ ```go type FlushRequest struct { @@ -296,8 +300,7 @@ type FlushRequest struct { } ``` - -* *GetPersistentSegmentInfo* +- _GetPersistentSegmentInfo_ ```go type PersistentSegmentInfoRequest struct{ @@ -341,29 +344,29 @@ type Proxy struct { ctx context.Context cancel func() wg sync.WaitGroup - + initParams *internalpb.InitParams ip string port int - + stateCode internalpb.StateCode - + rootCoordClient RootCoordClient indexCoordClient IndexCoordClient dataCoordClient DataCoordClient queryCoordClient QueryCoordClient - + sched *TaskScheduler tick *timeTick - + idAllocator *allocator.IDAllocator tsoAllocator *allocator.TimestampAllocator segAssigner *SegIDAssigner - + manipulationMsgStream msgstream.MsgStream queryMsgStream msgstream.MsgStream msFactory msgstream.Factory - + // Add callback functions at different stages startCallbacks []func() closeCallbacks []func() @@ -390,18 +393,18 @@ func NewProxyImpl(ctx context.Context, factory msgstream.Factory) (*NodeImpl, er ```go type GlobalParamsTable struct { paramtable.BaseTable - + NetworkPort int IP string NetworkAddress string - + MasterAddress string PulsarAddress string RocksmqPath string RocksmqRetentionTimeInMinutes int64 RocksmqRetentionSizeInMB int64 - + ProxyID UniqueID TimeTickInterval time.Duration InsertChannelNames []string @@ -423,10 +426,9 @@ type GlobalParamsTable struct { var Params ParamTable ``` - #### 5.4 Task -``` go +```go type task interface { TraceCtx() context.Context ID() UniqueID // return ReqID @@ -447,7 +449,7 @@ type task interface { #### 5.5 Task Scheduler -* Base Task Queue +- Base Task Queue ```go type TaskQueue interface { @@ -469,22 +471,20 @@ type baseTaskQueue struct { activeTasks map[Timestamp]task utLock sync.Mutex atLock sync.Mutex - + maxTaskNum int64 - + utBufChan chan int - + sched *TaskScheduler } ``` -*AddUnissuedTask(task \*task)* will put a new task into *unissuedTasks*, while maintaining the list by timestamp order. +_AddUnissuedTask(task \*task)_ will put a new task into _unissuedTasks_, while maintaining the list by timestamp order. -*TaskDoneTest(ts Timestamp)* will check both *unissuedTasks* and *unissuedTasks*. If no task found before *ts*, then the function returns *true*, indicates that all the tasks before *ts* are completed. +_TaskDoneTest(ts Timestamp)_ will check both _unissuedTasks_ and _unissuedTasks_. If no task found before _ts_, then the function returns _true_, indicates that all the tasks before _ts_ are completed. - - -* Data Definition Task Queue +- Data Definition Task Queue ```go type ddTaskQueue struct { @@ -496,11 +496,9 @@ func (queue *ddTaskQueue) Enqueue(task *task) error func newDdTaskQueue() *ddTaskQueue ``` -Data definition tasks (i.e. *CreateCollectionTask*) will be put into *DdTaskQueue*. If a task is enqueued, *Enqueue(task \*task)* will set *Ts*, *ReqId*, *ProxyId*, then push it into *queue*. The timestamps of the enqueued tasks should be strictly monotonically increasing. As *Enqueue(task \*task)* will be called in parallel, setting timestamp and queue insertion need to be done atomically. +Data definition tasks (i.e. _CreateCollectionTask_) will be put into _DdTaskQueue_. If a task is enqueued, _Enqueue(task \*task)_ will set _Ts_, _ReqId_, _ProxyId_, then push it into _queue_. The timestamps of the enqueued tasks should be strictly monotonically increasing. As _Enqueue(task \*task)_ will be called in parallel, setting timestamp and queue insertion need to be done atomically. - - -* Data Manipulation Task Queue +- Data Manipulation Task Queue ```go type dmTaskQueue struct { @@ -511,13 +509,11 @@ func (queue *dmTaskQueue) Enqueue(task *task) error func newDmTaskQueue() *dmTaskQueue ``` -Insert tasks and delete tasks will be put into *DmTaskQueue*. +Insert tasks and delete tasks will be put into _DmTaskQueue_. -If a *insertTask* is enqueued, *Enqueue(task \*task)* will set *Ts*, *ReqId*, *ProxyId*, *SegIdAssigner*, *RowIdAllocator*, then push it into *queue*. The *SegIdAssigner* and *RowIdAllocator* will later be used in the task's execution phase. +If a _insertTask_ is enqueued, _Enqueue(task \*task)_ will set _Ts_, _ReqId_, _ProxyId_, _SegIdAssigner_, _RowIdAllocator_, then push it into _queue_. The _SegIdAssigner_ and _RowIdAllocator_ will later be used in the task's execution phase. - - -* Data Query Task Queue +- Data Query Task Queue ```go type dqTaskQueue struct { @@ -528,25 +524,23 @@ func (queue *dqTaskQueue) Enqueue(task *task) error func newDqTaskQueue() *dqTaskQueue ``` -Queries will be put into *DqTaskQueue*. +Queries will be put into _DqTaskQueue_. +- Task Scheduler - -* Task Scheduler - -``` go +```go type taskScheduler struct { DdQueue TaskQueue DmQueue TaskQueue DqQueue TaskQueue - + idAllocator *allocator.IDAllocator tsoAllocator *allocator.TimestampAllocator - + wg sync.WaitGroup ctx context.Context cancel context.CancelFunc - + msFactory msgstream.Factory } @@ -563,19 +557,18 @@ func NewTaskScheduler(ctx context.Context, idAllocator *allocator.IDAllocator, t factory msgstream.Factory) (*TaskScheduler, error) ``` -*scheduleDdTask()* selects tasks in a FIFO manner, thus time order is garanteed. +_scheduleDdTask()_ selects tasks in a FIFO manner, thus time order is garanteed. -The policy of *scheduleDmTask()* should target on throughput, not tasks' time order. Note that the time order of the tasks' execution will later be garanteed by the timestamp & time tick mechanism. +The policy of _scheduleDmTask()_ should target on throughput, not tasks' time order. Note that the time order of the tasks' execution will later be garanteed by the timestamp & time tick mechanism. -The policy of *scheduleDqTask()* should target on throughput. It should also take visibility into consideration. For example, if an insert task and a query arrive in a same time tick and the query comes after insert, the query should be scheduled in the next tick thus the query can see the insert. +The policy of _scheduleDqTask()_ should target on throughput. It should also take visibility into consideration. For example, if an insert task and a query arrive in a same time tick and the query comes after insert, the query should be scheduled in the next tick thus the query can see the insert. -*TaskDoneTest(ts Timestamp)* will check all the three task queues. If no task found before *ts*, then the function returns *true*, indicates that all the tasks before *ts* are completed. +_TaskDoneTest(ts Timestamp)_ will check all the three task queues. If no task found before _ts_, then the function returns _true_, indicates that all the tasks before _ts_ are completed. - - -* Statistics +- Statistics // TODO + ```go // ActiveComponent interfaces func (sched *taskScheduler) Id() String @@ -596,25 +589,24 @@ message taskSchedulerHeartbeat { } ``` - - // TODO + #### 5.6 Time Tick -* Time Tick +- Time Tick -``` go +```go type timeTick struct { lastTick Timestamp currentTick Timestamp wallTick Timestamp tickStep Timestamp syncInterval Timestamp - + tsAllocator *TimestampAllocator scheduler *taskScheduler ttStream *MessageStream - + ctx context.Context } @@ -624,10 +616,9 @@ func (tt *timeTick) synchronize() error func newTimeTick(ctx context.Context, tickStep Timestamp, syncInterval Timestamp, tsAllocator *TimestampAllocator, scheduler *taskScheduler, ttStream *MessageStream) *timeTick ``` -*Start()* will enter a loop. On each *tickStep*, it tries to send a *TIME_TICK* typed *TsMsg* into *ttStream*. After each *syncInterval*, it sychronizes its *wallTick* with *tsAllocator* by calling *synchronize()*. When *currentTick + tickStep < wallTick* holds, it will update *currentTick* with *wallTick* on next tick. Otherwise, it will update *currentTick* with *currentTick + tickStep*. +_Start()_ will enter a loop. On each _tickStep_, it tries to send a _TIME_TICK_ typed _TsMsg_ into _ttStream_. After each _syncInterval_, it sychronizes its _wallTick_ with _tsAllocator_ by calling _synchronize()_. When _currentTick + tickStep < wallTick_ holds, it will update _currentTick_ with _wallTick_ on next tick. Otherwise, it will update _currentTick_ with _currentTick + tickStep_. - -* Statistics +- Statistics ```go // ActiveComponent interfaces @@ -643,6 +634,3 @@ message TimeTickHeartbeat { uint64 last_tick } ``` - - -