Signed-off-by: min.tian <min.tian.cn@gmail.com>
Flush Collection
The Flush operation ensures that inserted data is written to persistent storage. This document describes how the Flush operation works in Milvus 2.0. The following figure shows the execution flow of Flush.
- Firstly, the SDK sends a Flush request to Proxy via gRPC. The proto is defined as follows:
service MilvusService {
  ...
  rpc Flush(FlushRequest) returns (FlushResponse) {}
  ...
}

message FlushRequest {
  common.MsgBase base = 1;
  string db_name = 2;
  repeated string collection_names = 3;
}

message FlushResponse {
  common.Status status = 1;
  string db_name = 2;
  map<string, schema.LongArray> coll_segIDs = 3;
}
- When Proxy receives the Flush request, it wraps the request into a FlushTask and pushes the task into DdTaskQueue. Proxy then calls the method WaitToFinish to block until the task has finished.
type task interface {
	TraceCtx() context.Context
	ID() UniqueID       // return ReqID
	SetID(uid UniqueID) // set ReqID
	Name() string
	Type() commonpb.MsgType
	BeginTs() Timestamp
	EndTs() Timestamp
	SetTs(ts Timestamp)
	OnEnqueue() error
	PreExecute(ctx context.Context) error
	Execute(ctx context.Context) error
	PostExecute(ctx context.Context) error
	WaitToFinish() error
	Notify(err error)
}

type FlushTask struct {
	Condition
	*milvuspb.FlushRequest
	ctx       context.Context
	dataCoord types.DataCoord
	result    *milvuspb.FlushResponse
}
- There is a background service in Proxy. This service gets FlushTask from DdTaskQueue and executes it in three phases:
  - PreExecute: FlushTask does nothing at this phase and returns directly.
  - Execute: Proxy sends a Flush request to DataCoord via gRPC and waits for the response. The proto is defined as follows:
service DataCoord {
  ...
  rpc Flush(FlushRequest) returns (FlushResponse) {}
  ...
}

message FlushRequest {
  common.MsgBase base = 1;
  int64 dbID = 2;
  int64 collectionID = 4;
}

message FlushResponse {
  common.Status status = 1;
  int64 dbID = 2;
  int64 collectionID = 3;
  repeated int64 segmentIDs = 4;
}
  - PostExecute: FlushTask does nothing at this phase and returns directly.
- After receiving a Flush request from Proxy, DataCoord calls SealAllSegments to seal all the growing segments belonging to this Collection, and no longer allocates new IDs for these segments. DataCoord then sends a response to Proxy that contains the IDs of all the sealed segments.
- In Milvus 2.0, Flush is an asynchronous operation. When the SDK receives the Flush response, it only means that DataCoord has sealed these segments. Two problems remain to be solved:
  - The sealed segments might still be in memory and might not have been written to persistent storage yet.
  - DataCoord no longer allocates new IDs for these sealed segments, but how can we make sure that all the allocated IDs have been consumed by DataNode?
- For the first problem, the SDK should send GetSegmentInfo requests to DataCoord periodically until all sealed segments are in the Flushed state. The proto is defined as follows:
service DataCoord {
  ...
  rpc GetSegmentInfo(GetSegmentInfoRequest) returns (GetSegmentInfoResponse) {}
  ...
}

message GetSegmentInfoRequest {
  common.MsgBase base = 1;
  repeated int64 segmentIDs = 2;
}

message GetSegmentInfoResponse {
  common.Status status = 1;
  repeated SegmentInfo infos = 2;
}

message SegmentInfo {
  int64 ID = 1;
  int64 collectionID = 2;
  int64 partitionID = 3;
  string insert_channel = 4;
  int64 num_of_rows = 5;
  common.SegmentState state = 6;
  internal.MsgPosition dml_position = 7;
  int64 max_row_num = 8;
  uint64 last_expire_time = 9;
  internal.MsgPosition start_position = 10;
}

enum SegmentState {
  SegmentStateNone = 0;
  NotExist = 1;
  Growing = 2;
  Sealed = 3;
  Flushed = 4;
  Flushing = 5;
}
- For the second problem, DataNode reports a timestamp to DataCoord every time it consumes a package from MsgStream. The proto is defined as follows:
message DataNodeTtMsg {
  common.MsgBase base = 1;
  string channel_name = 2;
  uint64 timestamp = 3;
}
- There is a background service, startDataNodeTsLoop, in DataCoord that processes DataNodeTtMsg messages.
  - Firstly, DataCoord extracts channel_name from DataNodeTtMsg and selects all sealed segments attached to this channel_name.
  - It then compares the timestamp at which each segment entered the Sealed state with DataNodeTtMsg.timestamp. If DataNodeTtMsg.timestamp is greater, all IDs belonging to that segment have been consumed by DataNode, so it is safe to notify DataNode to write that segment to persistent storage. The proto is defined as follows:
service DataNode {
  ...
  rpc FlushSegments(FlushSegmentsRequest) returns (common.Status) {}
  ...
}

message FlushSegmentsRequest {
  common.MsgBase base = 1;
  int64 dbID = 2;
  int64 collectionID = 3;
  repeated int64 segmentIDs = 4;
}
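The selection rule applied when a DataNodeTtMsg arrives can be sketched as a pure function: keep the sealed segments on the message's channel whose seal timestamp is older than the reported timestamp. This is a sketch under assumptions, not the code of startDataNodeTsLoop: sealedSegment, dataNodeTt, and segmentsReadyToFlush are hypothetical names, and how DataCoord actually records the seal timestamp is not shown in this document.

```go
package main

// sealedSegment is a hypothetical record of a sealed segment kept by DataCoord.
type sealedSegment struct {
	ID       int64
	Channel  string
	SealedAt uint64 // timestamp at which the segment entered the Sealed state
}

// dataNodeTt carries the two DataNodeTtMsg fields used by the check.
type dataNodeTt struct {
	ChannelName string
	Timestamp   uint64
}

// segmentsReadyToFlush returns the IDs of sealed segments on the message's
// channel whose seal timestamp is strictly older than the reported timestamp.
func segmentsReadyToFlush(segments []sealedSegment, msg dataNodeTt) []int64 {
	var ready []int64
	for _, s := range segments {
		if s.Channel != msg.ChannelName {
			continue // segment belongs to another channel
		}
		if msg.Timestamp > s.SealedAt {
			ready = append(ready, s.ID)
		}
	}
	return ready
}
```

The returned IDs correspond to what would be placed in the segmentIDs field of a FlushSegmentsRequest.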