2021-09-26 17:47:58 +08:00
## 5. Proxy
2020-11-19 10:59:10 +08:00
2021-02-24 09:48:17 +08:00
<img src="./figs/proxy.png" width=700>
2021-01-04 14:16:43 +08:00
2021-09-26 17:47:58 +08:00
#### 5.0 Proxy Service Interface
2020-12-27 09:05:24 +08:00
2020-12-29 18:02:44 +08:00
```go
2021-01-13 11:08:03 +08:00
type ProxyService interface {
2021-04-12 12:45:38 +08:00
Component
TimeTickProvider
RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error)
InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
2021-01-13 11:08:03 +08:00
}
```
2021-10-02 21:28:04 +08:00
- _MsgBase_
2021-01-13 11:08:03 +08:00
```go
2021-03-04 10:35:28 +08:00
2021-01-15 14:38:36 +08:00
type MsgBase struct {
2021-04-12 12:45:38 +08:00
MsgType MsgType
MsgID UniqueID
Timestamp uint64
SourceID UniqueID
2020-12-27 09:05:24 +08:00
}
2021-01-11 18:35:54 +08:00
```
2021-01-11 15:17:06 +08:00
2021-10-02 21:28:04 +08:00
- _RegisterNode_
2021-01-11 18:35:54 +08:00
```go
2021-03-04 10:35:28 +08:00
type Address struct {
2021-04-12 12:45:38 +08:00
Ip string
Port int64
2021-03-04 10:35:28 +08:00
}
2021-01-13 11:08:03 +08:00
type RegisterNodeRequest struct {
2021-04-12 12:45:38 +08:00
Base *commonpb.MsgBase
Address string
Port int64
2021-03-04 10:35:28 +08:00
}
type InitParams struct {
2021-04-12 12:45:38 +08:00
NodeID UniqueID
StartParams []*commonpb.KeyValuePair
2021-01-13 11:08:03 +08:00
}
2021-01-11 18:35:54 +08:00
2021-01-13 11:08:03 +08:00
type RegisterNodeResponse struct {
2021-04-12 12:45:38 +08:00
InitParams *internalpb.InitParams
Status *commonpb.Status
2021-01-13 11:08:03 +08:00
}
```
2021-10-02 21:28:04 +08:00
- _InvalidateCollectionMetaCache_
2021-01-13 11:08:03 +08:00
```go
type InvalidateCollMetaCacheRequest struct {
2021-04-12 12:45:38 +08:00
Base *commonpb.MsgBase
DbName string
CollectionName string
2021-01-13 11:08:03 +08:00
}
2021-01-11 18:35:54 +08:00
```
2021-09-26 17:47:58 +08:00
#### 5.1 Proxy Node Interface
2021-01-11 18:35:54 +08:00
```go
2021-06-23 16:14:08 +08:00
type Proxy interface {
2021-04-12 12:45:38 +08:00
Component
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
2021-01-13 11:08:03 +08:00
}
```
2021-10-02 21:28:04 +08:00
- _InvalidateCollectionMetaCache_
2021-01-13 11:08:03 +08:00
2021-04-12 12:45:38 +08:00
```go
type InvalidateCollMetaCacheRequest struct {
Base *commonpb.MsgBase
DbName string
CollectionName string
}
```
2021-09-26 17:47:58 +08:00
#### 5.2 Milvus Service Interface
2021-04-12 12:45:38 +08:00
2021-06-23 16:14:08 +08:00
Proxy also implements the Milvus Service interface to receive client gRPC calls.
2021-04-12 12:45:38 +08:00
```go
type MilvusService interface {
CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error)
DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error)
HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error)
LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error)
ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error)
DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error)
ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error)
2021-10-02 21:28:04 +08:00
2021-09-18 15:31:51 +08:00
CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error)
DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error)
AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error)
2021-04-12 12:45:38 +08:00
CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error)
DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error)
HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error)
LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error)
ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error)
GetPartitionStatistics(ctx context.Context, request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error)
ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error)
DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error)
GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error)
DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error)
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error)
Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error)
Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error)
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
GetDdChannel(ctx context.Context, request *commonpb.Empty) (*milvuspb.StringResponse, error)
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
GetQuerySegmentInfo(ctx context.Context, req *milvuspb.QuerySegmentInfoRequest) (*milvuspb.QuerySegmentInfoResponse, error)
GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error)
2021-10-13 10:44:31 +08:00
GetQuerySegmentInfo(ctx context.Context, in *GetQuerySegmentInfoRequest, opts ...grpc.CallOption) (*GetQuerySegmentInfoResponse, error)
2021-04-12 12:45:38 +08:00
}
}
```
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
- _CreateCollection_
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
- _DropCollection_
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
- _HasCollection_
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
- _LoadCollection_
2021-01-13 11:08:03 +08:00
```go
type LoadCollectionRequest struct {
2021-04-12 12:45:38 +08:00
Base *commonpb.MsgBase
DbName string
CollectionName string
2021-01-13 11:08:03 +08:00
}
```
2021-10-02 21:28:04 +08:00
- _ReleaseCollection_
2021-01-13 11:08:03 +08:00
```go
type ReleaseCollectionRequest struct {
2021-04-12 12:45:38 +08:00
Base *commonpb.MsgBase
DbName string
CollectionName string
2021-01-13 11:08:03 +08:00
}
```
2021-10-02 21:28:04 +08:00
- _DescribeCollection_
```go
type DescribeCollectionRequest struct {
Base *commonpb.MsgBase
DbName string
CollectionName string
CollectionID int64
TimeStamp uint64
}
```
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-01-13 11:08:03 +08:00
2021-10-03 10:32:24 +08:00
- _GetCollectionStatistics_
```go
type GetCollectionStatisticsRequest struct {
Base *commonpb.MsgBase
DbName string
CollectionName string
}
```
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
- _ShowCollections_
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
- _CreateAlias_
2021-09-18 15:31:51 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-09-18 15:31:51 +08:00
2021-10-02 21:28:04 +08:00
- _DropAlias_
2021-09-18 15:31:51 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-09-18 15:31:51 +08:00
2021-10-02 21:28:04 +08:00
- _AlterAlias_
2021-09-18 15:31:51 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-09-18 15:31:51 +08:00
2021-10-02 21:28:04 +08:00
- _CreatePartition_
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
- _DropPartition_
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
- _HasPartition_
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
- _LoadPartitions_
2021-01-13 11:08:03 +08:00
```go
2021-03-04 10:35:28 +08:00
type CollectionSchema struct {
2021-04-12 12:45:38 +08:00
Name string
Description string
AutoID bool
Fields []*FieldSchema
2021-03-04 10:35:28 +08:00
}
2021-01-13 11:08:03 +08:00
type LoadPartitonRequest struct {
2021-04-12 12:45:38 +08:00
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
PartitionIDs []UniqueID
Schema *schemapb.CollectionSchema
2021-01-13 11:08:03 +08:00
}
```
2021-10-02 21:28:04 +08:00
- _ReleasePartitions_
2021-01-13 11:08:03 +08:00
```go
type ReleasePartitionRequest struct {
2021-04-12 12:45:38 +08:00
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionNames []string
2021-01-13 11:08:03 +08:00
}
```
2021-10-02 21:28:04 +08:00
- _GetPartitionStatistics_
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
- _ShowPartitions_
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
- _CreateIndex_
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
- _DescribeIndex_
2021-02-20 10:14:03 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-02-20 10:14:03 +08:00
2021-10-02 21:28:04 +08:00
- _DropIndex_
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
See _Master API_ for detailed definitions.
2021-01-13 11:08:03 +08:00
2021-10-02 21:28:04 +08:00
- _Insert_
2021-01-13 11:08:03 +08:00
```go
type InsertRequest struct {
2021-04-12 12:45:38 +08:00
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionName string
RowData []Blob
HashKeys []uint32
2021-01-13 11:08:03 +08:00
}
type InsertResponse struct {
2021-04-12 12:45:38 +08:00
Status *commonpb.Status
RowIDBegin int64
RowIDEnd int64
2021-01-13 11:08:03 +08:00
}
```
2021-10-02 21:28:04 +08:00
- _Search_
2021-01-13 11:08:03 +08:00
```go
type SearchRequest struct {
2021-04-12 12:45:38 +08:00
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionNames []string
Dsl string
PlaceholderGroup []byte
2021-01-13 11:08:03 +08:00
}
2021-03-04 10:35:28 +08:00
type SearchResults struct {
2021-04-12 12:45:38 +08:00
Status commonpb.Status
Hits byte
2021-03-04 10:35:28 +08:00
}
2021-01-13 11:08:03 +08:00
```
2021-10-02 21:28:04 +08:00
- _Flush_
2021-01-13 11:08:03 +08:00
```go
type FlushRequest struct {
2021-04-12 12:45:38 +08:00
Base *commonpb.MsgBase
DbName string
CollectionName string
2020-12-27 09:05:24 +08:00
}
```
2021-10-02 21:28:04 +08:00
- _GetPersistentSegmentInfo_
2021-02-03 18:55:00 +08:00
```go
type PersistentSegmentInfoRequest struct{
2021-04-12 12:45:38 +08:00
Base *commonpb.MsgBase
DbName string
CollectionName string
2021-02-03 18:55:00 +08:00
}
2021-03-04 10:35:28 +08:00
type SegmentState int32
const (
2021-04-12 12:45:38 +08:00
SegmentState_SegmentNone SegmentState = 0
SegmentState_SegmentNotExist SegmentState = 1
SegmentState_SegmentGrowing SegmentState = 2
SegmentState_SegmentSealed SegmentState = 3
SegmentState_SegmentFlushed SegmentState = 4
2021-03-04 10:35:28 +08:00
)
2021-02-03 18:55:00 +08:00
type PersistentSegmentInfo struct {
2021-04-12 12:45:38 +08:00
SegmentID UniqueID
CollectionID UniqueID
PartitionID UniqueID
OpenTime Timestamp
SealedTime Timestamp
FlushedTime Timestamp
NumRows int64
MemSize int64
State SegmentState
2021-02-03 18:55:00 +08:00
}
type PersistentSegmentInfoResponse struct{
2021-04-12 12:45:38 +08:00
infos []*milvuspb.SegmentInfo
2021-02-03 18:55:00 +08:00
}
2021-03-04 10:35:28 +08:00
```
2020-12-27 09:05:24 +08:00
2021-09-26 17:47:58 +08:00
#### 5.3 Proxy Instance
2020-11-19 10:59:10 +08:00
```go
type Proxy struct {
2021-04-12 12:45:38 +08:00
ctx context.Context
cancel func()
wg sync.WaitGroup
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
initParams *internalpb.InitParams
ip string
port int
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
stateCode internalpb.StateCode
2021-10-02 21:28:04 +08:00
2021-06-23 16:14:08 +08:00
rootCoordClient RootCoordClient
indexCoordClient IndexCoordClient
dataCoordClient DataCoordClient
queryCoordClient QueryCoordClient
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
sched *TaskScheduler
tick *timeTick
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
idAllocator *allocator.IDAllocator
tsoAllocator *allocator.TimestampAllocator
segAssigner *SegIDAssigner
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
manipulationMsgStream msgstream.MsgStream
queryMsgStream msgstream.MsgStream
msFactory msgstream.Factory
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
2020-11-19 10:59:10 +08:00
}
2021-03-04 10:35:28 +08:00
func (node *NodeImpl) Init() error
func (node *NodeImpl) Start() error
func (node *NodeImpl) Stop() error
func (node *NodeImpl) AddStartCallback(callbacks ...func())
func (node *NodeImpl) waitForServiceReady(ctx context.Context, service Component, serviceName string) error
func (node *NodeImpl) lastTick() Timestamp
func (node *NodeImpl) AddCloseCallback(callbacks ...func())
2021-06-23 16:14:08 +08:00
func (node *NodeImpl) SetRootCoordClient(cli RootCoordClient)
func (node *NodeImpl) SetIndexCoordClient(cli IndexCoordClient)
func (node *NodeImpl) SetDataCoordClient(cli DataCoordClient)
func (node *NodeImpl) SetProxyCoordClient(cli ProxyCoordClient)
func (node *NodeImpl) SetQueryCoordClient(cli QueryCoordClient)
2021-03-04 10:35:28 +08:00
2021-06-23 16:14:08 +08:00
func NewProxyImpl(ctx context.Context, factory msgstream.Factory) (*NodeImpl, error)
2020-11-19 10:59:10 +08:00
```
#### Global Parameter Table
```go
type GlobalParamsTable struct {
2021-04-12 12:45:38 +08:00
paramtable.BaseTable
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
NetworkPort int
IP string
NetworkAddress string
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
MasterAddress string
PulsarAddress string
2021-06-25 19:44:11 +08:00
RocksmqPath string
2021-08-16 18:46:10 +08:00
RocksmqRetentionTimeInMinutes int64
RocksmqRetentionSizeInMB int64
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
ProxyID UniqueID
TimeTickInterval time.Duration
InsertChannelNames []string
DeleteChannelNames []string
K2SChannelNames []string
SearchChannelNames []string
SearchResultChannelNames []string
ProxySubName string
ProxyTimeTickChannelNames []string
DataDefinitionChannelNames []string
MsgStreamTimeTickBufSize int64
MaxNameLength int64
MaxFieldNum int64
MaxDimension int64
2021-04-13 10:04:39 +08:00
DefaultPartitionName string
2021-04-12 12:45:38 +08:00
DefaultIndexName string
2020-11-19 10:59:10 +08:00
}
2021-03-04 10:35:28 +08:00
var Params ParamTable
2020-11-19 10:59:10 +08:00
```
2021-09-26 17:47:58 +08:00
#### 5.4 Task
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
```go
2020-11-19 10:59:10 +08:00
type task interface {
2021-04-12 12:45:38 +08:00
TraceCtx() context.Context
ID() UniqueID // return ReqID
SetID(uid UniqueID) // set ReqID
Name() string
Type() commonpb.MsgType
BeginTs() Timestamp
EndTs() Timestamp
SetTs(ts Timestamp)
OnEnqueue() error
PreExecute(ctx context.Context) error
Execute(ctx context.Context) error
PostExecute(ctx context.Context) error
WaitToFinish() error
Notify(err error)
2020-11-19 10:59:10 +08:00
}
```
2021-09-26 17:47:58 +08:00
#### 5.5 Task Scheduler
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
- Base Task Queue
2020-11-19 10:59:10 +08:00
```go
2021-03-04 10:35:28 +08:00
type TaskQueue interface {
2021-04-12 12:45:38 +08:00
utChan() <-chan int
UTEmpty() bool
utFull() bool
addUnissuedTask(t task) error
FrontUnissuedTask() task
PopUnissuedTask() task
AddActiveTask(t task)
PopActiveTask(ts Timestamp) task
getTaskByReqID(reqID UniqueID) task
TaskDoneTest(ts Timestamp) bool
Enqueue(t task) error
2021-03-04 10:35:28 +08:00
}
2020-11-19 10:59:10 +08:00
type baseTaskQueue struct {
2021-04-12 12:45:38 +08:00
unissuedTasks *list.List
activeTasks map[Timestamp]task
utLock sync.Mutex
atLock sync.Mutex
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
maxTaskNum int64
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
utBufChan chan int
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
sched *TaskScheduler
2021-03-04 10:35:28 +08:00
}
2020-11-19 10:59:10 +08:00
```
2021-10-09 18:11:41 +08:00
_AddUnissuedTask(task \*task)_ will push a new task into _unissuedTasks_, while maintaining the list in timestamp order.
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
_TaskDoneTest(ts Timestamp)_ will check both _unissuedTasks_ and _activeTasks_. If no task is found before _ts_, the function returns _true_, indicating that all the tasks before _ts_ are completed.
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
- Data Definition Task Queue
2020-11-19 10:59:10 +08:00
```go
type ddTaskQueue struct {
2021-04-12 12:45:38 +08:00
baseTaskQueue
lock sync.Mutex
2020-11-19 10:59:10 +08:00
}
func (queue *ddTaskQueue) Enqueue(task *task) error
func newDdTaskQueue() *ddTaskQueue
```
2021-10-11 21:27:39 +08:00
Data definition tasks (e.g., _CreateCollectionTask_) will be pushed into _DdTaskQueue_. When a task is enqueued, _Enqueue(task \*task)_ will set _Ts_, _ReqId_, and _ProxyId_, then push it into _queue_. The timestamps of the enqueued tasks should be strictly monotonically increasing. As _Enqueue(task \*task)_ will be called in parallel, setting the timestamp and inserting into the queue need to be done atomically.
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
- Data Manipulation Task Queue
2020-11-19 10:59:10 +08:00
```go
type dmTaskQueue struct {
2021-04-12 12:45:38 +08:00
baseTaskQueue
2020-11-19 10:59:10 +08:00
}
func (queue *dmTaskQueue) Enqueue(task *task) error
func newDmTaskQueue() *dmTaskQueue
```
2021-10-11 21:27:39 +08:00
Insert tasks and delete tasks will be pushed into _DmTaskQueue_.
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
If an _insertTask_ is enqueued, _Enqueue(task \*task)_ will set _Ts_, _ReqId_, _ProxyId_, _SegIdAssigner_, and _RowIdAllocator_, then push it into _queue_. The _SegIdAssigner_ and _RowIdAllocator_ will later be used in the task's execution phase.
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
- Data Query Task Queue
2020-11-19 10:59:10 +08:00
```go
type dqTaskQueue struct {
2021-04-12 12:45:38 +08:00
baseTaskQueue
2020-11-19 10:59:10 +08:00
}
func (queue *dqTaskQueue) Enqueue(task *task) error
func newDqTaskQueue() *dqTaskQueue
```
2021-10-11 21:27:39 +08:00
Queries will be pushed into _DqTaskQueue_.
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
- Task Scheduler
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
```go
2020-11-19 10:59:10 +08:00
type taskScheduler struct {
2021-04-12 12:45:38 +08:00
DdQueue TaskQueue
DmQueue TaskQueue
DqQueue TaskQueue
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
idAllocator *allocator.IDAllocator
tsoAllocator *allocator.TimestampAllocator
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
msFactory msgstream.Factory
2020-11-19 10:59:10 +08:00
}
func (sched *taskScheduler) scheduleDdTask() *task
func (sched *taskScheduler) scheduleDmTask() *task
func (sched *taskScheduler) scheduleDqTask() *task
2021-03-04 10:35:28 +08:00
func (sched *TaskScheduler) getTaskByReqID(collMeta UniqueID) task
func (sched *TaskScheduler) processTask(t task, q TaskQueue)
2020-11-19 10:59:10 +08:00
func (sched *taskScheduler) Start() error
func (sched *taskScheduler) TaskDoneTest(ts Timestamp) bool
2021-03-04 10:35:28 +08:00
func NewTaskScheduler(ctx context.Context, idAllocator *allocator.IDAllocator, tsoAllocator *allocator.TimestampAllocator,
factory msgstream.Factory) (*TaskScheduler, error)
2020-11-19 10:59:10 +08:00
```
2021-10-02 21:28:04 +08:00
_scheduleDdTask()_ selects tasks in a FIFO manner, thus time order is guaranteed.
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
The policy of _scheduleDmTask()_ should target throughput, not the tasks' time order. Note that the time order of the tasks' execution will later be guaranteed by the timestamp & time tick mechanism.
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
The policy of _scheduleDqTask()_ should target throughput. It should also take visibility into consideration. For example, if an insert task and a query arrive in the same time tick and the query comes after the insert, the query should be scheduled in the next tick so that the query can see the insert.
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
_TaskDoneTest(ts Timestamp)_ will check all three task queues. If no task is found before _ts_, the function returns _true_, indicating that all the tasks before _ts_ are completed.
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
- Statistics
2020-11-19 10:59:10 +08:00
2021-03-04 10:35:28 +08:00
// TODO
2021-10-02 21:28:04 +08:00
2020-11-19 10:59:10 +08:00
```go
// ActiveComponent interfaces
func (sched *taskScheduler) Id() String
func (sched *taskScheduler) Status() Status
func (sched *taskScheduler) Clean() Status
func (sched *taskScheduler) Restart() Status
func (sched *taskScheduler) heartbeat()
// protobuf
message taskSchedulerHeartbeat {
2021-04-12 12:45:38 +08:00
string id
uint64 dd_queue_length
uint64 dm_queue_length
uint64 dq_queue_length
uint64 num_dd_done
uint64 num_dm_done
uint64 num_dq_done
2020-11-19 10:59:10 +08:00
}
```
2021-03-04 10:35:28 +08:00
// TODO
2021-10-02 21:28:04 +08:00
2021-09-26 17:47:58 +08:00
#### 5.6 Time Tick
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
- Time Tick
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
```go
2020-11-19 10:59:10 +08:00
type timeTick struct {
2021-04-12 12:45:38 +08:00
lastTick Timestamp
currentTick Timestamp
wallTick Timestamp
tickStep Timestamp
syncInterval Timestamp
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
tsAllocator *TimestampAllocator
scheduler *taskScheduler
ttStream *MessageStream
2021-10-02 21:28:04 +08:00
2021-04-12 12:45:38 +08:00
ctx context.Context
2020-11-19 10:59:10 +08:00
}
func (tt *timeTick) Start() error
func (tt *timeTick) synchronize() error
func newTimeTick(ctx context.Context, tickStep Timestamp, syncInterval Timestamp, tsAllocator *TimestampAllocator, scheduler *taskScheduler, ttStream *MessageStream) *timeTick
```
2021-10-15 17:59:31 +08:00
_Start()_ will enter a loop. On each _tickStep_, it tries to send a _TIME_TICK_ typed _TsMsg_ into _ttStream_. After each _syncInterval_, it synchronizes its _wallTick_ with _tsAllocator_ by calling _synchronize()_. When _currentTick + tickStep < wallTick_ holds, it will update _currentTick_ with _wallTick_ on the next tick. Otherwise, it will update _currentTick_ with _currentTick + tickStep_.
2020-11-19 10:59:10 +08:00
2021-10-02 21:28:04 +08:00
- Statistics
2020-11-19 10:59:10 +08:00
```go
// ActiveComponent interfaces
func (tt *timeTick) ID() String
func (tt *timeTick) Status() Status
func (tt *timeTick) Clean() Status
func (tt *timeTick) Restart() Status
func (tt *timeTick) heartbeat()
// protobuf
message TimeTickHeartbeat {
string id
uint64 last_tick
}
```