Signed-off-by: tumao <yan.wang@zilliz.com>
Create Index
The index system is a core part of Milvus and is used to speed up searches. This document introduces the components involved in Create Index and what each of them does.
The execution flow of Create Index is shown in the following figure:
- First, the SDK sends a CreateIndex request to Proxy via gRPC. The proto is defined as follows:
service MilvusService {
...
rpc CreateIndex(CreateIndexRequest) returns (common.Status) {}
...
}
message CreateIndexRequest {
common.MsgBase base = 1; // must
string db_name = 2;
string collection_name = 3; // must
string field_name = 4; // must
repeated common.KeyValuePair extra_params = 5; // must
}
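The "must" comments in the message above suggest a simple check before a request is sent. The following is a minimal sketch, assuming hand-written stand-in structs rather than the generated protobuf types; `validate` is a hypothetical helper, and the parameter keys and values in `main` are illustrative only.

```go
package main

import (
	"errors"
	"fmt"
)

// KeyValuePair is a stand-in for common.KeyValuePair.
type KeyValuePair struct {
	Key   string
	Value string
}

// CreateIndexRequest is a hand-written stand-in for the generated message;
// the base field is omitted to keep the sketch short.
type CreateIndexRequest struct {
	DbName         string
	CollectionName string
	FieldName      string
	ExtraParams    []KeyValuePair
}

// validate is a hypothetical helper that rejects a request when a field
// marked "must" in the proto comments is missing.
func validate(req CreateIndexRequest) error {
	switch {
	case req.CollectionName == "":
		return errors.New("collection_name is required")
	case req.FieldName == "":
		return errors.New("field_name is required")
	case len(req.ExtraParams) == 0:
		return errors.New("extra_params is required")
	}
	return nil
}

func main() {
	req := CreateIndexRequest{
		CollectionName: "demo",
		FieldName:      "embedding",
		// Illustrative parameters, not taken from this document.
		ExtraParams: []KeyValuePair{
			{Key: "index_type", Value: "IVF_FLAT"},
			{Key: "metric_type", Value: "L2"},
		},
	}
	fmt.Println("complete request:", validate(req))
	fmt.Println("empty request:", validate(CreateIndexRequest{}))
}
```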
- On receiving the CreateIndex request, Proxy wraps it into a CreateIndexTask and pushes the task into DdTaskQueue. Proxy then calls WaitToFinish to wait until the task has finished.
type task interface {
TraceCtx() context.Context
ID() UniqueID // return ReqID
SetID(uid UniqueID) // set ReqID
Name() string
Type() commonpb.MsgType
BeginTs() Timestamp
EndTs() Timestamp
SetTs(ts Timestamp)
OnEnqueue() error
PreExecute(ctx context.Context) error
Execute(ctx context.Context) error
PostExecute(ctx context.Context) error
WaitToFinish() error
Notify(err error)
}
type createIndexTask struct {
Condition
*milvuspb.CreateIndexRequest
ctx context.Context
rootCoord types.RootCoord
result *commonpb.Status
}
- A background service in Proxy takes the CreateIndexTask from DdTaskQueue and executes it in three phases:
  - PreExecute: performs static checks, such as whether the index parameters are legal.
  - Execute: Proxy sends a CreateIndex request to RootCoord via gRPC and waits for the response. The proto is defined as follows:
service RootCoord {
...
rpc CreateIndex(milvus.CreateIndexRequest) returns (common.Status) {}
...
}
  - PostExecute: CreateIndexTask does nothing in this phase and returns directly.
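The three phases can be sketched as a small driver that stops at the first error. This is an illustration only: `phasedTask`, `runPhases`, and `demoTask` are hypothetical, the context arguments of the real interface are dropped for brevity, and `forward` stands in for the RPC to RootCoord.

```go
package main

import (
	"errors"
	"fmt"
)

// phasedTask keeps only the three phase methods of the task interface.
type phasedTask interface {
	PreExecute() error
	Execute() error
	PostExecute() error
}

// runPhases runs the phases in order and stops at the first error.
func runPhases(t phasedTask) error {
	if err := t.PreExecute(); err != nil {
		return fmt.Errorf("PreExecute: %w", err)
	}
	if err := t.Execute(); err != nil {
		return fmt.Errorf("Execute: %w", err)
	}
	return t.PostExecute()
}

// demoTask is a hypothetical task; forward stands in for the RPC to RootCoord.
type demoTask struct {
	indexType string
	forward   func() error
}

// PreExecute performs a static check on the parameters.
func (d *demoTask) PreExecute() error {
	if d.indexType == "" {
		return errors.New("index type is empty")
	}
	return nil
}

// Execute forwards the request.
func (d *demoTask) Execute() error { return d.forward() }

// PostExecute has nothing to do.
func (d *demoTask) PostExecute() error { return nil }

func main() {
	forwarded := false
	good := &demoTask{indexType: "IVF_FLAT", forward: func() error {
		forwarded = true
		return nil
	}}
	fmt.Println("good task:", runPhases(good), "forwarded:", forwarded)

	bad := &demoTask{forward: func() error { return nil }}
	fmt.Println("bad task:", runPhases(bad))
}
```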
- RootCoord wraps the CreateIndex request into a CreateIndexReqTask and then calls executeTask. executeTask does not return until the context is done or CreateIndexReqTask.Execute has returned.
type reqTask interface {
Ctx() context.Context
Type() commonpb.MsgType
Execute(ctx context.Context) error
Core() *Core
}
type CreateIndexReqTask struct {
baseReqTask
Req *milvuspb.CreateIndexRequest
}
- Based on the index type and index parameters, RootCoord lists all the Segments on this Collection that need to be indexed. At this stage, RootCoord only checks Segments that have already been flushed. How newly added and growing segments are handled is described later.
- For each Segment, RootCoord sends a gRPC request to DataCoord to get the Binlog paths of that Segment. The proto is defined as follows:
service DataCoord {
...
rpc GetInsertBinlogPaths(GetInsertBinlogPathsRequest) returns (GetInsertBinlogPathsResponse) {}
...
}
message GetInsertBinlogPathsRequest {
common.MsgBase base = 1;
int64 segmentID = 2;
}
message GetInsertBinlogPathsResponse {
repeated int64 fieldIDs = 1;
repeated internal.StringList paths = 2;
common.Status status = 3;
}
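The response carries two repeated fields, fieldIDs and paths. The sketch below shows one way a caller might combine them, under the assumption (not stated in this document) that paths[i] holds the binlog paths of fieldIDs[i]. The structs are hand-written stand-ins, the `Values` field name of `StringList` is assumed, and the paths in `main` are made up.

```go
package main

import (
	"errors"
	"fmt"
)

// StringList stands in for internal.StringList; the Values field is assumed.
type StringList struct {
	Values []string
}

// GetInsertBinlogPathsResponse is a stand-in without the status field.
type GetInsertBinlogPathsResponse struct {
	FieldIDs []int64
	Paths    []StringList
}

// pathsByField assumes Paths[i] holds the binlog paths of FieldIDs[i] and
// returns them keyed by field ID.
func pathsByField(resp GetInsertBinlogPathsResponse) (map[int64][]string, error) {
	if len(resp.FieldIDs) != len(resp.Paths) {
		return nil, errors.New("fieldIDs and paths have different lengths")
	}
	out := make(map[int64][]string, len(resp.FieldIDs))
	for i, id := range resp.FieldIDs {
		out[id] = resp.Paths[i].Values
	}
	return out, nil
}

func main() {
	resp := GetInsertBinlogPathsResponse{
		FieldIDs: []int64{100, 101},
		Paths: []StringList{
			{Values: []string{"binlog/100/a", "binlog/100/b"}}, // made-up paths
			{Values: []string{"binlog/101/a"}},
		},
	}
	m, err := pathsByField(resp)
	fmt.Println(m[100], m[101], err)
}
```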
- After getting the Segment's Binlog paths, RootCoord sends a gRPC request to IndexCoord, asking it to build an index on this Segment. The proto is defined as follows:
service IndexCoord {
...
rpc BuildIndex(BuildIndexRequest) returns (BuildIndexResponse){}
...
}
message BuildIndexRequest {
int64 indexBuildID = 1;
string index_name = 2;
int64 indexID = 3;
repeated string data_paths = 5;
repeated common.KeyValuePair type_params = 6;
repeated common.KeyValuePair index_params = 7;
}
message BuildIndexResponse {
common.Status status = 1;
int64 indexBuildID = 2;
}
- The execution flow of BuildIndex on IndexCoord is shown in the following figure:
- IndexCoord wraps the BuildIndex request into an IndexAddTask, allocates a globally unique ID as the IndexBuildID, and writes this Segment's index meta into IndexCoord's metaTable. When these operations finish, IndexCoord sends a response containing the IndexBuildID to RootCoord.
- When RootCoord receives the BuildIndexResponse, it extracts the IndexBuildID from the response, updates RootCoord's metaTable, and then sends a response to Proxy.
- IndexCoord runs a background service, assignTaskLoop, which calls GetUnassignedTask periodically; the default interval is 3s. GetUnassignedTask lists the segments whose index meta has been updated but whose index has not been created yet.
- For each segment listed in the previous step, IndexCoord calls PeekClient to get an available IndexNode and sends a CreateIndex request to that IndexNode. The proto is defined as follows:
service IndexNode {
...
rpc CreateIndex(CreateIndexRequest) returns (common.Status){}
...
}
message CreateIndexRequest {
int64 indexBuildID = 1;
string index_name = 2;
int64 indexID = 3;
int64 version = 4;
string meta_path = 5;
repeated string data_paths = 6;
repeated common.KeyValuePair type_params = 7;
repeated common.KeyValuePair index_params = 8;
}
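The IndexCoord side of the flow (allocate an ID, record the meta, then assign unassigned builds to nodes in a later pass) can be sketched with an in-memory model. Everything here is hypothetical: `indexCoord`, `buildIndex`, `peekClient`, and `assignOnce` only mimic the roles described above, and round-robin node selection is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// indexMeta is a simplified per-build record in the metaTable.
type indexMeta struct {
	dataPaths []string
	assigned  bool
}

// indexCoord is a hypothetical in-memory stand-in for IndexCoord.
type indexCoord struct {
	mu     sync.Mutex
	nextID int64
	meta   map[int64]*indexMeta
	nodes  []string // names of available IndexNodes
	cursor int      // round-robin cursor used by peekClient
}

// buildIndex allocates an IndexBuildID, records the meta, and returns the ID
// without waiting for the index to be built.
func (c *indexCoord) buildIndex(dataPaths []string) int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nextID++
	c.meta[c.nextID] = &indexMeta{dataPaths: dataPaths}
	return c.nextID
}

// peekClient picks a node round-robin; the real selection policy may differ.
// The caller must hold c.mu and ensure c.nodes is not empty.
func (c *indexCoord) peekClient() string {
	node := c.nodes[c.cursor%len(c.nodes)]
	c.cursor++
	return node
}

// assignOnce is one pass of the assign loop: every build that has meta but no
// node yet is handed to a node via send. It returns the number assigned.
func (c *indexCoord) assignOnce(send func(node string, buildID int64)) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.nodes) == 0 {
		return 0
	}
	n := 0
	for id, m := range c.meta {
		if !m.assigned {
			send(c.peekClient(), id)
			m.assigned = true
			n++
		}
	}
	return n
}

func main() {
	c := &indexCoord{meta: map[int64]*indexMeta{}, nodes: []string{"node-0", "node-1"}}
	id := c.buildIndex([]string{"binlog/100/a"})
	fmt.Println("IndexBuildID:", id)
	// A real loop would call assignOnce from a time.Ticker at the 3s interval.
	n := c.assignOnce(func(node string, buildID int64) {
		fmt.Printf("CreateIndex(buildID=%d) sent to %s\n", buildID, node)
	})
	fmt.Println("assigned:", n)
}
```

Separating buildIndex from the assign pass is what lets the response go back to RootCoord before any IndexNode has started work.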
- On receiving the CreateIndex request, IndexNode wraps it into an IndexBuildTask, pushes the task into IndexBuildQueue, and then sends a response to IndexCoord.
- IndexNode runs a background service, indexBuildLoop, which calls scheduleIndexBuildTask to get an IndexBuildTask from IndexBuildQueue and then starts another goroutine to build the index and update the meta.
Note: IndexNode does not notify QueryCoord to load the index files. To speed up searches with these index files, users should first call ReleaseCollection and then call LoadCollection to load them.
- As mentioned earlier, RootCoord only checks flushed segments when handling a CreateIndex request. The following figure shows how newly added segments are handled.
- When a segment has been flushed, DataCoord notifies RootCoord via SegmentFlushCompleted. The proto is defined as follows:
service RootCoord {
...
rpc SegmentFlushCompleted(data.SegmentFlushCompletedMsg) returns (common.Status) {}
...
}
message SegmentFlushCompletedMsg {
common.MsgBase base = 1;
SegmentInfo segment = 2;
}
message SegmentInfo {
int64 ID = 1;
int64 collectionID = 2;
int64 partitionID = 3;
string insert_channel = 4;
int64 num_of_rows = 5;
common.SegmentState state = 6;
int64 max_row_num = 7;
uint64 last_expire_time = 8;
internal.MsgPosition start_position = 9;
internal.MsgPosition dml_position = 10;
repeated FieldBinlog binlogs = 11;
}
- If users have called CreateIndex on this Collection, then when RootCoord receives a SegmentFlushCompleted request, it extracts the SegmentID from the request and sends a GetInsertBinlogPaths request to DataCoord to get the Binlog paths. Finally, RootCoord sends a BuildIndex request to IndexCoord, asking it to build an index on this segment.
- The gRPC call to SegmentFlushCompleted might fail due to a network problem or some other cause. How is the index created in that case? The following figure shows the solution.
- RootCoord runs a background service, checkFlushedSegmentLoop, which periodically checks whether any segment needs an index that has not been created yet; the default interval is 10 minutes. For such segments, it calls the DataCoord and IndexCoord services to create the index.
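The combination of a notification-driven fast path and a periodic fallback can be sketched with a small in-memory model. All names here are hypothetical: `buildIndex` stands in for the GetInsertBinlogPaths and BuildIndex calls, and the `flushed` map passed to `checkFlushedSegments` stands in for whatever DataCoord reports as flushed.

```go
package main

import "fmt"

// rootCoord is a hypothetical in-memory model of the state RootCoord needs.
type rootCoord struct {
	hasIndex   map[int64]bool              // collectionID -> CreateIndex was called
	built      map[int64]bool              // segmentID -> BuildIndex already requested
	buildIndex func(segmentID int64) error // stands in for the two RPCs
}

// tryBuild requests an index for one flushed segment if its collection has an
// index and no request has succeeded yet. It reports whether it sent a request.
func (r *rootCoord) tryBuild(segmentID, collectionID int64) bool {
	if !r.hasIndex[collectionID] || r.built[segmentID] {
		return false
	}
	if err := r.buildIndex(segmentID); err != nil {
		return false // left unbuilt so a later pass can retry
	}
	r.built[segmentID] = true
	return true
}

// onSegmentFlushCompleted is the fast path driven by DataCoord's notification.
func (r *rootCoord) onSegmentFlushCompleted(segmentID, collectionID int64) bool {
	return r.tryBuild(segmentID, collectionID)
}

// checkFlushedSegments is one pass of the fallback loop over the flushed
// segments (segmentID -> collectionID). It returns how many were picked up.
func (r *rootCoord) checkFlushedSegments(flushed map[int64]int64) int {
	n := 0
	for segmentID, collectionID := range flushed {
		if r.tryBuild(segmentID, collectionID) {
			n++
		}
	}
	return n
}

func main() {
	r := &rootCoord{
		hasIndex:   map[int64]bool{10: true},
		built:      map[int64]bool{},
		buildIndex: func(int64) error { return nil },
	}
	// The notification for segment 1 arrives; the one for segment 2 is lost.
	fmt.Println("fast path built segment 1:", r.onSegmentFlushCompleted(1, 10))
	// The periodic pass later sees both segments and picks up only segment 2.
	fmt.Println("fallback picked up:", r.checkFlushedSegments(map[int64]int64{1: 10, 2: 10}))
}
```

Tracking which segments already have a build request keeps the fallback pass idempotent, so running it every 10 minutes does not create duplicate requests in this model.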
- In Milvus 2.0, Create Index is an asynchronous operation, so the SDK needs to send GetIndexStates requests to IndexCoord periodically to check whether the index has been created. The proto is defined as follows:
service IndexCoord {
...
rpc GetIndexStates(GetIndexStatesRequest) returns (GetIndexStatesResponse) {}
...
}
message GetIndexStatesRequest {
repeated int64 indexBuildIDs = 1;
}
message GetIndexStatesResponse {
common.Status status = 1;
repeated IndexInfo states = 2;
}
message IndexInfo {
common.IndexState state = 1;
int64 indexBuildID = 2;
int64 indexID = 3;
string index_name = 4;
string reason = 5;
}
enum IndexState {
IndexStateNone = 0;
Unissued = 1;
InProgress = 2;
Finished = 3;
Failed = 4;
}