Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
Flush Collection
The Flush operation ensures that inserted data is written to persistent storage. This document describes how Flush works in Milvus 2.0. The following figure shows the execution flow of Flush.
- First, the SDK sends a Flush request to Proxy via gRPC. The proto is defined as follows:
service MilvusService {
...
rpc Flush(FlushRequest) returns (FlushResponse) {}
...
}
message FlushRequest {
common.MsgBase base = 1;
string db_name = 2;
repeated string collection_names = 3;
}
message FlushResponse{
common.Status status = 1;
string db_name = 2;
map<string, schema.LongArray> coll_segIDs = 3;
}
- When Proxy receives the Flush request, it wraps the request into a FlushTask and pushes the task into the DdTaskQueue queue. Proxy then calls WaitToFinish to block until the task has finished.
type task interface {
TraceCtx() context.Context
ID() UniqueID // return ReqID
SetID(uid UniqueID) // set ReqID
Name() string
Type() commonpb.MsgType
BeginTs() Timestamp
EndTs() Timestamp
SetTs(ts Timestamp)
OnEnqueue() error
PreExecute(ctx context.Context) error
Execute(ctx context.Context) error
PostExecute(ctx context.Context) error
WaitToFinish() error
Notify(err error)
}
type FlushTask struct {
Condition
*milvuspb.FlushRequest
ctx context.Context
dataCoord types.DataCoord
result *milvuspb.FlushResponse
}
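The enqueue-and-wait handshake between the request handler and the task queue can be sketched in Go as follows. This is a minimal illustration rather than the Milvus implementation: simpleTask, newSimpleTask, worker, submit, and errTaskFailed are hypothetical names, and a buffered channel stands in for both Condition and DdTaskQueue.

```go
package main

import (
	"errors"
	"fmt"
)

// errTaskFailed is a hypothetical error used only by this sketch.
var errTaskFailed = errors.New("task failed")

// simpleTask is a hypothetical, stripped-down task. The done channel plays
// the role that Condition plays for FlushTask.
type simpleTask struct {
	name string
	done chan error
}

func newSimpleTask(name string) *simpleTask {
	// A buffer of 1 lets Notify return even if nobody is waiting yet.
	return &simpleTask{name: name, done: make(chan error, 1)}
}

// Notify publishes the final result of the task.
func (t *simpleTask) Notify(err error) { t.done <- err }

// WaitToFinish blocks the caller until Notify has been called.
func (t *simpleTask) WaitToFinish() error { return <-t.done }

// worker drains the queue and notifies each task with the result of run.
func worker(queue <-chan *simpleTask, run func(*simpleTask) error) {
	for t := range queue {
		t.Notify(run(t))
	}
}

// submit enqueues a task and waits for it, as a request handler would.
func submit(queue chan<- *simpleTask, t *simpleTask) error {
	queue <- t
	return t.WaitToFinish()
}

func main() {
	queue := make(chan *simpleTask, 8)
	go worker(queue, func(t *simpleTask) error {
		if t.name == "bad" {
			return errTaskFailed
		}
		return nil
	})
	fmt.Println(submit(queue, newSimpleTask("flush")))
	fmt.Println(submit(queue, newSimpleTask("bad")))
}
```

The caller never touches the worker directly: it only sees the error that the worker passes to Notify, which is why the handler can stay blocked in WaitToFinish while the task is processed elsewhere.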
- There is a background service in Proxy. This service takes the FlushTask from DdTaskQueue and executes it in three phases:
  - PreExecute: FlushTask does nothing in this phase and returns directly.
  - Execute: Proxy sends a Flush request to DataCoord via gRPC and waits for the response. The proto is defined as follows:
    service DataCoord {
    ...
    rpc Flush(FlushRequest) returns (FlushResponse) {}
    ...
    }
    message FlushRequest {
    common.MsgBase base = 1;
    int64 dbID = 2;
    int64 collectionID = 4;
    }
    message FlushResponse {
    common.Status status = 1;
    int64 dbID = 2;
    int64 collectionID = 3;
    repeated int64 segmentIDs = 4;
    }
  - PostExecute: FlushTask does nothing in this phase and returns directly.
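The three-phase execution can be sketched as a small driver function. This is an illustration under stated assumptions: phasedTask, runPhases, runOnce, fakeFlushTask, and errDataCoord are hypothetical names, and stopping at the first failing phase is an assumption of this sketch, not something the text above specifies.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errDataCoord is a hypothetical error standing in for a failed Flush RPC.
var errDataCoord = errors.New("DataCoord unavailable")

// phasedTask is a hypothetical subset of the task interface shown earlier.
type phasedTask interface {
	PreExecute(ctx context.Context) error
	Execute(ctx context.Context) error
	PostExecute(ctx context.Context) error
}

// runPhases runs the three phases in order and stops at the first error
// (an assumption of this sketch).
func runPhases(ctx context.Context, t phasedTask) error {
	if err := t.PreExecute(ctx); err != nil {
		return fmt.Errorf("PreExecute: %w", err)
	}
	if err := t.Execute(ctx); err != nil {
		return fmt.Errorf("Execute: %w", err)
	}
	if err := t.PostExecute(ctx); err != nil {
		return fmt.Errorf("PostExecute: %w", err)
	}
	return nil
}

// runOnce runs a task with a background context.
func runOnce(t phasedTask) error {
	return runPhases(context.Background(), t)
}

// fakeFlushTask mimics FlushTask: only Execute can fail, and every phase
// records that it was called.
type fakeFlushTask struct {
	calls   []string
	execErr error
}

func (t *fakeFlushTask) PreExecute(context.Context) error {
	t.calls = append(t.calls, "pre")
	return nil
}

func (t *fakeFlushTask) Execute(context.Context) error {
	t.calls = append(t.calls, "exec")
	return t.execErr
}

func (t *fakeFlushTask) PostExecute(context.Context) error {
	t.calls = append(t.calls, "post")
	return nil
}

func main() {
	ok := &fakeFlushTask{}
	fmt.Println(runOnce(ok), ok.calls)

	bad := &fakeFlushTask{execErr: errDataCoord}
	fmt.Println(runOnce(bad), bad.calls)
}
```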
- After receiving the Flush request from Proxy, DataCoord calls SealAllSegments to seal all the growing segments belonging to this Collection, and no longer allocates new IDs for these segments. DataCoord then sends a response to Proxy that contains the IDs of all the sealed segments.
- In Milvus 2.0, Flush is an asynchronous operation. When the SDK receives the Flush response, it only means that DataCoord has sealed these segments. Two problems remain to be solved:
  - The sealed segments might still be in memory and may not have been written to persistent storage yet.
  - DataCoord no longer allocates new IDs for these sealed segments, but how can we make sure that all the allocated IDs have been consumed by DataNode?
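To make the difference between sealed and flushed concrete, here is a minimal sketch of the sealing step. The segment struct, the state constants, and this sealAllSegments function are hypothetical simplifications and are not the real DataCoord code. Returning segments that were already sealed is also an assumption of the sketch.

```go
package main

import "fmt"

// segmentState is a hypothetical, reduced set of segment states.
type segmentState int

const (
	growing segmentState = iota
	sealed
	flushed
)

// segment is a hypothetical in-memory record, not the real Milvus struct.
type segment struct {
	id           int64
	collectionID int64
	state        segmentState
}

// sealAllSegments marks every growing segment of the collection as sealed
// and returns the IDs of all segments of that collection that are sealed
// afterwards. Flushed segments and other collections are left untouched.
func sealAllSegments(segs []*segment, collectionID int64) []int64 {
	var ids []int64
	for _, s := range segs {
		if s.collectionID != collectionID {
			continue
		}
		if s.state == growing {
			s.state = sealed
		}
		if s.state == sealed {
			ids = append(ids, s.id)
		}
	}
	return ids
}

func main() {
	segs := []*segment{
		{id: 1, collectionID: 100, state: growing},
		{id: 2, collectionID: 100, state: flushed},
		{id: 3, collectionID: 200, state: growing},
	}
	ids := sealAllSegments(segs, 100)
	// The returned segments are sealed, not flushed: their data may
	// still be in memory at this point.
	fmt.Println(ids, segs[0].state == sealed)
}
```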
- For the first problem, the SDK should send GetSegmentInfo requests to DataCoord periodically until all sealed segments are in the Flushed state. The proto is defined as follows:
service DataCoord {
...
rpc GetSegmentInfo(GetSegmentInfoRequest) returns (GetSegmentInfoResponse) {}
...
}
message GetSegmentInfoRequest {
common.MsgBase base = 1;
repeated int64 segmentIDs = 2;
}
message GetSegmentInfoResponse {
common.Status status = 1;
repeated SegmentInfo infos = 2;
}
message SegmentInfo {
int64 ID = 1;
int64 collectionID = 2;
int64 partitionID = 3;
string insert_channel = 4;
int64 num_of_rows = 5;
common.SegmentState state = 6;
internal.MsgPosition dml_position = 7;
int64 max_row_num = 8;
uint64 last_expire_time = 9;
internal.MsgPosition start_position = 10;
}
enum SegmentState {
SegmentStateNone = 0;
NotExist = 1;
Growing = 2;
Sealed = 3;
Flushed = 4;
Flushing = 5;
}
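The polling loop on the SDK side might look like the sketch below. It assumes a hypothetical segmentStateGetter interface that wraps the GetSegmentInfo RPC. The names waitFlushed, waitFlushedTimeout, allFlushed, and fakeClient are illustrative only and do not come from any Milvus SDK.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type segmentState int32

// The values follow the SegmentState enum above.
const (
	stateNone segmentState = iota
	notExist
	growing
	sealed
	flushed
	flushing
)

// segmentStateGetter is a hypothetical wrapper around the GetSegmentInfo RPC.
type segmentStateGetter interface {
	GetSegmentStates(ctx context.Context, ids []int64) (map[int64]segmentState, error)
}

// allFlushed reports whether every requested segment is in state Flushed.
func allFlushed(states map[int64]segmentState, ids []int64) bool {
	for _, id := range ids {
		if states[id] != flushed {
			return false
		}
	}
	return true
}

// waitFlushed polls until all segments are Flushed or ctx is done.
func waitFlushed(ctx context.Context, c segmentStateGetter, ids []int64, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		states, err := c.GetSegmentStates(ctx, ids)
		if err != nil {
			return err
		}
		if allFlushed(states, ids) {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// waitFlushedTimeout bounds the total wait with a timeout.
func waitFlushedTimeout(c segmentStateGetter, ids []int64, interval, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return waitFlushed(ctx, c, ids, interval)
}

// fakeClient reports every segment as Flushed from the flushAfter-th poll on.
type fakeClient struct {
	polls      int
	flushAfter int
}

func (f *fakeClient) GetSegmentStates(_ context.Context, ids []int64) (map[int64]segmentState, error) {
	f.polls++
	out := make(map[int64]segmentState, len(ids))
	for _, id := range ids {
		if f.polls >= f.flushAfter {
			out[id] = flushed
		} else {
			out[id] = sealed
		}
	}
	return out, nil
}

func main() {
	c := &fakeClient{flushAfter: 3}
	err := waitFlushedTimeout(c, []int64{1, 2}, 10*time.Millisecond, time.Second)
	fmt.Println(err, c.polls)
}
```

Bounding the loop with a context keeps a caller from waiting forever if a segment never reaches Flushed.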
- For the second problem, DataNode reports a timestamp to DataCoord every time it consumes a package from MsgStream. The proto is defined as follows:
message DataNodeTtMsg {
common.MsgBase base = 1;
string channel_name = 2;
uint64 timestamp = 3;
}
- There is a background service, startDataNodeTsLoop, in DataCoord that processes DataNodeTtMsg messages.
  - First, DataCoord extracts channel_name from the DataNodeTtMsg and selects all sealed segments attached to this channel_name.
  - Then it compares the timestamp at which each segment entered the Sealed state with DataNodeTtMsg.timestamp. If DataNodeTtMsg.timestamp is greater, all IDs belonging to that segment have been consumed by DataNode, and it is safe to notify DataNode to write that segment to persistent storage. The proto is defined as follows:
service DataNode {
...
rpc FlushSegments(FlushSegmentsRequest) returns(common.Status) {}
...
}
message FlushSegmentsRequest {
common.MsgBase base = 1;
int64 dbID = 2;
int64 collectionID = 3;
repeated int64 segmentIDs = 4;
}
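The timestamp comparison that decides which segments go into a FlushSegmentsRequest can be sketched as follows. The sealedSegment and ttMsg types and the flushable function are hypothetical. They only model the channel_name and timestamp fields described above, plus an assumed per-segment seal timestamp.

```go
package main

import "fmt"

// sealedSegment is a hypothetical record kept by the coordinator.
type sealedSegment struct {
	id       int64
	channel  string
	sealedTs uint64 // timestamp at which the segment entered Sealed
}

// ttMsg mirrors the channel_name and timestamp fields of DataNodeTtMsg.
type ttMsg struct {
	channelName string
	timestamp   uint64
}

// flushable returns the IDs of sealed segments on the message's channel
// whose seal timestamp is strictly smaller than the reported timestamp.
func flushable(segs []sealedSegment, msg ttMsg) []int64 {
	var ids []int64
	for _, s := range segs {
		if s.channel == msg.channelName && msg.timestamp > s.sealedTs {
			ids = append(ids, s.id)
		}
	}
	return ids
}

func main() {
	segs := []sealedSegment{
		{id: 1, channel: "ch-a", sealedTs: 100},
		{id: 2, channel: "ch-a", sealedTs: 200},
		{id: 3, channel: "ch-b", sealedTs: 50},
	}
	// Only segment 1 was sealed before timestamp 150 on channel ch-a.
	fmt.Println(flushable(segs, ttMsg{channelName: "ch-a", timestamp: 150}))
}
```

The returned IDs correspond to the segmentIDs field of FlushSegmentsRequest. Segments sealed at or after the reported timestamp wait for a later DataNodeTtMsg.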