[skip ci]add DescribeCollectionRequest in chap05_proxy.md (#9087)

Signed-off-by: ruiyi.jiang <ruiyi.jiang@zilliz.com>
This commit is contained in:
ryjiang 2021-10-02 21:28:04 +08:00 committed by GitHub
parent 111f6e4f99
commit 0dfb247018
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,11 +1,7 @@
## 5. Proxy
<img src="./figs/proxy.png" width=700>
#### 5.0 Proxy Service Interface
```go
@ -18,7 +14,7 @@ type ProxyService interface {
}
```
* *MsgBase*
- _MsgBase_
```go
@ -30,7 +26,7 @@ type MsgBase struct {
}
```
* *RegisterNode*
- _RegisterNode_
```go
type Address struct {
@ -55,7 +51,7 @@ type RegisterNodeResponse struct {
}
```
* *InvalidateCollectionMetaCache*
- _InvalidateCollectionMetaCache_
```go
type InvalidateCollMetaCacheRequest struct {
@ -65,19 +61,17 @@ type InvalidateCollMetaCacheRequest struct {
}
```
#### 5.1 Proxy Node Interface
```go
type Proxy interface {
Component
InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
}
```
* *InvalidateCollectionMetaCache*
- _InvalidateCollectionMetaCache_
```go
type InvalidateCollMetaCacheRequest struct {
@ -101,7 +95,7 @@ type MilvusService interface {
DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error)
ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error)
CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error)
DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error)
AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error)
@ -113,37 +107,37 @@ type MilvusService interface {
ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error)
GetPartitionStatistics(ctx context.Context, request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error)
ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error)
DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error)
GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error)
DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error)
Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error)
Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error)
Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error)
GetDdChannel(ctx context.Context, request *commonpb.Empty) (*milvuspb.StringResponse, error)
GetQuerySegmentInfo(ctx context.Context, req *milvuspb.QuerySegmentInfoRequest) (*milvuspb.QuerySegmentInfoResponse, error)
GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error)
}
}
```
* *CreateCollection*
- _CreateCollection_
See *Master API* for detailed definitions.
See _Master API_ for detailed definitions.
* *DropCollection*
- _DropCollection_
See *Master API* for detailed definitions.
See _Master API_ for detailed definitions.
* *HasCollection*
- _HasCollection_
See *Master API* for detailed definitions.
See _Master API_ for detailed definitions.
* *LoadCollection*
- _LoadCollection_
```go
type LoadCollectionRequest struct {
@ -153,7 +147,7 @@ type LoadCollectionRequest struct {
}
```
* *ReleaseCollection*
- _ReleaseCollection_
```go
type ReleaseCollectionRequest struct {
@ -163,43 +157,53 @@ type ReleaseCollectionRequest struct {
}
```
* *DescribeCollection*
- _DescribeCollection_
See *Master API* for detailed definitions.
```go
type DescribeCollectionRequest struct {
Base *commonpb.MsgBase
DbName string
CollectionName string
CollectionID int64
TimeStamp uint64
}
```
* *GetCollectionStatistics*
See _Master API_ for detailed definitions.
See *Master API* for detailed definitions.
- _GetCollectionStatistics_
* *ShowCollections*
See _Master API_ for detailed definitions.
See *Master API* for detailed definitions.
- _ShowCollections_
* *CreateAlias*
See _Master API_ for detailed definitions.
See *Master API* for detailed definitions.
- _CreateAlias_
* *DropAlias*
See _Master API_ for detailed definitions.
See *Master API* for detailed definitions.
- _DropAlias_
* *AlterAlias*
See _Master API_ for detailed definitions.
See *Master API* for detailed definitions.
- _AlterAlias_
* *CreatePartition*
See _Master API_ for detailed definitions.
See *Master API* for detailed definitions.
- _CreatePartition_
* *DropPartition*
See _Master API_ for detailed definitions.
See *Master API* for detailed definitions.
- _DropPartition_
* *HasPartition*
See _Master API_ for detailed definitions.
See *Master API* for detailed definitions.
- _HasPartition_
* *LoadPartitions*
See _Master API_ for detailed definitions.
- _LoadPartitions_
```go
type CollectionSchema struct {
@ -218,7 +222,7 @@ type LoadPartitonRequest struct {
}
```
* *ReleasePartitions*
- _ReleasePartitions_
```go
type ReleasePartitionRequest struct {
@ -229,27 +233,27 @@ type ReleasePartitionRequest struct {
}
```
* *GetPartitionStatistics*
- _GetPartitionStatistics_
See *Master API* for detailed definitions.
See _Master API_ for detailed definitions.
* *ShowPartitions*
- _ShowPartitions_
See *Master API* for detailed definitions.
See _Master API_ for detailed definitions.
* *CreateIndex*
- _CreateIndex_
See *Master API* for detailed definitions.
See _Master API_ for detailed definitions.
* *DescribeIndex*
- _DescribeIndex_
See *Master API* for detailed definitions.
See _Master API_ for detailed definitions.
* *DropIndex*
- _DropIndex_
See *Master API* for detailed definitions.
See _Master API_ for detailed definitions.
* *Insert*
- _Insert_
```go
type InsertRequest struct {
@ -268,7 +272,7 @@ type InsertResponse struct {
}
```
* *Search*
- _Search_
```go
type SearchRequest struct {
@ -286,7 +290,7 @@ type SearchResults struct {
}
```
* *Flush*
- _Flush_
```go
type FlushRequest struct {
@ -296,8 +300,7 @@ type FlushRequest struct {
}
```
* *GetPersistentSegmentInfo*
- _GetPersistentSegmentInfo_
```go
type PersistentSegmentInfoRequest struct{
@ -341,29 +344,29 @@ type Proxy struct {
ctx context.Context
cancel func()
wg sync.WaitGroup
initParams *internalpb.InitParams
ip string
port int
stateCode internalpb.StateCode
rootCoordClient RootCoordClient
indexCoordClient IndexCoordClient
dataCoordClient DataCoordClient
queryCoordClient QueryCoordClient
sched *TaskScheduler
tick *timeTick
idAllocator *allocator.IDAllocator
tsoAllocator *allocator.TimestampAllocator
segAssigner *SegIDAssigner
manipulationMsgStream msgstream.MsgStream
queryMsgStream msgstream.MsgStream
msFactory msgstream.Factory
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
@ -390,18 +393,18 @@ func NewProxyImpl(ctx context.Context, factory msgstream.Factory) (*NodeImpl, er
```go
type GlobalParamsTable struct {
paramtable.BaseTable
NetworkPort int
IP string
NetworkAddress string
MasterAddress string
PulsarAddress string
RocksmqPath string
RocksmqRetentionTimeInMinutes int64
RocksmqRetentionSizeInMB int64
ProxyID UniqueID
TimeTickInterval time.Duration
InsertChannelNames []string
@ -423,10 +426,9 @@ type GlobalParamsTable struct {
var Params ParamTable
```
#### 5.4 Task
``` go
```go
type task interface {
TraceCtx() context.Context
ID() UniqueID // return ReqID
@ -447,7 +449,7 @@ type task interface {
#### 5.5 Task Scheduler
* Base Task Queue
- Base Task Queue
```go
type TaskQueue interface {
@ -469,22 +471,20 @@ type baseTaskQueue struct {
activeTasks map[Timestamp]task
utLock sync.Mutex
atLock sync.Mutex
maxTaskNum int64
utBufChan chan int
sched *TaskScheduler
}
```
*AddUnissuedTask(task \*task)* will put a new task into *unissuedTasks*, while maintaining the list by timestamp order.
_AddUnissuedTask(task \*task)_ will put a new task into _unissuedTasks_, while maintaining the list by timestamp order.
*TaskDoneTest(ts Timestamp)* will check both *unissuedTasks* and *unissuedTasks*. If no task found before *ts*, then the function returns *true*, indicates that all the tasks before *ts* are completed.
_TaskDoneTest(ts Timestamp)_ will check both _unissuedTasks_ and _unissuedTasks_. If no task found before _ts_, then the function returns _true_, indicates that all the tasks before _ts_ are completed.
* Data Definition Task Queue
- Data Definition Task Queue
```go
type ddTaskQueue struct {
@ -496,11 +496,9 @@ func (queue *ddTaskQueue) Enqueue(task *task) error
func newDdTaskQueue() *ddTaskQueue
```
Data definition tasks (i.e. *CreateCollectionTask*) will be put into *DdTaskQueue*. If a task is enqueued, *Enqueue(task \*task)* will set *Ts*, *ReqId*, *ProxyId*, then push it into *queue*. The timestamps of the enqueued tasks should be strictly monotonically increasing. As *Enqueue(task \*task)* will be called in parallel, setting timestamp and queue insertion need to be done atomically.
Data definition tasks (i.e. _CreateCollectionTask_) will be put into _DdTaskQueue_. If a task is enqueued, _Enqueue(task \*task)_ will set _Ts_, _ReqId_, _ProxyId_, then push it into _queue_. The timestamps of the enqueued tasks should be strictly monotonically increasing. As _Enqueue(task \*task)_ will be called in parallel, setting timestamp and queue insertion need to be done atomically.
* Data Manipulation Task Queue
- Data Manipulation Task Queue
```go
type dmTaskQueue struct {
@ -511,13 +509,11 @@ func (queue *dmTaskQueue) Enqueue(task *task) error
func newDmTaskQueue() *dmTaskQueue
```
Insert tasks and delete tasks will be put into *DmTaskQueue*.
Insert tasks and delete tasks will be put into _DmTaskQueue_.
If a *insertTask* is enqueued, *Enqueue(task \*task)* will set *Ts*, *ReqId*, *ProxyId*, *SegIdAssigner*, *RowIdAllocator*, then push it into *queue*. The *SegIdAssigner* and *RowIdAllocator* will later be used in the task's execution phase.
If a _insertTask_ is enqueued, _Enqueue(task \*task)_ will set _Ts_, _ReqId_, _ProxyId_, _SegIdAssigner_, _RowIdAllocator_, then push it into _queue_. The _SegIdAssigner_ and _RowIdAllocator_ will later be used in the task's execution phase.
* Data Query Task Queue
- Data Query Task Queue
```go
type dqTaskQueue struct {
@ -528,25 +524,23 @@ func (queue *dqTaskQueue) Enqueue(task *task) error
func newDqTaskQueue() *dqTaskQueue
```
Queries will be put into *DqTaskQueue*.
Queries will be put into _DqTaskQueue_.
- Task Scheduler
* Task Scheduler
``` go
```go
type taskScheduler struct {
DdQueue TaskQueue
DmQueue TaskQueue
DqQueue TaskQueue
idAllocator *allocator.IDAllocator
tsoAllocator *allocator.TimestampAllocator
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
msFactory msgstream.Factory
}
@ -563,19 +557,18 @@ func NewTaskScheduler(ctx context.Context, idAllocator *allocator.IDAllocator, t
factory msgstream.Factory) (*TaskScheduler, error)
```
*scheduleDdTask()* selects tasks in a FIFO manner, thus time order is garanteed.
_scheduleDdTask()_ selects tasks in a FIFO manner, thus time order is garanteed.
The policy of *scheduleDmTask()* should target on throughput, not tasks' time order. Note that the time order of the tasks' execution will later be garanteed by the timestamp & time tick mechanism.
The policy of _scheduleDmTask()_ should target on throughput, not tasks' time order. Note that the time order of the tasks' execution will later be garanteed by the timestamp & time tick mechanism.
The policy of *scheduleDqTask()* should target on throughput. It should also take visibility into consideration. For example, if an insert task and a query arrive in a same time tick and the query comes after insert, the query should be scheduled in the next tick thus the query can see the insert.
The policy of _scheduleDqTask()_ should target on throughput. It should also take visibility into consideration. For example, if an insert task and a query arrive in a same time tick and the query comes after insert, the query should be scheduled in the next tick thus the query can see the insert.
*TaskDoneTest(ts Timestamp)* will check all the three task queues. If no task found before *ts*, then the function returns *true*, indicates that all the tasks before *ts* are completed.
_TaskDoneTest(ts Timestamp)_ will check all the three task queues. If no task found before _ts_, then the function returns _true_, indicates that all the tasks before _ts_ are completed.
* Statistics
- Statistics
// TODO
```go
// ActiveComponent interfaces
func (sched *taskScheduler) Id() String
@ -596,25 +589,24 @@ message taskSchedulerHeartbeat {
}
```
// TODO
#### 5.6 Time Tick
* Time Tick
- Time Tick
``` go
```go
type timeTick struct {
lastTick Timestamp
currentTick Timestamp
wallTick Timestamp
tickStep Timestamp
syncInterval Timestamp
tsAllocator *TimestampAllocator
scheduler *taskScheduler
ttStream *MessageStream
ctx context.Context
}
@ -624,10 +616,9 @@ func (tt *timeTick) synchronize() error
func newTimeTick(ctx context.Context, tickStep Timestamp, syncInterval Timestamp, tsAllocator *TimestampAllocator, scheduler *taskScheduler, ttStream *MessageStream) *timeTick
```
*Start()* will enter a loop. On each *tickStep*, it tries to send a *TIME_TICK* typed *TsMsg* into *ttStream*. After each *syncInterval*, it sychronizes its *wallTick* with *tsAllocator* by calling *synchronize()*. When *currentTick + tickStep < wallTick* holds, it will update *currentTick* with *wallTick* on next tick. Otherwise, it will update *currentTick* with *currentTick + tickStep*.
_Start()_ will enter a loop. On each _tickStep_, it tries to send a _TIME_TICK_ typed _TsMsg_ into _ttStream_. After each _syncInterval_, it sychronizes its _wallTick_ with _tsAllocator_ by calling _synchronize()_. When _currentTick + tickStep < wallTick_ holds, it will update _currentTick_ with _wallTick_ on next tick. Otherwise, it will update _currentTick_ with _currentTick + tickStep_.
* Statistics
- Statistics
```go
// ActiveComponent interfaces
@ -643,6 +634,3 @@ message TimeTickHeartbeat {
uint64 last_tick
}
```