Signed-off-by: Jael Gu <mengjia.gu@zilliz.com>
17 KiB
5. Proxy
5.0 Proxy Service Interface
type ProxyService interface {
Component
TimeTickProvider
RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error)
InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
}
- MsgBase
type MsgBase struct {
MsgType MsgType
MsgID UniqueID
Timestamp uint64
SourceID UniqueID
}
- RegisterNode
type Address struct {
Ip string
Port int64
}
type RegisterNodeRequest struct {
Base *commonpb.MsgBase
Address string
Port int64
}
type InitParams struct {
NodeID UniqueID
StartParams []*commonpb.KeyValuePair
}
type RegisterNodeResponse struct {
InitParams *internalpb.InitParams
Status *commonpb.Status
}
- InvalidateCollectionMetaCache
type InvalidateCollMetaCacheRequest struct {
Base *commonpb.MsgBase
DbName string
CollectionName string
}
5.1 Proxy Node Interface
type Proxy interface {
Component
InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
}
- InvalidateCollectionMetaCache
type InvalidateCollMetaCacheRequest struct {
Base *commonpb.MsgBase
DbName string
CollectionName string
}
5.2 Milvus Service Interface
Proxy also implements Milvus Service interface to receive client grpc call.
type MilvusService interface {
CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error)
DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error)
HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error)
LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error)
ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error)
DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error)
ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error)
CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error)
DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error)
AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error)
CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error)
DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error)
HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error)
LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error)
ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error)
GetPartitionStatistics(ctx context.Context, request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error)
ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error)
DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error)
GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error)
DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error)
Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error)
Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error)
Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error)
GetDdChannel(ctx context.Context, request *commonpb.Empty) (*milvuspb.StringResponse, error)
GetQuerySegmentInfo(ctx context.Context, req *milvuspb.QuerySegmentInfoRequest) (*milvuspb.QuerySegmentInfoResponse, error)
GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error)
GetQuerySegmentInfo(ctx context.Context, in *GetQuerySegmentInfoRequest, opts ...grpc.CallOption) (*GetQuerySegmentInfoResponse, error)
}
}
- CreateCollection
See Master API for detailed definitions.
- DropCollection
See Master API for detailed definitions.
- HasCollection
See Master API for detailed definitions.
- LoadCollection
type LoadCollectionRequest struct {
Base *commonpb.MsgBase
DbName string
CollectionName string
}
- ReleaseCollection
type ReleaseCollectionRequest struct {
Base *commonpb.MsgBase
DbName string
CollectionName string
}
- DescribeCollection
type DescribeCollectionRequest struct {
Base *commonpb.MsgBase
DbName string
CollectionName string
CollectionID int64
TimeStamp uint64
}
See Master API for detailed definitions.
- GetCollectionStatisticsRequest
type GetCollectionStatisticsRequest struct {
Base *commonpb.MsgBase
DbName string
CollectionName string
}
See Master API for detailed definitions.
- ShowCollections
See Master API for detailed definitions.
- CreateAlias
See Master API for detailed definitions.
- DropAlias
See Master API for detailed definitions.
- AlterAlias
See Master API for detailed definitions.
- CreatePartition
See Master API for detailed definitions.
- DropPartition
See Master API for detailed definitions.
- HasPartition
See Master API for detailed definitions.
- LoadPartitions
type CollectionSchema struct {
Name string
Description string
AutoID bool
Fields []*FieldSchema
}
type LoadPartitonRequest struct {
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
PartitionIDs []UniqueID
Schema *schemapb.CollectionSchema
}
- ReleasePartitions
type ReleasePartitionRequest struct {
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionNames []string
}
- GetPartitionStatistics
See Master API for detailed definitions.
- ShowPartitions
See Master API for detailed definitions.
- CreateIndex
See Master API for detailed definitions.
- DescribeIndex
See Master API for detailed definitions.
- DropIndex
See Master API for detailed definitions.
- Insert
type InsertRequest struct {
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionName string
RowData []Blob
HashKeys []uint32
}
type InsertResponse struct {
Status *commonpb.Status
RowIDBegin int64
RowIDEnd int64
}
- Search
type SearchRequest struct {
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionNames []string
Dsl string
PlaceholderGroup []byte
}
type SearchResults struct {
Status commonpb.Status
Hits byte
}
- Flush
type FlushRequest struct {
Base *commonpb.MsgBase
DbName string
CollectionName string
}
- GetPersistentSegmentInfo
type PersistentSegmentInfoRequest struct{
Base *commonpb.MsgBase
DbName string
CollectionName string
}
type SegmentState int32
const (
SegmentState_SegmentNone SegmentState = 0
SegmentState_SegmentNotExist SegmentState = 1
SegmentState_SegmentGrowing SegmentState = 2
SegmentState_SegmentSealed SegmentState = 3
SegmentState_SegmentFlushed SegmentState = 4
)
type PersistentSegmentInfo struct {
SegmentID UniqueID
CollectionID UniqueID
PartitionID UniqueID
OpenTime Timestamp
SealedTime Timestamp
FlushedTime Timestamp
NumRows int64
MemSize int64
State SegmentState
}
type PersistentSegmentInfoResponse struct{
infos []*milvuspb.SegmentInfo
}
5.3 Proxy Instance
type Proxy struct {
ctx context.Context
cancel func()
wg sync.WaitGroup
initParams *internalpb.InitParams
ip string
port int
stateCode internalpb.StateCode
rootCoordClient RootCoordClient
indexCoordClient IndexCoordClient
dataCoordClient DataCoordClient
queryCoordClient QueryCoordClient
sched *TaskScheduler
tick *timeTick
idAllocator *allocator.IDAllocator
tsoAllocator *allocator.TimestampAllocator
segAssigner *SegIDAssigner
manipulationMsgStream msgstream.MsgStream
queryMsgStream msgstream.MsgStream
msFactory msgstream.Factory
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
}
func (node *NodeImpl) Init() error
func (node *NodeImpl) Start() error
func (node *NodeImpl) Stop() error
func (node *NodeImpl) AddStartCallback(callbacks ...func())
func (node *NodeImpl) waitForServiceReady(ctx context.Context, service Component, serviceName string) error
func (node *NodeImpl) lastTick() Timestamp
func (node *NodeImpl) AddCloseCallback(callbacks ...func())
func (node *NodeImpl) SetRootCoordClient(cli RootCoordClient)
func (node *NodeImpl) SetIndexCoordClient(cli IndexCoordClient)
func (node *NodeImpl) SetDataCoordClient(cli DataCoordClient)
func (node *NodeImpl) SetProxyCoordClient(cli ProxyCoordClient)
func (node *NodeImpl) SetQueryCoordClient(cli QueryCoordClient)
func NewProxyImpl(ctx context.Context, factory msgstream.Factory) (*NodeImpl, error)
Global Parameter Table
type GlobalParamsTable struct {
paramtable.BaseTable
NetworkPort int
IP string
NetworkAddress string
MasterAddress string
PulsarAddress string
RocksmqPath string
RocksmqRetentionTimeInMinutes int64
RocksmqRetentionSizeInMB int64
ProxyID UniqueID
TimeTickInterval time.Duration
InsertChannelNames []string
DeleteChannelNames []string
K2SChannelNames []string
SearchChannelNames []string
SearchResultChannelNames []string
ProxySubName string
ProxyTimeTickChannelNames []string
DataDefinitionChannelNames []string
MsgStreamTimeTickBufSize int64
MaxNameLength int64
MaxFieldNum int64
MaxDimension int64
DefaultPartitionName string
DefaultIndexName string
}
var Params ParamTable
5.4 Task
type task interface {
TraceCtx() context.Context
ID() UniqueID // return ReqID
SetID(uid UniqueID) // set ReqID
Name() string
Type() commonpb.MsgType
BeginTs() Timestamp
EndTs() Timestamp
SetTs(ts Timestamp)
OnEnqueue() error
PreExecute(ctx context.Context) error
Execute(ctx context.Context) error
PostExecute(ctx context.Context) error
WaitToFinish() error
Notify(err error)
}
5.5 Task Scheduler
- Base Task Queue
type TaskQueue interface {
utChan() <-chan int
UTEmpty() bool
utFull() bool
addUnissuedTask(t task) error
FrontUnissuedTask() task
PopUnissuedTask() task
AddActiveTask(t task)
PopActiveTask(ts Timestamp) task
getTaskByReqID(reqID UniqueID) task
TaskDoneTest(ts Timestamp) bool
Enqueue(t task) error
}
type baseTaskQueue struct {
unissuedTasks *list.List
activeTasks map[Timestamp]task
utLock sync.Mutex
atLock sync.Mutex
maxTaskNum int64
utBufChan chan int
sched *TaskScheduler
}
AddUnissuedTask(task *task) will push a new task into unissuedTasks, while maintaining the list by timestamp order.
TaskDoneTest(ts Timestamp) will check both unissuedTasks and unissuedTasks. If no task found before ts, then the function returns true, indicates that all the tasks before ts are completed.
- Data Definition Task Queue
type ddTaskQueue struct {
baseTaskQueue
lock sync.Mutex
}
func (queue *ddTaskQueue) Enqueue(task *task) error
func newDdTaskQueue() *ddTaskQueue
Data definition tasks (i.e. CreateCollectionTask) will be pushed into DdTaskQueue. If a task is enqueued, Enqueue(task *task) will set Ts, ReqId, ProxyId, then push it into queue. The timestamps of the enqueued tasks should be strictly monotonically increasing. As Enqueue(task *task) will be called in parallel, setting timestamp and queue insertion need to be done atomically.
- Data Manipulation Task Queue
type dmTaskQueue struct {
baseTaskQueue
}
func (queue *dmTaskQueue) Enqueue(task *task) error
func newDmTaskQueue() *dmTaskQueue
Insert tasks and delete tasks will be pushed into DmTaskQueue.
If a insertTask is enqueued, Enqueue(task *task) will set Ts, ReqId, ProxyId, SegIdAssigner, RowIdAllocator, then push it into queue. The SegIdAssigner and RowIdAllocator will later be used in the task's execution phase.
- Data Query Task Queue
type dqTaskQueue struct {
baseTaskQueue
}
func (queue *dqTaskQueue) Enqueue(task *task) error
func newDqTaskQueue() *dqTaskQueue
Queries will be pushed into DqTaskQueue.
- Task Scheduler
type taskScheduler struct {
DdQueue TaskQueue
DmQueue TaskQueue
DqQueue TaskQueue
idAllocator *allocator.IDAllocator
tsoAllocator *allocator.TimestampAllocator
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
msFactory msgstream.Factory
}
func (sched *taskScheduler) scheduleDdTask() *task
func (sched *taskScheduler) scheduleDmTask() *task
func (sched *taskScheduler) scheduleDqTask() *task
func (sched *TaskScheduler) getTaskByReqID(collMeta UniqueID) task
func (sched *TaskScheduler) processTask(t task, q TaskQueue)
func (sched *taskScheduler) Start() error
func (sched *taskScheduler) TaskDoneTest(ts Timestamp) bool
func NewTaskScheduler(ctx context.Context, idAllocator *allocator.IDAllocator, tsoAllocator *allocator.TimestampAllocator,
factory msgstream.Factory) (*TaskScheduler, error)
scheduleDdTask() selects tasks in a FIFO manner, thus time order is garanteed.
The policy of scheduleDmTask() should target on throughput, not tasks' time order. Note that the time order of the tasks' execution will later be garanteed by the timestamp & time tick mechanism.
The policy of scheduleDqTask() should target on throughput. It should also take visibility into consideration. For example, if an insert task and a query arrive in a same time tick and the query comes after insert, the query should be scheduled in the next tick thus the query can see the insert.
TaskDoneTest(ts Timestamp) will check all the three task queues. If no task found before ts, then the function returns true, indicates that all the tasks before ts are completed.
- Statistics
// TODO
// ActiveComponent interfaces
func (sched *taskScheduler) Id() String
func (sched *taskScheduler) Status() Status
func (sched *taskScheduler) Clean() Status
func (sched *taskScheduler) Restart() Status
func (sched *taskScheduler) heartbeat()
// protobuf
message taskSchedulerHeartbeat {
string id
uint64 dd_queue_length
uint64 dm_queue_length
uint64 dq_queue_length
uint64 num_dd_done
uint64 num_dm_done
uint64 num_dq_done
}
// TODO
5.6 Time Tick
- Time Tick
type timeTick struct {
lastTick Timestamp
currentTick Timestamp
wallTick Timestamp
tickStep Timestamp
syncInterval Timestamp
tsAllocator *TimestampAllocator
scheduler *taskScheduler
ttStream *MessageStream
ctx context.Context
}
func (tt *timeTick) Start() error
func (tt *timeTick) synchronize() error
func newTimeTick(ctx context.Context, tickStep Timestamp, syncInterval Timestamp, tsAllocator *TimestampAllocator, scheduler *taskScheduler, ttStream *MessageStream) *timeTick
Start() will enter a loop. On each tickStep, it tries to send a TIME_TICK typed TsMsg into ttStream. After each syncInterval, it synchronizes its wallTick with tsAllocator by calling synchronize(). When currentTick + tickStep < wallTick holds, it will update currentTick with wallTick on next tick. Otherwise, it will update currentTick with currentTick + tickStep.
- Statistics
// ActiveComponent interfaces
func (tt *timeTick) ID() String
func (tt *timeTick) Status() Status
func (tt *timeTick) Clean() Status
func (tt *timeTick) Restart() Status
func (tt *timeTick) heartbeat()
// protobuf
message TimeTickHeartbeat {
string id
uint64 last_tick
}