# Flush Collection The `Flush` operation is used to make sure that inserted data will be written into persistent storage. This document will introduce how the `Flush` operation works in `Milvus 2.0`. The following figure shows the execution flow of `Flush`. ![flush_collections](./graphs/flush_data_coord.png) 1. Firstly, `SDK` sends a `Flush` request to `Proxy` via `Grpc`, the `proto` is defined as follows: ```proto service MilvusService { ... rpc Flush(FlushRequest) returns (FlushResponse) {} ... } message FlushRequest { common.MsgBase base = 1; string db_name = 2; repeated string collection_names = 3; } message FlushResponse{ common.Status status = 1; string db_name = 2; map coll_segIDs = 3; } ``` 2. When `Proxy` receives `Flush` request, it would wrap this request into `FlushTask`, and push this task into `DdTaskQueue` queue. After that, `Proxy` would call `WatiToFinish` to wait until the task finished. ```go type task interface { TraceCtx() context.Context ID() UniqueID // return ReqID SetID(uid UniqueID) // set ReqID Name() string Type() commonpb.MsgType BeginTs() Timestamp EndTs() Timestamp SetTs(ts Timestamp) OnEnqueue() error PreExecute(ctx context.Context) error Execute(ctx context.Context) error PostExecute(ctx context.Context) error WaitToFinish() error Notify(err error) } type FlushTask struct { Condition *milvuspb.FlushRequest ctx context.Context dataCoord types.DataCoord result *milvuspb.FlushResponse } ``` 3. There is a background service in `Proxy`. This service gets `FlushTask` from `DdTaskQueue`, and executes in three phases: - `PreExecute` `FlushTask` does nothing at this phase, and returns directly - `Execute` `Proxy` sends a `Flush` request to `DataCoord` via `Grpc`, and waits for the response, the `proto` is defined as follows: ```proto service DataCoord { ... rpc Flush(FlushRequest) returns (FlushResponse) {} ... } message FlushRequest { common.MsgBase base = 1; int64 dbID = 2; int64 collectionID = 4; } message FlushResponse { common.Status status = 1; int64 dbID = 2; int64 collectionID = 3; repeated int64 segmentIDs = 4; } ``` - `PostExecute` `FlushTask` does nothing at this phase, and returns directly 4. After receiving a `Flush` request from `Proxy`, `DataCoord` would call `SealAllSegments` to seal all the growing segments belonging to this `Collection`, and would not allocate new `ID`s for these segments anymore. After that, `DataCoord` would send a response to `Proxy`, which contains all the sealed segment `ID`s. 5. In `Milvus 2.0`, `Flush` is an asynchronous operation. So when `SDK` receives the response of `Flush`, it only means that the `DataCoord` has sealed these segments. There are 2 problems that we have to solve. - The sealed segments might still in memory, and have not been written into persistent storage yet. - `DataCoord` would no longer allocate new `ID`s for these sealed segments, but how to make sure all the allocated `ID`s have been consumed by `DataNode`. 6. For the first problem, `SDK` should send `GetSegmentInfo` request to `DataCoord` periodically, until all sealed segments are in state of `Flushed`. The `proto` is defined as follows. ```proto service DataCoord { ... rpc GetSegmentInfo(GetSegmentInfoRequest) returns (GetSegmentInfoResponse) {} ... } message GetSegmentInfoRequest { common.MsgBase base = 1; repeated int64 segmentIDs = 2; } message GetSegmentInfoResponse { common.Status status = 1; repeated SegmentInfo infos = 2; } message SegmentInfo { int64 ID = 1; int64 collectionID = 2; int64 partitionID = 3; string insert_channel = 4; int64 num_of_rows = 5; common.SegmentState state = 6; internal.MsgPosition dml_position = 7; int64 max_row_num = 8; uint64 last_expire_time = 9; internal.MsgPosition start_position = 10; } enum SegmentState { SegmentStateNone = 0; NotExist = 1; Growing = 2; Sealed = 3; Flushed = 4; Flushing = 5; } ``` 7. For the second problem, `DataNode` would report a timestamp to `DataCoord` every time it consumes a package from `MsgStream`, the `proto` is defined as follows. ```proto message DataNodeTtMsg { common.MsgBase base = 1; string channel_name = 2; uint64 timestamp = 3; } ``` 8. There is a background service, `startDataNodeTsLoop`, in `DataCoord` to process the message of `DataNodeTtMsg`. - Firstly, `DataCoord` would extract `channel_name` from `DataNodeTtMsg`, and filter out all sealed segments that are attached on this `channel_name` - Compare the timestamp when the segment enters into state of `Sealed` with the `DataNodeTtMsg.timestamp`, if `DataNodeTtMsg.timestamp` is greater, which means that all `ID`s belonging to that segment have been consumed by `DataNode`, it's safe to notify `DataNode` to write that segment into persistent storage. The `proto` is defined as follows: ```proto service DataNode { ... rpc FlushSegments(FlushSegmentsRequest) returns(common.Status) {} ... } message FlushSegmentsRequest { common.MsgBase base = 1; int64 dbID = 2; int64 collectionID = 3; repeated int64 segmentIDs = 4; } ```