Change doc

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
godchen 2021-03-04 10:35:28 +08:00 committed by yefu.chen
parent 6df8aa8654
commit 3ba0dc2f44
20 changed files with 1240 additions and 838 deletions

View File

@ -2,6 +2,7 @@
## Appendix A. Basic Components
// TODO
#### A.1 Watchdog
``` go
@ -72,6 +73,7 @@ etcd:
#### A.4 Time Ticked Flow Graph
//TODO
###### A.4.1 Flow Graph States
```go
@ -87,16 +89,9 @@ type flowGraphStates struct {
```go
type Msg interface {
TimeTick() Timestamp
SkipThisTick() bool
DownStreamNodeIdx() int32
}
```
```go
type SkipTickMsg struct {}
func (msg *SkipTickMsg) SkipThisTick() bool // always return true
```
###### A.4.3 Node
```go
@ -104,8 +99,8 @@ type Node interface {
Name() string
MaxQueueLength() int32
MaxParallelism() int32
SetPipelineStates(states *flowGraphStates)
Operate([]*Msg) []*Msg
Operate(ctx context.Context, in []Msg) ([]Msg, context.Context)
IsInputNode() bool
}
```
@ -113,28 +108,28 @@ type Node interface {
```go
type baseNode struct {
Name string
maxQueueLength int32
maxParallelism int32
graphStates *flowGraphStates
}
func (node *baseNode) MaxQueueLength() int32
func (node *baseNode) MaxParallelism() int32
func (node *baseNode) SetMaxQueueLength(n int32)
func (node *baseNode) SetMaxParallelism(n int32)
func (node *baseNode) SetPipelineStates(states *flowGraphStates)
func (node *BaseNode) IsInputNode() bool
```
###### A.4.4 Flow Graph
```go
type nodeCtx struct {
node *Node
inputChans [](*chan *Msg)
outputChans [](*chan *Msg)
inputMsgs [](*Msg List)
downstreams []*nodeCtx
skippedTick Timestamp
node Node
inputChannels []chan *MsgWithCtx
inputMessages []Msg
downstream []*nodeCtx
downstreamInputChanIdx map[string]int
NumActiveTasks int64
NumCompletedTasks int64
}
func (nodeCtx *nodeCtx) Start(ctx context.Context) error
@ -144,11 +139,11 @@ func (nodeCtx *nodeCtx) Start(ctx context.Context) error
```go
type TimeTickedFlowGraph struct {
states *flowGraphStates
nodeCtx map[string]*nodeCtx
ctx context.Context
nodeCtx map[NodeName]*nodeCtx
}
func (*pipeline TimeTickedFlowGraph) AddNode(node *Node)
func (*pipeline TimeTickedFlowGraph) AddNode(node Node)
func (*pipeline TimeTickedFlowGraph) SetEdges(nodeName string, in []string, out []string)
func (*pipeline TimeTickedFlowGraph) Start() error
func (*pipeline TimeTickedFlowGraph) Close() error
@ -161,14 +156,31 @@ func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph
#### A.5 ID Allocator
```go
type IdAllocator struct {
type IDAllocator struct {
Allocator
masterAddress string
masterConn *grpc.ClientConn
masterClient masterpb.MasterServiceClient
countPerRPC uint32
idStart UniqueID
idEnd UniqueID
PeerID UniqueID
}
func (allocator *IdAllocator) Start() error
func (allocator *IdAllocator) Close() error
func (allocator *IdAllocator) Alloc(count uint32) ([]int64, error)
func (ia *IDAllocator) Start() error
func (ia *IDAllocator) connectMaster() error
func (ia *IDAllocator) syncID() bool
func (ia *IDAllocator) checkSyncFunc(timeout bool) bool
func (ia *IDAllocator) pickCanDoFunc()
func (ia *IDAllocator) processFunc(req Request) error
func (ia *IDAllocator) AllocOne() (UniqueID, error)
func (ia *IDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error)
func NewIdAllocator(ctx context.Context) *IdAllocator
func NewIDAllocator(ctx context.Context, masterAddr string) (*IDAllocator, error)
```
@ -217,17 +229,22 @@ type Timestamp uint64
```go
type timestampOracle struct {
client *etcd.Client // client of a reliable meta service, i.e. etcd client
rootPath string // this timestampOracle's working root path on the reliable kv service
saveInterval uint64
lastSavedTime uint64
tso Timestamp // monotonically increasing timestamp
key string
kvBase kv.TxnBase
saveInterval time.Duration
maxResetTSGap func() time.Duration
TSO unsafe.Pointer
lastSavedTime atomic.Value
}
func (tso *timestampOracle) GetTimestamp(count uint32) ([]Timestamp, error)
func (tso *timestampOracle) saveTimestamp() error
func (tso *timestampOracle) loadTimestamp() error
func (t *timestampOracle) InitTimestamp() error
func (t *timestampOracle) ResetUserTimestamp(tso uint64) error
func (t *timestampOracle) saveTimestamp(ts time.time) error
func (t *timestampOracle) loadTimestamp() (time.time, error)
func (t *timestampOracle) UpdateTimestamp() error
func (t *timestampOracle) ResetTimestamp()
```
@ -235,13 +252,30 @@ func (tso *timestampOracle) loadTimestamp() error
###### A.6.3 Timestamp Allocator
```go
type TimestampAllocator struct {}
type TimestampAllocator struct {
Allocator
masterAddress string
masterConn *grpc.ClientConn
masterClient masterpb.MasterServiceClient
countPerRPC uint32
lastTsBegin Timestamp
lastTsEnd Timestamp
PeerID UniqueID
}
func (allocator *TimestampAllocator) Start() error
func (allocator *TimestampAllocator) Close() error
func (allocator *TimestampAllocator) Alloc(count uint32) ([]Timestamp, error)
func (ta *TimestampAllocator) Start() error
func (ta *TimestampAllocator) connectMaster() error
func (ta *TimestampAllocator) syncID() bool
func (ta *TimestampAllocator) checkSyncFunc(timeout bool) bool
func (ta *TimestampAllocator) pickCanDoFunc()
func (ta *TimestampAllocator) processFunc(req Request) error
func (ta *TimestampAllocator) AllocOne() (UniqueID, error)
func (ta *TimestampAllocator) Alloc(count uint32) (UniqueID, UniqueID, error)
func (ta *TimestampAllocator) ClearCache()
func NewTimestampAllocator() *TimestampAllocator
func NewTimestampAllocator(ctx context.Context, masterAddr string) (*TimestampAllocator, error)
```
@ -259,29 +293,31 @@ func NewTimestampAllocator() *TimestampAllocator
###### A.7.1 KV Base
```go
type KVBase interface {
Load(key string) (string, error)
MultiLoad(keys []string) ([]string, error)
Save(key, value string) error
MultiSave(kvs map[string]string) error
Remove(key string) error
MultiRemove(keys []string) error
MultiSaveAndRemove(saves map[string]string, removals []string) error
Watch(key string) clientv3.WatchChan
WatchWithPrefix(key string) clientv3.WatchChan
LoadWithPrefix(key string) ( []string, []string, error)
type Base interface {
Load(key string) (string, error)
MultiLoad(keys []string) ([]string, error)
LoadWithPrefix(key string) ([]string, []string, error)
Save(key, value string) error
MultiSave(kvs map[string]string) error
Remove(key string) error
MultiRemove(keys []string) error
Close()
}
```
* *MultiLoad(keys []string)* Load multiple kv pairs. Loads are done transactional.
* *MultiSave(kvs map[string]string)* Save multiple kv pairs. Saves are done transactional.
* *MultiRemove(keys []string)* Remove multiple kv pairs. Removals are done transactional.
###### A.7.2 Txn Base
```go
type TxnBase interface {
Base
MultiSaveAndRemove(saves map[string]string, removals []string) error
}
```
###### A.7.2 Etcd KV
###### A.7.3 Etcd KV
```go
type EtcdKV struct {
@ -289,8 +325,23 @@ type EtcdKV struct {
rootPath string
}
func (kv *EtcdKV) Close()
func (kv *EtcdKV) GetPath(key string) string
func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error)
func (kv *EtcdKV) Load(key string) (string, error)
func (kv *EtcdKV) GetCount(key string) (int64, error)
func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error)
func (kv *EtcdKV) Save(key, value string) error
func (kv *EtcdKV) MultiSave(kvs map[string]string) error
func (kv *EtcdKV) RemoveWithPrefix(prefix string) error
func (kv *EtcdKV) Remove(key string) error
func (kv *EtcdKV) MultiRemove(keys []string) error
func (kv *EtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) error
func (kv *EtcdKV) Watch(key string) clientv3.WatchChan
func (kv *EtcdKV) WatchWithPrefix(key string) clientv3.WatchChan
func NewEtcdKV(etcdAddr string, rootPath string) *EtcdKV
```
EtcdKV implements all *KVBase* interfaces.
EtcdKV implements all *TxnBase* interfaces.

View File

@ -77,39 +77,46 @@ In order to boost throughput, we model Milvus as a stream-driven system.
#### 1.6 System Model
```go
type Component interface {
type Service interface {
Init() error
Start() error
Stop() error
GetComponentStates() (ComponentStates, error)
GetTimeTickChannel() (string, error)
GetStatisticsChannel() (string, error)
}
```
```go
type Component interface {
GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error)
GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error)
}
```
* *State*
* *GetComponentStates*
```go
type StateCode = int
const (
INITIALIZING StateCode = 0
HEALTHY StateCode = 1
ABNORMAL StateCode = 2
HEALTHY StateCode = 1
ABNORMAL StateCode = 2
)
type ComponentInfo struct {
NodeID UniqueID
Role string
NodeID UniqueID
Role string
StateCode StateCode
ExtraInfo KeyValuePair
ExtraInfo []*commonpb.KeyValuePair
}
type ComponentStates struct {
States ComponentInfo
SubcomponentStates []ComponentInfo
State *ComponentInfo
SubcomponentStates []*ComponentInfo
Status *commonpb.Status
}
```

View File

@ -6,10 +6,10 @@
``` go
type CollectionSchema struct {
Name string
Name string
Description string
AutoId bool
Fields []FieldSchema
AutoId bool
Fields []*FieldSchema
}
```
@ -17,11 +17,13 @@ type CollectionSchema struct {
``` go
type FieldSchema struct {
Name string
Description string
DataType DataType
TypeParams map[string]string
IndexParams map[string]string
FieldID int64
Name string
IsPrimaryKey bool
Description string
DataType DataType
TypeParams []*commonpb.KeyValuePair
IndexParams []*commonpb.KeyValuePair
}
```
@ -31,20 +33,20 @@ type FieldSchema struct {
```protobuf
enum DataType {
NONE = 0;
BOOL = 1;
INT8 = 2;
NONE = 0;
BOOL = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
FLOAT = 10;
FLOAT = 10;
DOUBLE = 11;
STRING = 20;
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
VECTOR_FLOAT = 101;
}
```

View File

@ -13,15 +13,12 @@
```go
type IndexService interface {
Service
RegisterNode(RegisterNodeRequest) (RegisterNodeResponse, error)
BuildIndex(BuildIndexRequest) (BuildIndexResponse, error)
GetIndexStates(IndexStatesRequest) (IndexStatesResponse, error)
GetIndexFilePaths(IndexFilePathRequest) (IndexFilePathsResponse, error)
DropIndex(DropIndexRequest) (Status, error)
GetTimeTickChannel() (StringResponse, error)
GetStatisticsChannel() (StringResponse, error)
NotifyTaskState(TaskStateNotification) error
Component
RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error)
BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error)
GetIndexStates(ctx context.Context, req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error)
GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error)
NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIndexNotification) (*commonpb.Status, error)
}
```
@ -30,43 +27,53 @@ type IndexService interface {
* *RegisterNode*
```go
type RegisterNodeRequest struct {
MsgBase
Address string
type MsgBase struct {
MsgType MsgType
MsgID UniqueID
Timestamp uint64
SourceID UniqueID
}
type Address struct {
Ip string
Port int64
}
type RegisterNodeRequest struct {
Base *commonpb.MsgBase
Address *commonpb.Address
}
type InitParams struct {
NodeID UniqueID
StartParams []*commonpb.KeyValuePair
}
type RegisterNodeResponse struct {
//InitParams
InitParams *internalpb2.InitParams
Status *commonpb.Status
}
```
* *BuildIndex*
```go
type KeyValuePair struct {
Key string
Value string
}
type BuildIndexRequest struct {
DataPaths []string
TypeParams map[string]string
IndexParams map[string]string
IndexName string
IndexID UniqueID
DataPaths []string
TypeParams []*commonpb.KeyValuePair
IndexParams []*commonpb.KeyValuePair
}
type BuildIndexResponse struct {
IndexID UniqueID
}
```
```go
type BuildIndexCmd struct {
IndexID UniqueID
Req BuildIndexRequest
}
type TaskStateNotification struct {
IndexID UniqueID
IndexState IndexState
IndexFilePaths []string
FailReason string
Status *commonpb.Status
IndexBuildID UniqueID
}
```
@ -74,42 +81,62 @@ type TaskStateNotification struct {
```go
type IndexStatesRequest struct {
IndexID UniqueID
IndexBuildIDs []UniqueID
}
enum IndexState {
NONE = 0;
UNISSUED = 1;
INPROGRESS = 2;
FINISHED = 3;
NONE = 0;
UNISSUED = 1;
INPROGRESS = 2;
FINISHED = 3;
FAILED = 4;
DELETED = 5;
}
type IndexInfo struct {
State commonpb.IndexState
IndexBuildID UniqueID
IndexID UniqueID
IndexName string
Reason string
}
type IndexStatesResponse struct {
ID UniqueID
State IndexState
//EnqueueTime time.Time
//ScheduleTime time.Time
//BuildCompleteTime time.Time
Status *commonpb.Status
States []*IndexInfo
}
```
* *GetIndexFilePaths*
```go
type IndexFilePathRequest struct {
IndexID UniqueID
IndexBuildIDs []UniqueID
}
type IndexFilePathInfo struct {
Status *commonpb.Status
IndexBuildID UniqueID
IndexFilePaths []string
}
type IndexFilePathsResponse struct {
FilePaths []string
Status *commonpb.Status
FilePaths []*IndexFilePathInfo
}
```
* *DropIndex*
* *NotifyBuildIndex*
```go
type DropIndexRequest struct {
IndexID UniqueID
type BuildIndexNotification struct {
Status *commonpb.Status
IndexBuildID UniqueID
IndexFilePaths []string
NodeID UniqueID
}
```
@ -120,10 +147,39 @@ type DropIndexRequest struct {
```go
type IndexNode interface {
Service
// SetTimeTickChannel(channelName string) error
// SetStatsChannel(channelName string) error
BuildIndex(req BuildIndexCmd) error
Component
BuildIndex(ctx context.Context, req *indexpb.BuildIndexCmd) (*commonpb.Status, error)
DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error)
}
```
* *BuildIndex*
```go
type KeyValuePair struct {
Key string
Value string
}
type BuildIndexRequest struct {
IndexName string
IndexID UniqueID
DataPaths []string
TypeParams []*commonpb.KeyValuePair
IndexParams []*commonpb.KeyValuePair
}
type BuildIndexCmd struct {
IndexBuildID UniqueID
Req *BuildIndexRequest
}
```
* *DropIndex*
```go
type DropIndexRequest struct {
IndexID UniqueID
}
```

View File

@ -4,6 +4,7 @@
// TODO
#### 8.2 Message Stream Service API
```go
@ -38,7 +39,7 @@ type CreateChannelResponse struct {
```go
type DestoryChannelRequest struct {
ChannelNames []string
ChannelNames []string
}
```
@ -48,7 +49,7 @@ type DestoryChannelRequest struct {
```go
type DescribeChannelRequest struct {
ChannelNames []string
ChannelNames []string
}
type ChannelDescription struct {
@ -78,42 +79,82 @@ type DescribeChannelResponse struct {
type MsgType uint32
const {
kInsert MsgType = 400
kDelete MsgType = 401
kSearch MsgType = 500
kSearchResult MsgType = 1000
kSegStatistics MsgType = 1100
kTimeTick MsgType = 1200
kTimeSync MsgType = 1201
MsgType_kNone MsgType = 0
// Definition Requests: collection
MsgType_kCreateCollection MsgType = 100
MsgType_kDropCollection MsgType = 101
MsgType_kHasCollection MsgType = 102
MsgType_kDescribeCollection MsgType = 103
MsgType_kShowCollections MsgType = 104
MsgType_kGetSysConfigs MsgType = 105
MsgType_kLoadCollection MsgType = 106
MsgType_kReleaseCollection MsgType = 107
// Definition Requests: partition
MsgType_kCreatePartition MsgType = 200
MsgType_kDropPartition MsgType = 201
MsgType_kHasPartition MsgType = 202
MsgType_kDescribePartition MsgType = 203
MsgType_kShowPartitions MsgType = 204
MsgType_kLoadPartition MsgType = 205
MsgType_kReleasePartition MsgType = 206
// Define Requests: segment
MsgType_kShowSegment MsgType = 250
MsgType_kDescribeSegment MsgType = 251
// Definition Requests: Index
MsgType_kCreateIndex MsgType = 300
MsgType_kDescribeIndex MsgType = 301
MsgType_kDropIndex MsgType = 302
// Manipulation Requests
MsgType_kInsert MsgType = 400
MsgType_kDelete MsgType = 401
MsgType_kFlush MsgType = 402
// Query
MsgType_kSearch MsgType = 500
MsgType_kSearchResult MsgType = 501
MsgType_kGetIndexState MsgType = 502
MsgType_kGetCollectionStatistics MsgType = 503
MsgType_kGetPartitionStatistics MsgType = 504
// Data Service
MsgType_kSegmentInfo MsgType = 600
// System Control
MsgType_kTimeTick MsgType = 1200
MsgType_kQueryNodeStats MsgType = 1201
MsgType_kLoadIndex MsgType = 1202
MsgType_kRequestID MsgType = 1203
MsgType_kRequestTSO MsgType = 1204
MsgType_kAllocateSegment MsgType = 1205
MsgType_kSegmentStatistics MsgType = 1206
MsgType_kSegmentFlushDone MsgType = 1207
}
type TsMsg interface {
ID() UniqueID
SetTs(ts Timestamp)
BeginTs() Timestamp
EndTs() Timestamp
Type() MsgType
Marshal(*TsMsg) interface{}
Unmarshal(interface{}) *TsMsg
}
type MsgPosition {
type MsgPosition struct{
ChannelName string
MsgID string
TimestampFilter Timestamp
MsgID string
Timestamp uint64
}
type MsgPack struct {
BeginTs Timestamp
EndTs Timestamp
Msgs []TsMsg
StartPositions []MsgPosition
EndPositions []MsgPosition
BeginTs Timestamp
EndTs Timestamp
Msgs []TsMsg
StartPositions []*MsgPosition
endPositions []*MsgPosition
}
type RepackFunc(msgs []* TsMsg, hashKeys [][]int32) map[int32] *MsgPack
type TsMsg interface {
ID() UniqueID
BeginTs() Timestamp
EndTs() Timestamp
Type() MsgType
HashKeys() []uint32
Marshal(TsMsg) (MarshalType, error)
Unmarshal(MarshalType) (TsMsg, error)
Position() *MsgPosition
SetPosition(*MsgPosition)
}
type RepackFunc(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error)
```
@ -122,10 +163,11 @@ type RepackFunc(msgs []* TsMsg, hashKeys [][]int32) map[int32] *MsgPack
// Unmarshal
// Interface
type UnmarshalFunc func(interface{}) *TsMsg
type UnmarshalFunc func(interface{}) (TsMsg, error)
type UnmarshalDispatcher interface {
Unmarshal(interface{}, msgType commonpb.MsgType) (msgstream.TsMsg, error)
Unmarshal(input interface{}, msgType commonpb.MsgType) (TsMsg, error)
AddMsgTemplate(msgType commonpb.MsgType, unmarshalFunc UnmarshalFunc)
}
type UnmarshalDispatcherFactory interface {
@ -134,8 +176,9 @@ type UnmarshalDispatcherFactory interface {
// Proto & Mem Implementation
type ProtoUDFactory struct {}
func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *UnmarshalDispatcher
func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher
// TODO
type MemUDFactory struct {}
func (mudf *MemUDFactory) NewUnmarshalDispatcher() *UnmarshalDispatcher
```
@ -148,26 +191,32 @@ func (mudf *MemUDFactory) NewUnmarshalDispatcher() *UnmarshalDispatcher
// Interface
type MsgStream interface {
Start()
Close()
AsProducer(channels []string)
AsConsumer(channels []string, subName string)
Produce(*MsgPack) error
Broadcast(*MsgPack) error
Consume() *MsgPack // message can be consumed exactly once
Seek(mp *MsgPosition) error
Start()
Close()
Chan() <-chan *MsgPack
AsProducer(channels []string)
AsConsumer(channels []string, subName string)
SetRepackFunc(repackFunc RepackFunc)
Produce(context.Context, *MsgPack) error
Broadcast(context.Context, *MsgPack) error
Consume() (*MsgPack, context.Context)
Seek(offset *MsgPosition) error
}
type MsgStreamFactory interface {
NewMsgStream() *MsgStream
NewTtMsgStream() *MsgStream
SetParams(params map[string]interface{}) error
NewMsgStream(ctx context.Context) (MsgStream, error)
NewTtMsgStream(ctx context.Context) (MsgStream, error)
}
//TODO
// Pulsar
type PulsarMsgStreamFactory interface {}
func (pmsf *PulsarMsgStreamFactory) NewMsgStream() *MsgStream
func (pmsf *PulsarMsgStreamFactory) NewTtMsgStream() *MsgStream
//TODO
// RockMQ
type RmqMsgStreamFactory interface {}
func (rmsf *RmqMsgStreamFactory) NewMsgStream() *MsgStream
@ -180,20 +229,30 @@ func (rmsf *RmqMsgStreamFactory) NewTtMsgStream() *MsgStream
// PulsarMsgStream
type PulsarMsgStream struct {
client *pulsar.Client
repackFunc RepackFunc
producers []*pulsar.Producer
consumers []*pulsar.Consumer
unmarshal *UnmarshalDispatcher
ctx context.Context
client pulsar.Client
producers []Producer
consumers []Consumer
consumerChannels []string
repackFunc RepackFunc
unmarshal UnmarshalDispatcher
receiveBuf chan *MsgPack
wait *sync.WaitGroup
streamCancel func()
pulsarBufSize int64
consumerLock *sync.Mutex
consumerReflects []reflect.SelectCase
scMap *sync.Map
}
func (ms *PulsarMsgStream) Start() error
func (ms *PulsarMsgStream) Close() error
func (ms *PulsarMsgStream) AsProducer(channels []string)
func (ms *PulsarMsgStream) AsConsumer(channels []string, subName string)
func (ms *PulsarMsgStream) Produce(msgs *MsgPack) error
func (ms *PulsarMsgStream) Broadcast(msgs *MsgPack) error
func (ms *PulsarMsgStream) Consume() (*MsgPack, error)
func (ms *PulsarMsgStream) Produce(ctx context.Context, msgs *MsgPack) error
func (ms *PulsarMsgStream) Broadcast(ctx context.Context, msgs *MsgPack) error
func (ms *PulsarMsgStream) Consume() (*MsgPack, context.Context)
func (ms *PulsarMsgStream) Seek(mp *MsgPosition) error
func (ms *PulsarMsgStream) SetRepackFunc(repackFunc RepackFunc)
@ -201,23 +260,23 @@ func NewPulsarMsgStream(ctx context.Context, pulsarAddr string, bufferSize int64
type PulsarTtMsgStream struct {
client *pulsar.Client
repackFunc RepackFunc
producers []*pulsar.Producer
consumers []*pulsar.Consumer
unmarshal *UnmarshalDispatcher
inputBuf []*TsMsg
client *pulsar.Client
repackFunc RepackFunc
producers []*pulsar.Producer
consumers []*pulsar.Consumer
unmarshal *UnmarshalDispatcher
inputBuf []*TsMsg
unsolvedBuf []*TsMsg
msgPacks []*MsgPack
msgPacks []*MsgPack
}
func (ms *PulsarTtMsgStream) Start() error
func (ms *PulsarTtMsgStream) Close() error
func (ms *PulsarTtMsgStream) AsProducer(channels []string)
func (ms *PulsarTtMsgStream) AsConsumer(channels []string, subName string)
func (ms *PulsarTtMsgStream) Produce(msgs *MsgPack) error
func (ms *PulsarTtMsgStream) Broadcast(msgs *MsgPack) error
func (ms *PulsarTtMsgStream) Consume() *MsgPack //return messages in one time tick
func (ms *PulsarTtMsgStream) Produce(ctx context.Context, msgs *MsgPack) error
func (ms *PulsarTtMsgStream) Broadcast(ctx context.Context, msgs *MsgPack) error
func (ms *PulsarTtMsgStream) Consume() (*MsgPack, context.Context) //return messages in one time tick
func (ms *PulsarTtMsgStream) Seek(mp *MsgPosition) error
func (ms *PulsarTtMsgStream) SetRepackFunc(repackFunc RepackFunc)
@ -226,42 +285,42 @@ func NewPulsarTtMsgStream(ctx context.Context, pulsarAddr string, bufferSize int
// RmqMsgStream
type RmqMsgStream struct {
client *rockermq.RocksMQ
client *rockermq.RocksMQ
repackFunc RepackFunc
producers []string
consumers []string
subName string
unmarshal *UnmarshalDispatcher
producers []string
consumers []string
subName string
unmarshal *UnmarshalDispatcher
}
func (ms *RmqMsgStream) Start() error
func (ms *RmqMsgStream) Close() error
func (ms *RmqMsgStream) AsProducer(channels []string)
func (ms *RmqMsgStream) AsConsumer(channels []string, subName string)
func (ms *RmqMsgStream) Produce(msgs *MsgPack) error
func (ms *RmqMsgStream) Broadcast(msgs *MsgPack) error
func (ms *RmqMsgStream) Consume() (*MsgPack, error)
func (ms *RmqMsgStream) Produce(ctx context.Context, msgs *MsgPack) error
func (ms *RmqMsgStream) Broadcast(ctx context.Context, msgs *MsgPack) error
func (ms *RmqMsgStream) Consume() (*MsgPack, context.Context)
func (ms *RmqMsgStream) Seek(mp *MsgPosition) error
func (ms *RmqMsgStream) SetRepackFunc(repackFunc RepackFunc)
func NewRmqMsgStream(ctx context.Context) *RmqMsgStream
type RmqTtMsgStream struct {
client *rockermq.RocksMQ
client *rockermq.RocksMQ
repackFunc RepackFunc
producers []string
consumers []string
subName string
unmarshal *UnmarshalDispatcher
producers []string
consumers []string
subName string
unmarshal *UnmarshalDispatcher
}
func (ms *RmqTtMsgStream) Start() error
func (ms *RmqTtMsgStream) Close() error
func (ms *RmqTtMsgStream) AsProducer(channels []string)
func (ms *RmqTtMsgStream) AsConsumer(channels []string, subName string)
func (ms *RmqTtMsgStream) Produce(msgs *MsgPack) error
func (ms *RmqTtMsgStream) Broadcast(msgs *MsgPack) error
func (ms *RmqTtMsgStream) Consume() (*MsgPack, error)
func (ms *RmqTtMsgStream) Produce(ctx context.Context, msgs *MsgPack) error
func (ms *RmqTtMsgStream) Broadcast(ctx conext.Context) msgs *MsgPack) error
func (ms *RmqTtMsgStream) Consume() (*MsgPack, context.Context)
func (ms *RmqTtMsgStream) Seek(mp *MsgPosition) error
func (ms *RmqTtMsgStream) SetRepackFunc(repackFunc RepackFunc)
@ -284,7 +343,7 @@ type ProducerMessage struct {
}
type ConsumerMessage struct {
msgID UniqueID
msgID UniqueID
payload []byte
}

View File

@ -10,46 +10,62 @@
```go
type ProxyService interface {
Component
Service
RegisterLink() (RegisterLinkResponse, error)
RegisterNode(req RegisterNodeRequest) (RegisterNodeResponse, error)
InvalidateCollectionMetaCache(req InvalidateCollMetaCacheRequest) (Status, error)
RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error)
RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error)
InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
}
```
* *MsgBase*
```go
type MsgBase struct {
MsgType MsgType
MsgID UniqueID
Timestamp Timestamp
SourceID UniqueID
MsgType MsgType
MsgID UniqueID
Timestamp uint64
SourceID UniqueID
}
```
* *RegisterLink*
```go
type Address struct {
Ip string
Port int64
}
type RegisterLinkResponse struct {
Address string
Port int32
Address *commonpb.Address
Status *commonpb.Status
}
```
* *RegisterNode*
```go
type RegisterNodeRequest struct {
MsgBase
Address string
type Address struct {
Ip string
Port int64
}
type RegisterNodeRequest struct {
Base *commonpb.MsgBase
Address string
Port int64
}
type InitParams struct {
NodeID UniqueID
StartParams []*commonpb.KeyValuePair
}
type RegisterNodeResponse struct {
//InitParams
InitParams *internalpb2.InitParams
Status *commonpb.Status
}
```
@ -57,8 +73,8 @@ type RegisterNodeResponse struct {
```go
type InvalidateCollMetaCacheRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
}
```
@ -70,41 +86,39 @@ type InvalidateCollMetaCacheRequest struct {
```go
type ProxyNode interface {
Service
//SetTimeTickChannel(channelName string) error
//SetStatsChannel(channelName string) error
InvalidateCollectionMetaCache(request InvalidateCollMetaCacheRequest) (Status, error)
CreateCollection(req CreateCollectionRequest) error
DropCollection(req DropCollectionRequest) error
HasCollection(req HasCollectionRequest) (bool, error)
LoadCollection(req LoadCollectionRequest) error
ReleaseCollection(req ReleaseCollectionRequest) error
DescribeCollection(req DescribeCollectionRequest) (DescribeCollectionResponse, error)
GetCollectionStatistics(req CollectionStatsRequest) (CollectionStatsResponse, error)
ShowCollections(req ShowCollectionRequest) (ShowCollectionResponse, error)
InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
CreatePartition(req CreatePartitionRequest) error
DropPartition(req DropPartitionRequest) error
HasPartition(req HasPartitionRequest) (bool, error)
LoadPartitions(req LoadPartitonRequest) error
ReleasePartitions(req ReleasePartitionRequest) error
GetPartitionStatistics(req PartitionStatsRequest) (PartitionStatsResponse, error)
ShowPartitions(req ShowPartitionRequest) (ShowPartitionResponse)
CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error)
DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error)
HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error)
LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error)
ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error)
DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error)
ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error)
CreateIndex(req CreateIndexRequest) error
DescribeIndex(DescribeIndexRequest) (DescribeIndexResponse, error)
GetIndexState(IndexStateRequest) (IndexStateResponse, error)
DropIndex(DropIndexRequest) (Status, error)
Insert(req InsertRequest) (InsertResponse, error)
Search(req SearchRequest) (SearchResults, error)
Flush(req FlushRequest) error
GetDdChannel(Empty) (StringResponse, error)
GetQuerySegmentInfo(QuerySegmentInfoRequest) (QuerySegmentInfoResponse, error)
GetPersistentSegmentInfo(PersistentSegmentInfoRequest) (PersistentSegmentInfoResponse, error)
CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error)
DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error)
HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error)
LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error)
ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error)
GetPartitionStatistics(ctx context.Context, request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error)
ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error)
DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error)
GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error)
DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error)
Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error)
Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error)
Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error)
GetDdChannel(ctx context.Context, request *commonpb.Empty) (*milvuspb.StringResponse, error)
GetQuerySegmentInfo(ctx context.Context, req *milvuspb.QuerySegmentInfoRequest) (*milvuspb.QuerySegmentInfoResponse, error)
GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error)
}
```
@ -126,8 +140,8 @@ See *Master API* for detailed definitions.
```go
type LoadCollectionRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
}
```
@ -136,8 +150,8 @@ type LoadCollectionRequest struct {
```go
type ReleaseCollectionRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
}
```
@ -169,11 +183,19 @@ See *Master API* for detailed definitions.
* *LoadPartitions*
```go
type CollectionSchema struct {
Name string
Description string
AutoID bool
Fields []*FieldSchema
}
type LoadPartitonRequest struct {
MsgBase
DbName string
CollectionName string
PartitionNames []string
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
PartitionIDs []UniqueID
Schema *schemapb.CollectionSchema
}
```
@ -181,8 +203,8 @@ type LoadPartitonRequest struct {
```go
type ReleasePartitionRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionNames []string
}
@ -212,17 +234,18 @@ See *Master API* for detailed definitions.
```go
type InsertRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionName string
RowData []Blob
HashKeys []uint32
PartitionName string
RowData []Blob
HashKeys []uint32
}
type InsertResponse struct {
RowIDBegin UniqueID
RowIDEnd UniqueID
Status *commonpb.Status
RowIDBegin int64
RowIDEnd int64
}
```
@ -230,21 +253,26 @@ type InsertResponse struct {
```go
type SearchRequest struct {
MsgBase
DbName string
CollectionName string
PartitionNames []string
Dsl string
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionNames []string
Dsl string
PlaceholderGroup []byte
}
type SearchResults struct {
Status commonpb.Status
Hits byte
}
```
* *Flush*
```go
type FlushRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
}
```
@ -254,69 +282,129 @@ type FlushRequest struct {
```go
type PersistentSegmentInfoRequest struct{
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
}
```
```go
type SegmentState int32
const (
SegmentState_SegmentNone SegmentState = 0
SegmentState_SegmentNotExist SegmentState = 1
SegmentState_SegmentGrowing SegmentState = 2
SegmentState_SegmentSealed SegmentState = 3
SegmentState_SegmentFlushed SegmentState = 4
)
type PersistentSegmentInfo struct {
SegmentID UniqueID
CollectionID UniqueID
PartitionID UniqueID
OpenTime Timestamp
SealedTime Timestamp
FlushedTime Timestamp
NumRows int64
MemSize int64
State SegmentState
SegmentID UniqueID
CollectionID UniqueID
PartitionID UniqueID
OpenTime Timestamp
SealedTime Timestamp
FlushedTime Timestamp
NumRows int64
MemSize int64
State SegmentState
}
```
```go
type PersistentSegmentInfoResponse struct{
infos []SegmentInfo
infos []*milvuspb.SegmentInfo
}
```
```
#### 6.1 Proxy Instance
```go
type Proxy struct {
servicepb.UnimplementedMilvusServiceServer
masterClient mpb.MasterClient
ctx context.Context
cancel func()
wg sync.WaitGroup
timeTick *timeTick
ttStream *MessageStream
scheduler *taskScheduler
tsAllocator *TimestampAllocator
ReqIdAllocator *IdAllocator
RowIdAllocator *IdAllocator
SegIdAssigner *segIdAssigner
initParams *internalpb2.InitParams
ip string
port int
stateCode internalpb2.StateCode
masterClient MasterClient
indexServiceClient IndexServiceClient
dataServiceClient DataServiceClient
proxyServiceClient ProxyServiceClient
queryServiceClient QueryServiceClient
sched *TaskScheduler
tick *timeTick
idAllocator *allocator.IDAllocator
tsoAllocator *allocator.TimestampAllocator
segAssigner *SegIDAssigner
manipulationMsgStream msgstream.MsgStream
queryMsgStream msgstream.MsgStream
msFactory msgstream.Factory
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
}
func (proxy *Proxy) Start() error
func NewProxy(ctx context.Context) *Proxy
func (node *NodeImpl) Init() error
func (node *NodeImpl) Start() error
func (node *NodeImpl) Stop() error
func (node *NodeImpl) AddStartCallback(callbacks ...func())
func (node *NodeImpl) waitForServiceReady(ctx context.Context, service Component, serviceName string) error
func (node *NodeImpl) lastTick() Timestamp
func (node *NodeImpl) AddCloseCallback(callbacks ...func())
func (node *NodeImpl) SetMasterClient(cli MasterClient)
func (node *NodeImpl) SetIndexServiceClient(cli IndexServiceClient)
func (node *NodeImpl) SetDataServiceClient(cli DataServiceClient)
func (node *NodeImpl) SetProxyServiceClient(cli ProxyServiceClient)
func (node *NodeImpl) SetQueryServiceClient(cli QueryServiceClient)
func NewProxyNodeImpl(ctx context.Context, factory msgstream.Factory) (*NodeImpl, error)
```
#### Global Parameter Table
```go
type GlobalParamsTable struct {
paramtable.BaseTable
NetworkPort int
IP string
NetworkAddress string
MasterAddress string
PulsarAddress string
QueryNodeNum int
QueryNodeIDList []UniqueID
ProxyID UniqueID
TimeTickInterval time.Duration
InsertChannelNames []string
DeleteChannelNames []string
K2SChannelNames []string
SearchChannelNames []string
SearchResultChannelNames []string
ProxySubName string
ProxyTimeTickChannelNames []string
DataDefinitionChannelNames []string
MsgStreamInsertBufSize int64
MsgStreamSearchBufSize int64
MsgStreamSearchResultBufSize int64
MsgStreamSearchResultPulsarBufSize int64
MsgStreamTimeTickBufSize int64
MaxNameLength int64
MaxFieldNum int64
MaxDimension int64
DefaultPartitionTag string
DefaultIndexName string
}
func (*paramTable GlobalParamsTable) ProxyId() int64
func (*paramTable GlobalParamsTable) ProxyAddress() string
func (*paramTable GlobalParamsTable) MasterAddress() string
func (*paramTable GlobalParamsTable) PulsarAddress() string
func (*paramTable GlobalParamsTable) TimeTickTopic() string
func (*paramTable GlobalParamsTable) InsertTopics() []string
func (*paramTable GlobalParamsTable) QueryTopic() string
func (*paramTable GlobalParamsTable) QueryResultTopics() []string
func (*paramTable GlobalParamsTable) Init() error
var ProxyParamTable GlobalParamsTable
var Params ParamTable
```
@ -328,67 +416,45 @@ var ProxyParamTable GlobalParamsTable
``` go
type task interface {
Id() int64 // return ReqId
PreExecute() error
Execute() error
PostExecute() error
PreExecute(ctx context.Context) error
Execute(ctx context.Context) error
PostExecute(ctx context.Context) error
WaitToFinish() error
Notify() error
}
```
* Base Task
```go
type baseTask struct {
Type ReqType
ReqId int64
Ts Timestamp
ProxyId int64
}
func (task *baseTask) PreExecute() error
func (task *baseTask) Execute() error
func (task *baseTask) PostExecute() error
func (task *baseTask) WaitToFinish() error
func (task *baseTask) Notify() error
```
* Insert Task
Take insertTask as an example:
```go
type insertTask struct {
baseTask
SegIdAssigner *segIdAssigner
RowIdAllocator *IdAllocator
rowBatch *RowBatch
}
func (task *InsertTask) Execute() error
func (task *InsertTask) WaitToFinish() error
func (task *InsertTask) Notify() error
```
#### 6.2 Task Scheduler
* Base Task Queue
```go
type baseTaskQueue struct {
unissuedTasks *List
activeTasks map[int64]*task
utLock sync.Mutex // lock for UnissuedTasks
atLock sync.Mutex // lock for ActiveTasks
type TaskQueue interface {
utChan() <-chan int
UTEmpty() bool
utFull() bool
addUnissuedTask(t task) error
FrontUnissuedTask() task
PopUnissuedTask() task
AddActiveTask(t task)
PopActiveTask(ts Timestamp) task
getTaskByReqID(reqID UniqueID) task
TaskDoneTest(ts Timestamp) bool
Enqueue(t task) error
}
type baseTaskQueue struct {
unissuedTasks *list.List
activeTasks map[Timestamp]task
utLock sync.Mutex
atLock sync.Mutex
maxTaskNum int64
utBufChan chan int
sched *TaskScheduler
}
func (queue *baseTaskQueue) AddUnissuedTask(task *task)
func (queue *baseTaskQueue) FrontUnissuedTask() *task
func (queue *baseTaskQueue) PopUnissuedTask(id int64) *task
func (queue *baseTaskQueue) AddActiveTask(task *task)
func (queue *baseTaskQueue) PopActiveTask(id int64) *task
func (queue *baseTaskQueue) TaskDoneTest(ts Timestamp) bool
```
*AddUnissuedTask(task \*task)* will put a new task into *unissuedTasks*, while maintaining the list by timestamp order.
@ -449,22 +515,31 @@ Queries will be put into *DqTaskQueue*.
``` go
type taskScheduler struct {
DdQueue *ddTaskQueue
DmQueue *dmTaskQueue
DqQueue *dqTaskQueue
DdQueue TaskQueue
DmQueue TaskQueue
DqQueue TaskQueue
tsAllocator *TimestampAllocator
ReqIdAllocator *IdAllocator
idAllocator *allocator.IDAllocator
tsoAllocator *allocator.TimestampAllocator
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
msFactory msgstream.Factory
}
func (sched *taskScheduler) scheduleDdTask() *task
func (sched *taskScheduler) scheduleDmTask() *task
func (sched *taskScheduler) scheduleDqTask() *task
func (sched *TaskScheduler) getTaskByReqID(collMeta UniqueID) task
func (sched *TaskScheduler) processTask(t task, q TaskQueue)
func (sched *taskScheduler) Start() error
func (sched *taskScheduler) TaskDoneTest(ts Timestamp) bool
func newTaskScheduler(ctx context.Context, tsAllocator *TimestampAllocator, ReqIdAllocator *IdAllocator) *taskScheduler
func NewTaskScheduler(ctx context.Context, idAllocator *allocator.IDAllocator, tsoAllocator *allocator.TimestampAllocator,
factory msgstream.Factory) (*TaskScheduler, error)
```
*scheduleDdTask()* selects tasks in a FIFO manner, thus time order is garanteed.
@ -479,6 +554,7 @@ The policy of *scheduleDqTask()* should target on throughput. It should also tak
* Statistics
// TODO
```go
// ActiveComponent interfaces
func (sched *taskScheduler) Id() String
@ -501,6 +577,7 @@ message taskSchedulerHeartbeat {
// TODO
#### 6.3 Time Tick
* Time Tick

View File

@ -9,29 +9,40 @@
```go
type Master interface {
Service
GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
CreateCollection(req CreateCollectionRequest) error
DropCollection(req DropCollectionRequest) error
HasCollection(req HasCollectionRequest) (bool, error)
DescribeCollection(req DescribeCollectionRequest) (DescribeCollectionResponse, error)
ShowCollections(req ShowCollectionRequest) (ShowCollectionResponse, error)
//DDL request
CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error)
DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error)
HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error)
DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error)
CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error)
DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error)
HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error)
ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
CreatePartition(req CreatePartitionRequest) error
DropPartition(req DropPartitionRequest) error
HasPartition(req HasPartitionRequest) (bool, error)
ShowPartitions(req ShowPartitionRequest) (ShowPartitionResponse, error)
//index builder service
CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error)
DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error)
DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error)
DescribeSegment(req DescribeSegmentRequest) (DescribeSegmentResponse, error)
ShowSegments(req ShowSegmentRequest) (ShowSegmentResponse, error)
//global timestamp allocator
AllocTimestamp(ctx context.Context, in *masterpb.TsoRequest) (*masterpb.TsoResponse, error)
AllocID(ctx context.Context, in *masterpb.IDRequest) (*masterpb.IDResponse, error)
CreateIndex(req CreateIndexRequest) error
DescribeIndex(DescribeIndexRequest) (DescribeIndexResponse, error)
DropIndex(DropIndexRequest) (Status, error)
//receiver time tick from proxy service, and put it into this channel
GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error)
AllocTimestamp(req TsoRequest) (TsoResponse, error)
AllocID(req IDRequest) (IDResponse, error)
//receive ddl from rpc and time tick from proxy service, and put them into this channel
GetDdChannel(ctx context.Context) (*milvuspb.StringResponse, error)
GetDdChannel() (string, error)
//just define a channel, not used currently
GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error)
//segment
DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error)
ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error)
}
```
@ -41,10 +52,10 @@ type Master interface {
```go
type MsgBase struct {
MsgType MsgType
MsgID UniqueID
MsgType MsgType
MsgID UniqueID
Timestamp Timestamp
SourceID UniqueID
SourceID UniqueID
}
```
@ -52,10 +63,10 @@ type MsgBase struct {
```go
type CreateCollectionRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
Schema []bytes
Schema []byte
}
```
@ -63,8 +74,8 @@ type CreateCollectionRequest struct {
```go
type DropCollectionRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
}
```
@ -73,8 +84,8 @@ type DropCollectionRequest struct {
```go
type HasCollectionRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
}
```
@ -83,17 +94,23 @@ type HasCollectionRequest struct {
```go
type DescribeCollectionRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
CollectionID UniqueID
}
type CollectionSchema struct {
Name string
Description string
AutoID bool
Fields []*FieldSchema
}
type DescribeCollectionResponse struct {
DbID UniqueID
CollectionID UniqueID
DefaultPartitionName string
DefaultPartitionID UniqueID
Schema []bytes
Status *commonpb.Status
Schema *schemapb.CollectionSchema
CollectionID int64
}
```
@ -101,13 +118,13 @@ type DescribeCollectionResponse struct {
```go
type ShowCollectionRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
}
type ShowCollectionResponse struct {
Status *commonpb.Status
CollectionNames []string
CollectionIDs []UniqueID
}
```
@ -115,10 +132,10 @@ type ShowCollectionResponse struct {
```go
type CreatePartitionRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionName string
PartitionName string
}
```
@ -126,10 +143,10 @@ type CreatePartitionRequest struct {
```go
type DropPartitionRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionName string
PartitionName string
}
```
@ -137,10 +154,10 @@ type DropPartitionRequest struct {
```go
type HasPartitionRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionName string
PartitionName string
}
```
@ -148,14 +165,16 @@ type HasPartitionRequest struct {
```go
type ShowPartitionRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
CollectionID UniqueID
}
type ShowPartitionResponse struct {
PartitionIDs []UniqueID
Status *commonpb.Status
PartitionNames []string
PartitionIDs []UniqueID
}
```
@ -163,13 +182,15 @@ type ShowPartitionResponse struct {
```go
type DescribeSegmentRequest struct {
MsgBase
Base *commonpb.MsgBase
CollectionID UniqueID
SegmentID UniqueID
SegmentID UniqueID
}
type DescribeSegmentResponse struct {
Status *commonpb.Status
IndexID UniqueID
BuildID UniqueID
}
```
@ -177,12 +198,13 @@ type DescribeSegmentResponse struct {
```go
type ShowSegmentRequest struct {
MsgBase
Base *commonpb.MsgBase
CollectionID UniqueID
PartitionID UniqueID
PartitionID UniqueID
}
type ShowSegmentResponse struct {
Status *commonpb.Status
SegmentIDs []UniqueID
}
```
@ -191,11 +213,11 @@ type ShowSegmentResponse struct {
```go
type CreateIndexRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
FieldName string
Params [] KeyValuePair
FieldName string
ExtraParams []*commonpb.KeyValuePair
}
```
@ -203,19 +225,22 @@ type CreateIndexRequest struct {
```go
type DescribeIndexRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
FieldName string
FieldName string
IndexName string
}
type IndexDescription struct {
IndexName string
params []KeyValuePair
IndexID UniqueID
params []*commonpb.KeyValuePair
}
type DescribeIndexResponse struct {
IndexDescriptions []IndexDescription
Status *commonpb.Status
IndexDescriptions []*IndexDescription
}
```
@ -223,11 +248,11 @@ type DescribeIndexResponse struct {
```go
type DropIndexRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
FieldName string
IndexName string
FieldName string
IndexName string
}
```
@ -235,13 +260,14 @@ type DropIndexRequest struct {
```go
type TsoRequest struct {
MsgBase
Base *commonpb.MsgBase
Count uint32
}
type TsoResponse struct {
StartTime Timestamp
Count uint32
Status *commonpb.Status
Timestamp uint64
Count uint32
}
```
@ -249,13 +275,14 @@ type TsoResponse struct {
```go
type IDRequest struct {
MsgBase
Base *commonpb.MsgBase
Count uint32
}
type IDResponse struct {
StartID UniqueID
Count uint32
Status *commonpb.Status
ID UniqueID
Count uint32
}
```
@ -267,12 +294,13 @@ type IDResponse struct {
```go
type CreateCollectionRequest struct {
RequestBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
DbID UniqueID
DbID UniqueID
CollectionID UniqueID
Schema []bytes
Schema []byte
}
```
@ -280,11 +308,11 @@ type CreateCollectionRequest struct {
```go
type DropCollectionRequest struct {
RequestBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
DbID UniqueID
CollectionID UniqueID
DbID UniqueID
CollectionID UniqueID
}
```
@ -292,13 +320,13 @@ type DropCollectionRequest struct {
```go
type CreatePartitionRequest struct {
RequestBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionName string
DbID UniqueID
CollectionID UniqueID
PartitionID UniqueID
PartitionName string
DbID UniqueID
CollectionID UniqueID
PartitionID UniqueID
}
```
@ -306,13 +334,13 @@ type CreatePartitionRequest struct {
```go
type DropPartitionRequest struct {
RequestBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionName string
DbID UniqueID
CollectionID UniqueID
PartitionID UniqueID
PartitionName string
DbID UniqueID
CollectionID UniqueID
PartitionID UniqueID
}
```
@ -320,14 +348,14 @@ type DropPartitionRequest struct {
```go
type CreateIndexRequest struct {
RequestBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
FieldName string
DbID UniqueID
CollectionID UniqueID
FieldID int64
Params [] KeyValuePair
FieldName string
DbID UniqueID
CollectionID UniqueID
FieldID UniqueID
ExtraParams []*commonpb.KeyValuePair
}
```
@ -337,27 +365,78 @@ type CreateIndexRequest struct {
```go
type Master interface {
tso timestampOracle // timestamp oracle
ddScheduler ddRequestScheduler // data definition request scheduler
metaTable metaTable // in-memory system meta
collManager collectionManager // collection & partition manager
segManager segmentManager // segment manager
MetaTable *metaTable
//id allocator
idAllocator *allocator.GlobalIDAllocator
//tso allocator
tsoAllocator *tso.GlobalTSOAllocator
//inner members
ctx context.Context
cancel context.CancelFunc
etcdCli *clientv3.Client
kvBase *etcdkv.EtcdKV
metaKV *etcdkv.EtcdKV
//setMsgStreams, receive time tick from proxy service time tick channel
ProxyTimeTickChan chan typeutil.Timestamp
//setMsgStreams, send time tick into dd channel and time tick channel
SendTimeTick func(t typeutil.Timestamp) error
//setMsgStreams, send create collection into dd channel
DdCreateCollectionReq func(req *internalpb2.CreateCollectionRequest) error
//setMsgStreams, send drop collection into dd channel, and notify the proxy to delete this collection
DdDropCollectionReq func(req *internalpb2.DropCollectionRequest) error
//setMsgStreams, send create partition into dd channel
DdCreatePartitionReq func(req *internalpb2.CreatePartitionRequest) error
//setMsgStreams, send drop partition into dd channel
DdDropPartitionReq func(req *internalpb2.DropPartitionRequest) error
//setMsgStreams segment channel, receive segment info from data service, if master create segment
DataServiceSegmentChan chan *datapb.SegmentInfo
//setMsgStreams ,if segment flush completed, data node would put segment id into msg stream
DataNodeSegmentFlushCompletedChan chan typeutil.UniqueID
//get binlog file path from data service,
GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
//call index builder's client to build index, return build id
BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error)
DropIndexReq func(indexID typeutil.UniqueID) error
//proxy service interface, notify proxy service to drop collection
InvalidateCollectionMetaCache func(ts typeutil.Timestamp, dbName string, collectionName string) error
//query service interface, notify query service to release collection
ReleaseCollection func(ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error
// put create index task into this chan
indexTaskQueue chan *CreateIndexTask
//dd request scheduler
ddReqQueue chan reqTask //dd request will be push into this chan
lastDdTimeStamp typeutil.Timestamp
//time tick loop
lastTimeTick typeutil.Timestamp
//states code
stateCode atomic.Value
//call once
initOnce sync.Once
startOnce sync.Once
//isInit atomic.Value
msFactory ms.Factory
}
```
* Timestamp allocation
Master serves as a centrol clock of the whole system. Other components (i.e. Proxy) allocates timestamps from master via RPC *AllocTimestamp*. All the timestamp allocation requests will be handled by the timestampOracle singleton. See section 4.2 for the details about timestampOracle.
* Request Scheduling
* System Meta
* Collection Management
* Segment Management
#### 10.3 Data definition Request Scheduler
@ -367,11 +446,12 @@ Master receives data definition requests via grpc. Each request (described by a
```go
type task interface {
Type() ReqType
Ts() Timestamp
Type() commonpb.MsgType
Ts() (typeutil.Timestamp, error)
IgnoreTimeStamp() bool
Execute() error
WaitToFinish() error
Notify() error
Notify(err error)
}
```
@ -388,6 +468,7 @@ type createCollectionTask struct {
// Task interfaces
func (task *createCollectionTask) Type() ReqType
func (task *createCollectionTask) Ts() Timestamp
func (task *createCollectionTask) IgnoreTimeStamp() bool
func (task *createCollectionTask) Execute() error
func (task *createCollectionTask) Notify() error
func (task *createCollectionTask) WaitToFinish() error
@ -395,6 +476,7 @@ func (task *createCollectionTask) WaitToFinish() error
// TODO
###### 10.2.3 Scheduler
```go
@ -418,6 +500,7 @@ Master
//TODO
#### 10.4 Meta Table
###### 10.4.1 Meta
@ -491,39 +574,46 @@ Note that *tenantId*, *proxyId*, *collectionId*, *segmentId* are unique strings
```go
type metaTable struct {
kv kv.TxnBase // client of a reliable kv service, i.e. etcd client
tenantId2Meta map[UniqueId]TenantMeta // tenant id to tenant meta
proxyId2Meta map[UniqueId]ProxyMeta // proxy id to proxy meta
collId2Meta map[UniqueId]CollectionMeta // collection id to collection meta
collName2Id map[string]UniqueId // collection name to collection id
segId2Meta map[UniqueId]SegmentMeta // segment id to segment meta
client kv.TxnBase // client of a reliable kv service, i.e. etcd client
tenantID2Meta map[typeutil.UniqueID]pb.TenantMeta // tenant id to tenant meta
proxyID2Meta map[typeutil.UniqueID]pb.ProxyMeta // proxy id to proxy meta
collID2Meta map[typeutil.UniqueID]pb.CollectionInfo // collection id to collection meta,
collName2ID map[string]typeutil.UniqueID // collection name to collection id
partitionID2Meta map[typeutil.UniqueID]pb.PartitionInfo // partition id -> partition meta
segID2IndexMeta map[typeutil.UniqueID]*map[typeutil.UniqueID]pb.SegmentIndexInfo // segment id -> index id -> segment index meta
indexID2Meta map[typeutil.UniqueID]pb.IndexInfo // index id ->index meta
segID2CollID map[typeutil.UniqueID]typeutil.UniqueID // segment id -> collection id
partitionID2CollID map[typeutil.UniqueID]typeutil.UniqueID // partition id -> collection id
tenantLock sync.RWMutex
proxyLock sync.RWMutex
ddLock sync.RWMutex
proxyLock sync.RWMutex
ddLock sync.RWMutex
}
func (meta *metaTable) AddTenant(tenant *TenantMeta) error
func (meta *metaTable) DeleteTenant(tenantId UniqueId) error
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, part *pb.PartitionInfo, idx []*pb.IndexInfo) error
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID) error
func (mt *metaTable) HasCollection(collID typeutil.UniqueID) bool
func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID) (*pb.CollectionInfo, error)
func (mt *metaTable) GetCollectionByName(collectionName string) (*pb.CollectionInfo, error)
func (mt *metaTable) GetCollectionBySegmentID(segID typeutil.UniqueID) (*pb.CollectionInfo, error)
func (mt *metaTable) ListCollections() ([]string, error)
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID) error
func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string) bool
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string) (typeutil.UniqueID, error)
func (mt *metaTable) GetPartitionByID(partitionID typeutil.UniqueID) (pb.PartitionInfo, error)
func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error
func (mt *metaTable) AddIndex(seg *pb.SegmentIndexInfo) error
func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.UniqueID, bool, error)
func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error)
func (mt *metaTable) GetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error)
func (mt *metaTable) unlockGetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error)
func (mt *metaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool
func (mt *metaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool
func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo) ([]typeutil.UniqueID, schemapb.FieldSchema, error)
func (mt *metaTable) GetIndexByName(collName string, fieldName string, indexName string) ([]pb.IndexInfo, error)
func (mt *metaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, error)
func (meta *metaTable) AddProxy(proxy *ProxyMeta) error
func (meta *metaTable) DeleteProxy(proxyId UniqueId) error
func (meta *metaTable) AddCollection(coll *CollectionMeta) error
func (meta *metaTable) DeleteCollection(collId UniqueId) error
func (meta *metaTable) HasCollection(collId UniqueId) bool
func (meta *metaTable) GetCollectionByName(collName string) (*CollectionMeta, error)
func (meta *metaTable) AddPartition(collId UniqueId, tag string) error
func (meta *metaTable) HasPartition(collId UniqueId, tag string) bool
func (meta *metaTable) DeletePartition(collId UniqueId, tag string) error
func (meta *metaTable) AddSegment(seg *SegmentMeta) error
func (meta *metaTable) GetSegmentById(segId UniqueId)(*SegmentMeta, error)
func (meta *metaTable) DeleteSegment(segId UniqueId) error
func (meta *metaTable) CloseSegment(segId UniqueId, closeTs Timestamp, num_rows int64) error
func NewMetaTable(kv kv.TxnBase) (*metaTable,error)
func NewMetaTable(kv kv.TxnBase) (*metaTable, error)
```
*metaTable* maintains meta both in memory and *etcdKV*. It keeps meta's consistency in both sides. All its member functions may be called concurrently.
@ -538,6 +628,7 @@ func NewMetaTable(kv kv.TxnBase) (*metaTable,error)
###### 10.5.1 Time Tick Barrier
//TODO
* Soft Time Tick Barrier
@ -545,18 +636,19 @@ func NewMetaTable(kv kv.TxnBase) (*metaTable,error)
```go
type softTimeTickBarrier struct {
peer2LastTt map[UniqueId]Timestamp
peer2LastTt map[UniqueID]Timestamp
minTtInterval Timestamp
lastTt Timestamp
outTt chan Timestamp
ttStream *MsgStream
ctx context.Context
lastTt int64
outTt chan Timestamp
ttStream ms.MsgStream
ctx context.Context
}
func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp,error)
func (ttBarrier *softTimeTickBarrier) Start() error
func (ttBarrier *softTimeTickBarrier) Start()
func (ttBarrier *softTimeTickBarrier) Close()
func newSoftTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds []UniqueId, minTtInterval Timestamp) *softTimeTickBarrier
func NewSoftTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier
```
@ -567,20 +659,25 @@ func newSoftTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds []
```go
type hardTimeTickBarrier struct {
peer2Tt map[UniqueId]List
outTt chan Timestamp
ttStream *MsgStream
ctx context.Context
peer2Tt map[UniqueID]Timestamp
outTt chan Timestamp
ttStream ms.MsgStream
ctx context.Context
wg sync.WaitGroup
loopCtx context.Context
loopCancel context.CancelFunc
}
func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp,error)
func (ttBarrier *hardTimeTickBarrier) Start() error
func (ttBarrier *hardTimeTickBarrier) Start()
func (ttBarrier *hardTimeTickBarrier) Close()
func newHardTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds []UniqueId) *softTimeTickBarrier
func NewHardTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier
```
// TODO
###### 10.5.1 Time Synchronization Message Producer
<img src="./figs/time_sync_msg_producer.png" width=700>
@ -589,17 +686,16 @@ func newHardTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds []
```go
type TimeTickBarrier interface {
GetTimeTick() (Timestamp,error)
Start() error
Start()
Close()
}
type timeSyncMsgProducer struct {
proxyTtBarrier TimeTickBarrier // softTimeTickBarrier
WriteNodeTtBarrier TimeTickBarrier //hardTimeTickBarrier
dmSyncStream *MsgStream // insert & delete
k2sSyncStream *MsgStream
ctx context.Context
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
ttBarrier TimeTickBarrier
watchers []TimeTickWatcher
}
func (syncMsgProducer *timeSyncMsgProducer) SetProxyTtStreams(proxyTt *MsgStream, proxyIds []UniqueId)
@ -641,6 +737,7 @@ message QueryNodeStats {
//TODO
```go
type assignment struct {
MemSize int64
@ -665,6 +762,7 @@ func NewSegmentManagement(ctx context.Context) *SegmentManagement
//TODO
###### 10.7.1 Assign Segment ID to Inserted Rows
Master receives *AssignSegIDRequest* which contains a list of *SegIDRequest(count, channelName, collectionName, partitionName)* from Proxy. Segment Manager will assign the opened segments or open a new segment if there is no enough space, and Segment Manager will record the allocated space which can be reallocated after a expire duration.

View File

@ -13,20 +13,18 @@
```go
type QueryService interface {
Service
Component
RegisterNode(req RegisterNodeRequest) (RegisterNodeResponse, error)
ShowCollections(req ShowCollectionRequest) (ShowCollectionResponse, error)
LoadCollection(req LoadCollectionRequest) error
ReleaseCollection(req ReleaseCollectionRequest) error
ShowPartitions(req ShowPartitionRequest) (ShowPartitionResponse, error)
GetPartitionStates(req PartitionStatesRequest) (PartitionStatesResponse, error)
LoadPartitions(req LoadPartitonRequest) error
ReleasePartitions(req ReleasePartitionRequest) error
CreateQueryChannel() (CreateQueryChannelResponse, error)
GetSegmentInfo(req SegmentInfoRequest) (SegmentInfoResponse, error)
RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error)
ShowCollections(ctx context.Context, req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error)
LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error)
ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error)
ShowPartitions(ctx context.Context, req *querypb.ShowPartitionRequest) (*querypb.ShowPartitionResponse, error)
LoadPartitions(ctx context.Context, req *querypb.LoadPartitionRequest) (*commonpb.Status, error)
ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionRequest) (*commonpb.Status, error)
CreateQueryChannel(ctx context.Context) (*querypb.CreateQueryChannelResponse, error)
GetPartitionStates(ctx context.Context, req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error)
GetSegmentInfo(ctx context.Context, req *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error)
}
```
@ -46,14 +44,19 @@ type MsgBase struct {
* *RegisterNode*
```go
tyoe Address struct {
Ip string
port int64
}
type RegisterNodeRequest struct {
MsgBase
Address string
Port int64
Base *commonpb.MsgBase
Address *commonpb.Address
}
type RegisterNodeResponse struct {
//InitParams
Status *commonpb.Status
InitParams *internalpb2.InitParams
}
```
@ -61,11 +64,12 @@ type RegisterNodeResponse struct {
```go
type ShowCollectionRequest struct {
MsgBase
Base *commonpb.MsgBase
DbID UniqueID
}
type ShowCollectionResponse struct {
Status *commonpb.Status
CollectionIDs []UniqueID
}
```
@ -74,10 +78,10 @@ type ShowCollectionResponse struct {
```go
type LoadCollectionRequest struct {
MsgBase
DbID UniqueID
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
schema schemapb.CollectionSchema
schema *schemapb.CollectionSchema
}
```
@ -85,8 +89,8 @@ type LoadCollectionRequest struct {
```go
type ReleaseCollectionRequest struct {
MsgBase
DbID UniqueID
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
}
```
@ -95,12 +99,13 @@ type ReleaseCollectionRequest struct {
```go
type ShowPartitionRequest struct {
MsgBase
DbID UniqueID
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
}
type ShowPartitionResponse struct {
Status *commonpb.Status
PartitionIDs []UniqueID
}
```
@ -111,29 +116,30 @@ type ShowPartitionResponse struct {
type PartitionState = int
const (
NOT_EXIST PartitionState = 0
NOT_PRESENT PartitionState = 1
ON_DISK PartitionState = 2
PARTIAL_IN_MEMORY PartitionState = 3
IN_MEMORY PartitionState = 4
PARTIAL_IN_GPU PartitionState = 5
IN_GPU PartitionState = 6
PartitionState_NotExist PartitionState = 0
PartitionState_NotPresent PartitionState = 1
PartitionState_OnDisk PartitionState = 2
PartitionState_PartialInMemory PartitionState = 3
PartitionState_InMemory PartitionState = 4
PartitionState_PartialInGPU PartitionState = 5
PartitionState_InGPU PartitionState = 6
)
type PartitionStatesRequest struct {
MsgBase
DbID UniqueID
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
PartitionIDs []UniqueID
}
type PartitionStates struct {
PartitionID UniqueID
State PartitionState
State PartitionState
}
type PartitionStatesResponse struct {
States []PartitionStates
Status *commonpb.Status
PartitionDescriptions []*PartitionStates
}
```
@ -141,11 +147,11 @@ type PartitionStatesResponse struct {
```go
type LoadPartitonRequest struct {
MsgBase
DbID UniqueID
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
PartitionIDs []UniqueID
schema schemapb.CollectionSchema
Schema *schemapb.CollectionSchema
}
```
@ -153,8 +159,8 @@ type LoadPartitonRequest struct {
```go
type ReleasePartitionRequest struct {
MsgBase
DbID UniqueID
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
PartitionIDs []UniqueID
}
@ -164,34 +170,37 @@ type ReleasePartitionRequest struct {
```go
type CreateQueryChannelResponse struct {
Status *commonpb.Status
RequestChannelName string
ResultChannelName string
ResultChannelName string
}
```
* *GetSegmentInfo* *
```go
type SegmentInfo struct {
segmentID UniqueID
collectionID UniqueID
partitionID UniqueID
mem_size int64
num_rows int64
index_name string
indexID UniqueID
}
type SegmentInfoRequest struct {
MsgBase
Base *commonpb.MsgBase
SegmentIDs []UniqueID
}
type SegmentInfo struct {
SegmentID UniqueID
CollectionID UniqueID
PartitionID UniqueID
MemSize UniqueID
NumRows UniqueID
IndexName string
IndexID UniqueID
}
type SegmentInfoResponse struct {
infos []SegmentInfo
Status *commonpb.Status
Infos []*SegmentInfo
}
```
//TODO
#### 8.2 Query Channel
```go
@ -214,20 +223,16 @@ type SearchRequest struct {
```go
type QueryNode interface {
Service
typeutil.Component
AddQueryChannel(req AddQueryChannelRequest) error
RemoveQueryChannel(req RemoveQueryChannelRequest) error
WatchDmChannels(req WatchDmChannelRequest) error
//SetTimeTickChannel(channelName string) error
//SetStatsChannel(channelName string) error
LoadSegments(req LoadSegmentRequest) error
ReleaseSegments(req ReleaseSegmentRequest) error
//DescribeParition(req DescribeParitionRequest) (PartitionDescriptions, error)
ReleaseCollection(req ReleaseCollectionRequest) error
ReleasePartitions(req ReleasePartitionRequest) error
AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error)
RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error)
WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error)
LoadSegments(ctx context.Context, in *queryPb.LoadSegmentRequest) (*commonpb.Status, error)
ReleaseCollection(ctx context.Context, in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error)
ReleasePartitions(ctx context.Context, in *queryPb.ReleasePartitionRequest) (*commonpb.Status, error)
ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error)
GetSegmentInfo(ctx context.Context, in *queryPb.SegmentInfoRequest) (*queryPb.SegmentInfoResponse, error)
}
```
@ -237,9 +242,9 @@ type QueryNode interface {
```go
type AddQueryChannelRequest struct {
MsgBase
RequestChannelName string
ResultChannelName string
Base *commonpb.MsgBase
RequestChannelID string
ResultChannelID string
}
```
@ -247,8 +252,10 @@ type AddQueryChannelRequest struct {
```go
type RemoveQueryChannelRequest struct {
RequestChannelName string
ResultChannelName string
Status *commonpb.Status
Base *commonpb.MsgBase
RequestChannelID string
ResultChannelID string
}
```
@ -256,9 +263,8 @@ type RemoveQueryChannelRequest struct {
```go
type WatchDmChannelRequest struct {
InsertChannelNames []string
StartSegment UniqueID
//FieldIDs []int64
Base *commonpb.MsgBase
ChannelIDs []string
}
```
@ -266,12 +272,34 @@ type WatchDmChannelRequest struct {
```go
type LoadSegmentRequest struct {
MsgBase
DbID UniqueID
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
PartitionID UniqueID
SegmentIDs []UniqueID
FieldIDs []UniqueID
SegmentStates []*datapb.SegmentStateInfo
Schema *schemapb.CollectionSchema
}
```
* *ReleaseCollection*
```go
type ReleaseCollectionRequest struct {
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
PartitionID UniqueID
SegmentIDs []UniqueID
//FieldIDs []int64
}
```
* *ReleasePartitions*
```go
type ReleasePartitionRequest struct {
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
PartitionIDs []UniqueID
}
```
@ -279,16 +307,32 @@ type LoadSegmentRequest struct {
```go
type ReleaseSegmentRequest struct {
MsgBase
DbID UniqueID
Base *commonpb.MsgBas
DbID UniqueID
CollectionID UniqueID
PartitionID UniqueID
SegmentIDs []UniqueID
PartitionIDs []UniqueID
SegmentIDs []UniqueID
FieldIDs []UniqueID
}
```
* *GetSegmentInfo*
```go
type SegmentInfoRequest struct {
Base *commonpb.MsgBase
SegmentIDs []Unique
}
type SegmentInfoResponse struct {
Status *commonpb.Status
Infos []*SegmentInfo
}
```
//TODO
#### 8.2 Collection Replica
$collectionReplica$ contains a in-memory local copy of persistent collections. In common cases, the system has multiple query nodes. Data of a collection will be distributed across all the available query nodes, and each query node's $collectionReplica$ will maintain its own share (only part of the collection).

View File

@ -12,24 +12,21 @@
```go
type DataService interface {
Service
RegisterNode(req RegisterNodeRequest) (RegisterNodeResponse, error)
Flush(req FlushRequest) error
AssignSegmentID(req AssignSegIDRequest) (AssignSegIDResponse, error)
ShowSegments(req ShowSegmentRequest) (ShowSegmentResponse, error)
GetSegmentStates(req SegmentStatesRequest) (SegmentStatesResponse, error)
GetSegmentInfo(req SegmentInfoRequest) (SegmentInfoResponse, error)
GetInsertBinlogPaths(req InsertBinlogPathRequest) (InsertBinlogPathsResponse, error)
GetSegmentInfoChannel(req InsertChannelRequest) (StringResponse, error)
GetInsertChannels(req InsertChannelRequest) (StringList, error)
GetCollectionStatistics(req CollectionStatsRequest) (CollectionStatsResponse, error)
GetPartitionStatistics(req PartitionStatsRequest) (PartitionStatsResponse, error)
typeutil.Service
typeutil.Component
RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error)
Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb.Status, error)
AssignSegmentID(ctx context.Context, req *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error)
ShowSegments(ctx context.Context, req *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error)
GetSegmentStates(ctx context.Context, req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error)
GetInsertBinlogPaths(ctx context.Context, req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error)
GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error)
GetInsertChannels(ctx context.Context, req *datapb.InsertChannelRequest) (*internalpb2.StringList, error)
GetCollectionStatistics(ctx context.Context, req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error)
GetPartitionStatistics(ctx context.Context, req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error)
GetCount(ctx context.Context, req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error)
GetSegmentInfo(ctx context.Context, req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error)
}
```
@ -39,10 +36,10 @@ type DataService interface {
```go
type MsgBase struct {
MsgType MsgType
MsgID UniqueID
MsgType MsgType
MsgID UniqueID
Timestamp Timestamp
SourceID UniqueID
SourceID UniqueID
}
```
@ -50,13 +47,23 @@ type MsgBase struct {
```go
type RegisterNodeRequest struct {
MsgBase
Address string
Port int64
Base *commonpb.MsgBase
Address *commonpb.Address
}
type RegisterNodeResponse struct {
//InitParams
InitParams *internalpb2.InitParams
Status *commonpb.Status
}
```
* *Flush*
```go
type FlushRequest struct {
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
}
```
@ -64,56 +71,50 @@ type RegisterNodeResponse struct {
```go
type SegIDRequest struct {
Count uint32
ChannelName string
CollectionID UniqueID
PartitionID UniqueID
Count uint32
ChannelName string
CollectionID UniqueID
PartitionID UniqueID
CollName string
PartitionName string
}
type AssignSegIDRequest struct {
MsgBase
PerChannelRequest []SegIDRequest
NodeID int64
PeerRole string
SegIDRequests []*SegIDRequest
}
type SegIDAssignment struct {
SegmentID UniqueID
ChannelName string
Count uint32
CollectionID UniqueID
PartitionID UniqueID
ExpireTime Timestamp
SegID UniqueID
ChannelName string
Count uint32
CollectionID UniqueID
PartitionID UniqueID
ExpireTime uint64
Status *commonpb.Status
CollName string
PartitionName string
}
type AssignSegIDResponse struct {
PerChannelResponse []SegIDAssignment
SegIDAssignments []*SegIDAssignment
Status *commonpb.Status
}
```
* *Flush*
```go
type FlushRequest struct {
MsgBase
DbID UniqueID
CollectionID UniqueID
}
```
* *ShowSegments*
```go
type ShowSegmentRequest struct {
MsgBase
Base *commonpb.MsgBase
CollectionID UniqueID
PartitionID UniqueID
PartitionID UniqueID
}
type ShowSegmentResponse struct {
SegmentIDs []UniqueID
Status *commonpb.Status
}
```
@ -122,56 +123,32 @@ type ShowSegmentResponse struct {
* *GetSegmentStates*
```go
enum SegmentState {
NONE = 0;
NOT_EXIST = 1;
GROWING = 2;
SEALED = 3;
}
type SegmentStatesRequest struct {
MsgBase
Base *commonpb.MsgBase
SegmentID UniqueID
}
enum SegmentState {
NONE = 0;
NOT_EXIST = 1;
GROWING = 2;
SEALED = 3;
}
type SegmentStateInfo struct {
SegmentID UniqueID
State commonpb.SegmentState
CreateTime uint64
SealedTime uint64
FlushedTime uint64
StartPosition *internalpb2.MsgPosition
EndPosition *internalpb2.MsgPosition
Status *commonpb.Status
}
type SegmentStatesResponse struct {
State SegmentState
OpenTime Timestamp
SealedTime Timestamp
MsgStartPositions []msgstream.MsgPosition
MsgEndPositions []msgstream.MsgPosition
}
```
* *GetSegmentInfo*
```go
type SegmentInfoRequest struct{
MsgBase
SegmentIDs [] UniqueID
}
```
```go
type SegmentInfo struct {
SegmentID UniqueID
CollectionID UniqueID
PartitionID UniqueID
InsertChannel string
OpenTime Timestamp
SealedTime Timestamp
FlushedTime Timestamp
NumRows int64
MemSize int64
State SegmentState
StartPosition []Msgstream.MsgPosition
EndPosition []Msgstream.MsgPosition
}
```
```go
type SegmentInfoResponse struct{
infos []SegmentInfo
Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
States []*SegmentStateInfo `protobuf:"bytes,2,rep,name=states,proto3" json:"states,omitempty"`
}
```
@ -179,71 +156,122 @@ type SegmentInfoResponse struct{
```go
type InsertBinlogPathRequest struct {
MsgBase
Base *commonpb.MsgBase
SegmentID UniqueID
}
type InsertBinlogPathsResponse struct {
FieldIDToPaths map[int64][]string
FieldIDs []int64
Paths []*internalpb2.StringList
Status *commonpb.Status
}
```
* *GetInsertChannels*
```go
type InsertChannelRequest struct {
MsgBase
DbID UniqueID
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
}
```
* *GetCollectionStatistics*
```go
type CollectionStatsRequest struct {
MsgBase
DbName string
CollectionName string
Base *commonpb.MsgBase
DbID int64
CollectionID int64
}
type CollectionStatsResponse struct {
Stats []KeyValuePair
Stats []*commonpb.KeyValuePair
Status *commonpb.Status
}
```
* *GetPartitionStatistics*
```go
type PartitionStatsRequest struct {
MsgBase
DbName string
CollectionName string
PartitionName string
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
PartitionID UniqueID
}
type PartitionStatsResponse struct {
Stats []KeyValuePair
Stats []*commonpb.KeyValuePair
Status *commonpb.Status
}
```
* *GetCount*
```go
type CollectionCountRequest struct {
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
}
type CollectionCountResponse struct {
Status *commonpb.Status
Count int64
}
```
* *GetSegmentInfo*
```go
type SegmentInfoRequest struct{
Base *commonpb.MsgBase
SegmentIDs []UniqueID
}
type SegmentInfo struct {
SegmentID UniqueID
CollectionID UniqueID
PartitionID UniqueID
InsertChannel string
OpenTime Timestamp
SealedTime Timestamp
FlushedTime Timestamp
NumRows int64
MemSize int64
State SegmentState
StartPosition []*internalpb2.MsgPosition
EndPosition []*internalpb2.MsgPosition
}
type SegmentInfoResponse struct{
Status *commonpb.Status
infos []SegmentInfo
}
```
#### 8.2 Insert Channel
```go
type InsertRequest struct {
MsgBase
DbName string
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionName string
DbID UniqueID
CollectionID UniqueID
PartitionID UniqueID
RowData []Blob
HashKeys []uint32
PartitionName string
DbID UniqueID
CollectionID UniqueID
PartitionID UniqueID
SegmentID UniqueID
ChannelID string
Timestamps []uint64
RowIDs []int64
RowData []*commonpb.Blob
}
```
@ -254,45 +282,22 @@ type InsertRequest struct {
```go
type DataNode interface {
Service
GetComponentStates() (ComponentStates, error)
GetTimeTickChannel() (StringResponse, error)
GetStatisticsChannel() (StringResponse, error)
Component
WatchDmChannels(WatchDmChannelRequest) error
FlushSegments(FlushSegRequest) (Status, error)
//WatchDdChannel(channelName string) error
//SetTimeTickChannel(channelName string) error
//SetStatisticsChannel(channelName string) error
WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelRequest) (*commonpb.Status, error)
FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) error
SetMasterServiceInterface(MasterServiceInterface) error
SetDataServiceInterface(DataServiceInterface) error
SetMasterServiceInterface(ctx context.Context, ms MasterServiceInterface) error
SetDataServiceInterface(ctx context.Context, ds DataServiceInterface) error
}
```
```go
type DataServiceInterface interface {
GetComponentStates() (ComponentStates, error)
RegisterNode(RegisterNodeRequest) (RegisterNodeResponse, error)
}
```
```go
type MasterServiceInterface interface {
GetComponentStates() (ComponentStates, error)
AllocID(IDRequest) (IDResponse, error)
ShowCollections(ShowCollectionRequest) (ShowCollectionResponse, error)
DescribeCollection(DescribeCollectionRequest) (DescribeCollectionResponse, error)
}
```
* *WatchDmChannels*
```go
type WatchDmChannelRequest struct {
MsgBase
InsertChannelNames []string
Base *commonpb.MsgBase
ChannelNames []string
}
```
@ -300,24 +305,32 @@ type WatchDmChannelRequest struct {
```go
type FlushSegRequest struct {
MsgBase
DbID UniqueID
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
SegmentID []UniqueID
SegmentIDs []int64
}
```
#### 8.2 SegmentStatistics Update Channel
* *SegmentStatistics*
```go
type SegmentStatisticsUpdates struct {
SegmentID UniqueID
MemorySize int64
NumRows int64
SegmentID UniqueID
MemorySize int64
NumRows int64
CreateTime uint64
EndTime uint64
StartPosition *internalpb2.MsgPosition
EndPosition *internalpb2.MsgPosition
IsNewSegment bool
}
type SegmentStatistics struct{
MsgBase
Base *commonpb.MsgBase
SegStats []*SegmentStatisticsUpdates
}
```

View File

@ -26,15 +26,8 @@ const (
type (
Interface interface {
// Service
Init() error
Start() error
Stop() error
// Component
GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) // This function has no effect
GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) // This function has no effect
typeutil.Service
typeutil.Component
WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelRequest) (*commonpb.Status, error)
FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) error

View File

@ -143,23 +143,23 @@ func (s *Server) Stop() error {
}
func (s *Server) BuildIndex(ctx context.Context, req *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
return s.impl.BuildIndex(req)
return s.impl.BuildIndex(ctx, req)
}
func (s *Server) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) {
return s.impl.DropIndex(request)
return s.impl.DropIndex(ctx, request)
}
func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return s.impl.GetComponentStates()
return s.impl.GetComponentStates(ctx)
}
func (s *Server) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
return s.impl.GetTimeTickChannel()
return s.impl.GetTimeTickChannel(ctx)
}
func (s *Server) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
return s.impl.GetStatisticsChannel()
return s.impl.GetStatisticsChannel(ctx)
}
func NewServer(ctx context.Context) (*Server, error) {

View File

@ -156,21 +156,21 @@ func (s *Server) Stop() error {
}
func (s *Server) RegisterLink(ctx context.Context, empty *commonpb.Empty) (*milvuspb.RegisterLinkResponse, error) {
return s.impl.RegisterLink()
return s.impl.RegisterLink(ctx)
}
func (s *Server) RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) {
return s.impl.RegisterNode(request)
return s.impl.RegisterNode(ctx, request)
}
func (s *Server) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
return s.impl.InvalidateCollectionMetaCache(request)
return s.impl.InvalidateCollectionMetaCache(ctx, request)
}
func (s *Server) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
return s.impl.GetTimeTickChannel()
return s.impl.GetTimeTickChannel(ctx)
}
func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return s.impl.GetComponentStates()
return s.impl.GetComponentStates(ctx)
}

View File

@ -158,8 +158,7 @@ func (i *NodeImpl) SetIndexServiceClient(serviceClient typeutil.IndexServiceInte
i.serviceClient = serviceClient
}
func (i *NodeImpl) BuildIndex(request *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
ctx := context.Background()
func (i *NodeImpl) BuildIndex(ctx context.Context, request *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
t := &IndexBuildTask{
BaseTask: BaseTask{
ctx: ctx,
@ -185,7 +184,7 @@ func (i *NodeImpl) BuildIndex(request *indexpb.BuildIndexCmd) (*commonpb.Status,
return ret, nil
}
func (i *NodeImpl) DropIndex(request *indexpb.DropIndexRequest) (*commonpb.Status, error) {
func (i *NodeImpl) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) {
i.sched.IndexBuildQueue.tryToRemoveUselessIndexBuildTask(request.IndexID)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
@ -203,7 +202,7 @@ func (i *NodeImpl) AddCloseCallback(callbacks ...func()) {
i.closeCallbacks = append(i.closeCallbacks, callbacks...)
}
func (i *NodeImpl) GetComponentStates() (*internalpb2.ComponentStates, error) {
func (i *NodeImpl) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
stateInfo := &internalpb2.ComponentInfo{
NodeID: Params.NodeID,
@ -221,7 +220,7 @@ func (i *NodeImpl) GetComponentStates() (*internalpb2.ComponentStates, error) {
return ret, nil
}
func (i *NodeImpl) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
func (i *NodeImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
@ -229,7 +228,7 @@ func (i *NodeImpl) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
}, nil
}
func (i *NodeImpl) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
func (i *NodeImpl) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,

View File

@ -132,6 +132,7 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
nty.Status.ErrorCode = commonpb.ErrorCode_BUILD_INDEX_ERROR
}
ctx = context.TODO()
resp, err := it.serviceClient.NotifyBuildIndex(ctx, nty)
if err != nil {
log.Println("IndexBuildTask notify err:", err.Error())

View File

@ -112,7 +112,7 @@ type Core struct {
Segment States Channel, from DataService, if create new segment, data service should put the segment id into this channel, and let the master add the segment id to the collection meta
Segment Flush Watcher, monitor if segment has flushed into disk
IndexService Interface:
IndexService Interface
IndexService Sch, tell index service to build index
*/

View File

@ -10,6 +10,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type MasterClient interface {
@ -67,9 +68,7 @@ type ProxyServiceClient interface {
}
type ProxyNode interface {
Init() error
Start() error
Stop() error
typeutil.Service
InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)

View File

@ -158,7 +158,7 @@ func (s *ServiceImpl) Stop() error {
return nil
}
func (s *ServiceImpl) GetComponentStates() (*internalpb2.ComponentStates, error) {
func (s *ServiceImpl) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
stateInfo := &internalpb2.ComponentInfo{
NodeID: UniqueID(0),
Role: "ProxyService",
@ -179,7 +179,7 @@ func (s *ServiceImpl) UpdateStateCode(code internalpb2.StateCode) {
s.stateCode = code
}
func (s *ServiceImpl) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
func (s *ServiceImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
@ -188,13 +188,13 @@ func (s *ServiceImpl) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
}, nil
}
func (s *ServiceImpl) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
func (s *ServiceImpl) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
panic("implement me")
}
func (s *ServiceImpl) RegisterLink() (*milvuspb.RegisterLinkResponse, error) {
func (s *ServiceImpl) RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error) {
log.Println("register link")
ctx, cancel := context.WithTimeout(s.ctx, timeoutInterval)
ctx, cancel := context.WithTimeout(ctx, timeoutInterval)
defer cancel()
t := &RegisterLinkTask{
@ -230,9 +230,9 @@ func (s *ServiceImpl) RegisterLink() (*milvuspb.RegisterLinkResponse, error) {
return t.response, nil
}
func (s *ServiceImpl) RegisterNode(request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) {
func (s *ServiceImpl) RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) {
log.Println("RegisterNode: ", request)
ctx, cancel := context.WithTimeout(s.ctx, timeoutInterval)
ctx, cancel := context.WithTimeout(ctx, timeoutInterval)
defer cancel()
t := &RegisterNodeTask{
@ -271,9 +271,9 @@ func (s *ServiceImpl) RegisterNode(request *proxypb.RegisterNodeRequest) (*proxy
return t.response, nil
}
func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
func (s *ServiceImpl) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
log.Println("InvalidateCollectionMetaCache")
ctx, cancel := context.WithTimeout(s.ctx, timeoutInterval)
ctx, cancel := context.WithTimeout(ctx, timeoutInterval)
defer cancel()
t := &InvalidateCollectionMetaCacheTask{

View File

@ -1,6 +1,8 @@
package proxyservice
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
@ -13,8 +15,8 @@ type Service = typeutil.Service
type ProxyService interface {
Component
Service
RegisterLink() (*milvuspb.RegisterLinkResponse, error)
RegisterNode(request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error)
RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error)
RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error)
// TODO: i'm sure it's not a best way to keep consistency, fix me
InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
}

View File

@ -92,7 +92,7 @@ func (p *ParamTable) initLogCfg() {
if err != nil {
panic(err)
}
p.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("masterservice-%d.log", p.NodeID))
p.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("queryService-%d.log", p.NodeID))
}
func (p *ParamTable) initStatsChannelName() {

View File

@ -22,6 +22,7 @@ type Component interface {
GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error)
}
// TODO
type IndexNodeInterface interface {
Service
Component