Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
12 KiB
6. Proxy
6.0 Proxy Service Interface
type ProxyService interface {
Service
RegisterLink() (RegisterLinkResponse, error)
RegisterNode(req RegisterNodeRequest) (RegisterNodeResponse, error)
InvalidateCollectionMetaCache(req InvalidateCollMetaCacheRequest) error
}
- MsgBase
type MsgBase struct {
MsgType MsgType
MsgID UniqueID
Timestamp Timestamp
SourceID UniqueID
}
- RegisterLink
type RegisterLinkResponse struct {
Address string
Port int32
}
- RegisterNode
type RegisterNodeRequest struct {
MsgBase
Address string
Port int64
}
type RegisterNodeResponse struct {
//InitParams
}
- InvalidateCollectionMetaCache
type InvalidateCollMetaCacheRequest struct {
MsgBase
DbName string
CollectionName string
}
6.0 Proxy Node Interface
type ProxyNode interface {
Service
//SetTimeTickChannel(channelName string) error
//SetStatsChannel(channelName string) error
CreateCollection(req CreateCollectionRequest) error
DropCollection(req DropCollectionRequest) error
HasCollection(req HasCollectionRequest) (bool, error)
LoadCollection(req LoadCollectionRequest) error
ReleaseCollection(req ReleaseCollectionRequest) error
DescribeCollection(req DescribeCollectionRequest) (DescribeCollectionResponse, error)
GetCollectionStatistics(req CollectionStatsRequest) (CollectionStatsResponse, error)
ShowCollections(req ShowCollectionRequest) (ShowCollectionResponse, error)
CreatePartition(req CreatePartitionRequest) error
DropPartition(req DropPartitionRequest) error
HasPartition(req HasPartitionRequest) (bool, error)
LoadPartitions(req LoadPartitonRequest) error
ReleasePartitions(req ReleasePartitionRequest) error
GetPartitionStatistics(req PartitionStatsRequest) (PartitionStatsResponse, error)
ShowPartitions(req ShowPartitionRequest) (ShowPartitionResponse)
CreateIndex(req CreateIndexRequest) error
DescribeIndex(DescribeIndexRequest) (DescribeIndexResponse, error)
Insert(req InsertRequest) (InsertResponse, error)
Search(req SearchRequest) (SearchResults, error)
Flush(req FlushRequest) error
GetPersistentSegmentInfo(req PersistentSegmentInfoRequest) (PersistentSegmentInfoResponse, error)
}
- CreateCollection
See Master API for detailed definitions.
- DropCollection
See Master API for detailed definitions.
- HasCollection
See Master API for detailed definitions.
- LoadCollection
type LoadCollectionRequest struct {
MsgBase
DbName string
CollectionName string
}
- ReleaseCollection
type ReleaseCollectionRequest struct {
MsgBase
DbName string
CollectionName string
}
- DescribeCollection
See Master API for detailed definitions.
- GetCollectionStatistics
See Master API for detailed definitions.
- ShowCollections
See Master API for detailed definitions.
- CreatePartition
See Master API for detailed definitions.
- DropPartition
See Master API for detailed definitions.
- HasPartition
See Master API for detailed definitions.
- LoadPartitions
type LoadPartitonRequest struct {
MsgBase
DbName string
CollectionName string
PartitionNames []string
}
- ReleasePartitions
type ReleasePartitionRequest struct {
MsgBase
DbName string
CollectionName string
PartitionNames []string
}
- GetPartitionStatistics
See Master API for detailed definitions.
- ShowPartitions
See Master API for detailed definitions.
- CreateIndex
See Master API for detailed definitions.
- DescribeIndex
See Master API for detailed definitions.
- Insert
type InsertRequest struct {
MsgBase
DbName string
CollectionName string
PartitionName string
RowData []Blob
HashKeys []uint32
}
type InsertResponse struct {
RowIDBegin UniqueID
RowIDEnd UniqueID
}
- Search
type SearchRequest struct {
MsgBase
DbName string
CollectionName string
PartitionNames []string
Dsl string
PlaceholderGroup []byte
}
- Flush
type FlushRequest struct {
MsgBase
DbName string
CollectionName string
}
- GetPersistentSegmentInfo
type PersistentSegmentInfoRequest struct{
MsgBase
DbName string
CollectionName string
}
type PersistentSegmentInfo struct {
SegmentID UniqueID
CollectionID UniqueID
PartitionID UniqueID
OpenTime Timestamp
SealedTime Timestamp
FlushedTime Timestamp
NumRows int64
MemSize int64
State SegmentState
}
type PersistentSegmentInfoResponse struct{
infos []SegmentInfo
}
6.1 Proxy Instance
type Proxy struct {
servicepb.UnimplementedMilvusServiceServer
masterClient mpb.MasterClient
timeTick *timeTick
ttStream *MessageStream
scheduler *taskScheduler
tsAllocator *TimestampAllocator
ReqIdAllocator *IdAllocator
RowIdAllocator *IdAllocator
SegIdAssigner *segIdAssigner
}
func (proxy *Proxy) Start() error
func NewProxy(ctx context.Context) *Proxy
Global Parameter Table
type GlobalParamsTable struct {
}
func (*paramTable GlobalParamsTable) ProxyId() int64
func (*paramTable GlobalParamsTable) ProxyAddress() string
func (*paramTable GlobalParamsTable) MasterAddress() string
func (*paramTable GlobalParamsTable) PulsarAddress() string
func (*paramTable GlobalParamsTable) TimeTickTopic() string
func (*paramTable GlobalParamsTable) InsertTopics() []string
func (*paramTable GlobalParamsTable) QueryTopic() string
func (*paramTable GlobalParamsTable) QueryResultTopics() []string
func (*paramTable GlobalParamsTable) Init() error
var ProxyParamTable GlobalParamsTable
6.2 Task
type task interface {
Id() int64 // return ReqId
PreExecute() error
Execute() error
PostExecute() error
WaitToFinish() error
Notify() error
}
- Base Task
type baseTask struct {
Type ReqType
ReqId int64
Ts Timestamp
ProxyId int64
}
func (task *baseTask) PreExecute() error
func (task *baseTask) Execute() error
func (task *baseTask) PostExecute() error
func (task *baseTask) WaitToFinish() error
func (task *baseTask) Notify() error
-
Insert Task
Take insertTask as an example:
type insertTask struct {
baseTask
SegIdAssigner *segIdAssigner
RowIdAllocator *IdAllocator
rowBatch *RowBatch
}
func (task *InsertTask) Execute() error
func (task *InsertTask) WaitToFinish() error
func (task *InsertTask) Notify() error
6.2 Task Scheduler
- Base Task Queue
type baseTaskQueue struct {
unissuedTasks *List
activeTasks map[int64]*task
utLock sync.Mutex // lock for UnissuedTasks
atLock sync.Mutex // lock for ActiveTasks
}
func (queue *baseTaskQueue) AddUnissuedTask(task *task)
func (queue *baseTaskQueue) FrontUnissuedTask() *task
func (queue *baseTaskQueue) PopUnissuedTask(id int64) *task
func (queue *baseTaskQueue) AddActiveTask(task *task)
func (queue *baseTaskQueue) PopActiveTask(id int64) *task
func (queue *baseTaskQueue) TaskDoneTest(ts Timestamp) bool
AddUnissuedTask(task *task) will put a new task into unissuedTasks, while maintaining the list by timestamp order.
TaskDoneTest(ts Timestamp) will check both unissuedTasks and unissuedTasks. If no task found before ts, then the function returns true, indicates that all the tasks before ts are completed.
- Data Definition Task Queue
type ddTaskQueue struct {
baseTaskQueue
lock sync.Mutex
}
func (queue *ddTaskQueue) Enqueue(task *task) error
func newDdTaskQueue() *ddTaskQueue
Data definition tasks (i.e. CreateCollectionTask) will be put into DdTaskQueue. If a task is enqueued, Enqueue(task *task) will set Ts, ReqId, ProxyId, then push it into queue. The timestamps of the enqueued tasks should be strictly monotonically increasing. As Enqueue(task *task) will be called in parallel, setting timestamp and queue insertion need to be done atomically.
- Data Manipulation Task Queue
type dmTaskQueue struct {
baseTaskQueue
}
func (queue *dmTaskQueue) Enqueue(task *task) error
func newDmTaskQueue() *dmTaskQueue
Insert tasks and delete tasks will be put into DmTaskQueue.
If a insertTask is enqueued, Enqueue(task *task) will set Ts, ReqId, ProxyId, SegIdAssigner, RowIdAllocator, then push it into queue. The SegIdAssigner and RowIdAllocator will later be used in the task's execution phase.
- Data Query Task Queue
type dqTaskQueue struct {
baseTaskQueue
}
func (queue *dqTaskQueue) Enqueue(task *task) error
func newDqTaskQueue() *dqTaskQueue
Queries will be put into DqTaskQueue.
- Task Scheduler
type taskScheduler struct {
DdQueue *ddTaskQueue
DmQueue *dmTaskQueue
DqQueue *dqTaskQueue
tsAllocator *TimestampAllocator
ReqIdAllocator *IdAllocator
}
func (sched *taskScheduler) scheduleDdTask() *task
func (sched *taskScheduler) scheduleDmTask() *task
func (sched *taskScheduler) scheduleDqTask() *task
func (sched *taskScheduler) Start() error
func (sched *taskScheduler) TaskDoneTest(ts Timestamp) bool
func newTaskScheduler(ctx context.Context, tsAllocator *TimestampAllocator, ReqIdAllocator *IdAllocator) *taskScheduler
scheduleDdTask() selects tasks in a FIFO manner, thus time order is garanteed.
The policy of scheduleDmTask() should target on throughput, not tasks' time order. Note that the time order of the tasks' execution will later be garanteed by the timestamp & time tick mechanism.
The policy of scheduleDqTask() should target on throughput. It should also take visibility into consideration. For example, if an insert task and a query arrive in a same time tick and the query comes after insert, the query should be scheduled in the next tick thus the query can see the insert.
TaskDoneTest(ts Timestamp) will check all the three task queues. If no task found before ts, then the function returns true, indicates that all the tasks before ts are completed.
- Statistics
// ActiveComponent interfaces
func (sched *taskScheduler) Id() String
func (sched *taskScheduler) Status() Status
func (sched *taskScheduler) Clean() Status
func (sched *taskScheduler) Restart() Status
func (sched *taskScheduler) heartbeat()
// protobuf
message taskSchedulerHeartbeat {
string id
uint64 dd_queue_length
uint64 dm_queue_length
uint64 dq_queue_length
uint64 num_dd_done
uint64 num_dm_done
uint64 num_dq_done
}
6.3 Time Tick
- Time Tick
type timeTick struct {
lastTick Timestamp
currentTick Timestamp
wallTick Timestamp
tickStep Timestamp
syncInterval Timestamp
tsAllocator *TimestampAllocator
scheduler *taskScheduler
ttStream *MessageStream
ctx context.Context
}
func (tt *timeTick) Start() error
func (tt *timeTick) synchronize() error
func newTimeTick(ctx context.Context, tickStep Timestamp, syncInterval Timestamp, tsAllocator *TimestampAllocator, scheduler *taskScheduler, ttStream *MessageStream) *timeTick
Start() will enter a loop. On each tickStep, it tries to send a TIME_TICK typed TsMsg into ttStream. After each syncInterval, it sychronizes its wallTick with tsAllocator by calling synchronize(). When currentTick + tickStep < wallTick holds, it will update currentTick with wallTick on next tick. Otherwise, it will update currentTick with currentTick + tickStep.
- Statistics
// ActiveComponent interfaces
func (tt *timeTick) ID() String
func (tt *timeTick) Status() Status
func (tt *timeTick) Clean() Status
func (tt *timeTick) Restart() Status
func (tt *timeTick) heartbeat()
// protobuf
message TimeTickHeartbeat {
string id
uint64 last_tick
}