diff --git a/docs/developer_guides/developer_guides.md b/docs/developer_guides/developer_guides.md index 1d9e59ee9d..e766470745 100644 --- a/docs/developer_guides/developer_guides.md +++ b/docs/developer_guides/developer_guides.md @@ -354,6 +354,7 @@ type TsMsgMarshaler interface { type MsgStream interface { SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) Produce(*MsgPack) error + Broadcast(*MsgPack) error Consume() *MsgPack // message can be consumed exactly once } @@ -361,23 +362,23 @@ type RepackFunc(msgs []* TsMsg, hashKeys [][]int32) map[int32] *MsgPack type PulsarMsgStream struct { client *pulsar.Client - repackFunc RepackFunc - producers []*pulsar.Producer - consumers []*pulsar.Consumer - msgMarshaler *TsMsgMarshaler - msgUnmarshaler *TsMsgMarshaler + repackFunc RepackFunc + inputs []*pulsar.Producer + msgUnmarshaler []*TsMsgMarshaler + outputs []*pulsar.Consumer + msgMarshaler []*TsMsgMarshaler } -func (ms *PulsarMsgStream) SetProducerChannels(channels []string) -func (ms *PulsarMsgStream) SetConsumerChannels(channels []string) -func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) +func (ms *PulsarMsgStream) AddInputTopics(topics []string, unmarshal *TsMsgMarshaler) +func (ms *PulsarMsgStream) AddOutputTopics(topics []string, marshal *TsMsgMarshaler) func (ms *PulsarMsgStream) SetRepackFunc(repackFunc RepackFunc) func (ms *PulsarMsgStream) Produce(msgs *MsgPack) error -func (ms *PulsarMsgStream) Consume() (*MsgPack, error) //return messages in one time tick +func (ms *PulsarMsgStream) Broadcast(msgs *MsgPack) error +func (ms *PulsarMsgStream) Consume() (*MsgPack, error) type PulsarTtMsgStream struct { client *pulsar.Client - msgHashFunc (*MsgPack) map[int32]*MsgPack // return a map from produceChannel idx to *MsgPack + repackFunc RepackFunc producers []*pulsar.Producer consumers []*pulsar.Consumer msgMarshaler *TsMsgMarshaler @@ -387,12 +388,13 @@ type PulsarTtMsgStream struct { msgPacks []*MsgPack } -func (ms *PulsarMsgStream) SetProducerChannels(channels []string) -func (ms *PulsarMsgStream) SetConsumerChannels(channels []string) -func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) -func (ms *PulsarMsgStream) SetMsgHashFunc(hashFunc *HashFunc) -func (ms *PulsarMsgStream) Produce(msgs *MsgPack) error -func (ms *PulsarMsgStream) Consume() *MsgPack //return messages in one time tick +func (ms *PulsarTtMsgStream) SetProducers(channels []string) +func (ms *PulsarTtMsgStream) SetConsumers(channels []string) +func (ms *PulsarTtMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) +func (ms *PulsarTtMsgStream) SetRepackFunc(repackFunc RepackFunc) +func (ms *PulsarTtMsgStream) Produce(msgs *MsgPack) error +func (ms *PulsarTtMsgStream) Broadcast(msgs *MsgPack) error +func (ms *PulsarTtMsgStream) Consume() *MsgPack //return messages in one time tick ``` @@ -513,6 +515,7 @@ type KVBase interface { MultiSave(kvs map[string]string) Remove(key string) error MultiRemove(keys []string) + MultiSaveAndRemove(saves map[string]string, removals []string) Watch(key string) clientv3.WatchChan WatchWithPrefix(key string) clientv3.WatchChan LoadWithPrefix(key string) ( []string, []string, error) @@ -1077,11 +1080,11 @@ Note that *tenantId*, *proxyId*, *collectionId*, *segmentId* are unique strings ```go type metaTable struct { kv kv.Base // client of a reliable kv service, i.e. etcd client - tenantId2Meta map[int64]TenantMeta // tenant id to tenant meta - proxyId2Meta map[int64]ProxyMeta // proxy id to proxy meta - collId2Meta map[int64]CollectionMeta // collection id to collection meta - collName2Id map[string]int64 // collection name to collection id - segId2Meta map[int64]SegmentMeta // segment id to segment meta + tenantId2Meta map[UniqueId]TenantMeta // tenant id to tenant meta + proxyId2Meta map[UniqueId]ProxyMeta // proxy id to proxy meta + collId2Meta map[UniqueId]CollectionMeta // collection id to collection meta + collName2Id map[string]UniqueId // collection name to collection id + segId2Meta map[UniqueId]SegmentMeta // segment id to segment meta tenantLock sync.RWMutex proxyLock sync.RWMutex @@ -1089,24 +1092,24 @@ type metaTable struct { } func (meta *metaTable) AddTenant(tenant *TenantMeta) error -func (meta *metaTable) DeleteTenant(tenantId int64) error +func (meta *metaTable) DeleteTenant(tenantId UniqueId) error func (meta *metaTable) AddProxy(proxy *ProxyMeta) error -func (meta *metaTable) DeleteProxy(proxyId int64) error +func (meta *metaTable) DeleteProxy(proxyId UniqueId) error func (meta *metaTable) AddCollection(coll *CollectionMeta) error -func (meta *metaTable) DeleteCollection(collId int64) error -func (meta *metaTable) HasCollection(collId int64) bool +func (meta *metaTable) DeleteCollection(collId UniqueId) error +func (meta *metaTable) HasCollection(collId UniqueId) bool func (meta *metaTable) GetCollectionByName(collName string) (*CollectionMeta, error) -func (meta *metaTable) AddPartition(collId int64, tag string) error -func (meta *metaTable) HasPartition(collId int64, tag string) bool -func (meta *metaTable) DeletePartition(collId int64, tag string) error +func (meta *metaTable) AddPartition(collId UniqueId, tag string) error +func (meta *metaTable) HasPartition(collId UniqueId, tag string) bool +func (meta *metaTable) DeletePartition(collId UniqueId, tag string) error func (meta *metaTable) AddSegment(seg *SegmentMeta) error -func (meta *metaTable) GetSegmentById(segId int64)(*SegmentMeta, error) -func (meta *metaTable) DeleteSegment(segId int64) error -func (meta *metaTable) CloseSegment(segId int64, closeTs Timestamp, num_rows int64) error +func (meta *metaTable) GetSegmentById(segId UniqueId)(*SegmentMeta, error) +func (meta *metaTable) DeleteSegment(segId UniqueId) error +func (meta *metaTable) CloseSegment(segId UniqueId, closeTs Timestamp, num_rows int64) error func NewMetaTable(kv kv.Base) (*metaTable,error) ``` @@ -1115,3 +1118,84 @@ func NewMetaTable(kv kv.Base) (*metaTable,error) * *AddSegment(seg \*SegmentMeta)* first update *CollectionMeta* by adding the segment id, then it adds a new SegmentMeta to *kv*. All the modifications are done transactionally. + + +#### 10.5 System Time Synchronization + + + +###### 10.5.1 Time Tick Barrier + +* Soft Time Tick Barrier + + + + + +```go +type softTimeTickBarrier struct { + peer2LastTt map[UniqueId]Timestamp + minTtInterval Timestamp + lastTt Timestamp + outTt chan Timestamp + ttStream *MsgStream +} + +func (ttBarrier *softTimeTickBarrier) GetTimeTick() Timestamp +func (ttBarrier *softTimeTickBarrier) Start() error + +func newSoftTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds []UniqueId, minTtInterval Timestamp) *softTimeTickBarrier +``` + + + +* Hard Time Tick Barrier + + + +```go +type hardTimeTickBarrier struct { + peer2Tt map[UniqueId]List + outTt chan Timestamp + ttStream *MsgStream +} + +func (ttBarrier *hardTimeTickBarrier) GetTimeTick() Timestamp +func (ttBarrier *hardTimeTickBarrier) Start() error + +func newHardTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds []UniqueId) *softTimeTickBarrier +``` + + + +###### 10.5.1 Time Synchornization Message Producer + + + + + + ```go +type timeSyncMsgProducer struct { + proxyTtBarrier *softTimeTickBarrier + WriteNodeTtBarrier *hardTimeTickBarrier + + insertSyncStream *MsgStream + deleteSyncStream *MsgStream + k2sSyncStream *MsgStream +} + +func (syncMsgProducer* timeSyncMsgProducer) SetProxyTtStreams(proxyTt *MsgStream, proxyIds []UniqueId) +func (syncMsgProducer* timeSyncMsgProducer) SetWriteNodeTtStreams(WriteNodeTt *MsgStream, writeNodeIds []UniqueId) + +func (syncMsgProducer* timeSyncMsgProducer) SetInsertSyncStream(insertSync *MsgStream) +func (syncMsgProducer* timeSyncMsgProducer) SetDeleteSyncStream(deleteSync *MsgStream) +func (syncMsgProducer* timeSyncMsgProducer) SetK2sSyncStream(k2sSync *MsgStream) + +func (syncMsgProducer* timeSyncMsgProducer) Start(ctx context.Context) error +func (syncMsgProducer* timeSyncMsgProducer) Close() error + ``` + + + + + diff --git a/docs/developer_guides/raw_figs/hard_tt_barrier.jpeg b/docs/developer_guides/raw_figs/hard_tt_barrier.jpeg new file mode 100644 index 0000000000..84bf81660a Binary files /dev/null and b/docs/developer_guides/raw_figs/hard_tt_barrier.jpeg differ diff --git a/docs/developer_guides/raw_figs/soft_tt_barrier.jpeg b/docs/developer_guides/raw_figs/soft_tt_barrier.jpeg new file mode 100644 index 0000000000..9233bf6c18 Binary files /dev/null and b/docs/developer_guides/raw_figs/soft_tt_barrier.jpeg differ diff --git a/docs/developer_guides/raw_figs/tt_msg_producer.jpeg b/docs/developer_guides/raw_figs/tt_msg_producer.jpeg new file mode 100644 index 0000000000..f4fa6e43f2 Binary files /dev/null and b/docs/developer_guides/raw_figs/tt_msg_producer.jpeg differ diff --git a/internal/proto/internal_msg.proto b/internal/proto/internal_msg.proto index 8c161a4259..77fbd14a33 100644 --- a/internal/proto/internal_msg.proto +++ b/internal/proto/internal_msg.proto @@ -31,6 +31,7 @@ enum ReqType { /* System Control */ kTimeTick = 1200; + kTimeSync = 1201; } enum PeerRole { @@ -193,7 +194,7 @@ message SearchResult { } -message TimeSyncMsg { +message TimeTickMsg { int64 peer_id = 1; uint64 timestamp = 2; }