Update interface in msgstream

Signed-off-by: Xiangyu Wang <xiangyu.wang@zilliz.com>
This commit is contained in:
Xiangyu Wang 2021-02-02 16:32:15 +08:00 committed by yefu.chen
parent e4e3ec88d1
commit 32c5a92449
3 changed files with 127 additions and 67 deletions

View File

@ -74,6 +74,8 @@ type DescribeChannelResponse struct {
* Interface
``` go
// Msg
type MsgType uint32
const {
kInsert MsgType = 400
@ -88,12 +90,12 @@ const {
}
type TsMsg interface {
SetTs(ts Timestamp)
BeginTs() Timestamp
EndTs() Timestamp
Type() MsgType
Marshal(*TsMsg) []byte
Unmarshal([]byte) *TsMsg
SetTs(ts Timestamp)
BeginTs() Timestamp
EndTs() Timestamp
Type() MsgType
Marshal(*TsMsg) interface{}
Unmarshal(interface{}) *TsMsg
}
type MsgPosition {
@ -110,15 +112,71 @@ type MsgPack struct {
EndPositions []MsgPosition
}
type MsgStream interface {
Produce(*MsgPack) error
Broadcast(*MsgPack) error
Consume() *MsgPack // message can be consumed exactly once
ShowChannelNames() []string
Seek(offset MsgPosition) error
type RepackFunc(msgs []* TsMsg, hashKeys [][]int32) map[int32] *MsgPack
```
```go
// Unmarshal
// Interface
type UnmarshalFunc func(interface{}) *TsMsg
type UnmarshalDispatcher interface {
Unmarshal(interface{}, msgType commonpb.MsgType) (msgstream.TsMsg, error)
}
type RepackFunc(msgs []* TsMsg, hashKeys [][]int32) map[int32] *MsgPack
type UnmarshalDispatcherFactory interface {
NewUnmarshalDispatcher() *UnmarshalDispatcher
}
// Proto & Mem Implementation
type ProtoUDFactory struct {}
func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *UnmarshalDispatcher
type MemUDFactory struct {}
func (mudf *MemUDFactory) NewUnmarshalDispatcher() *UnmarshalDispatcher
```
```go
// MsgStream
// Interface
type MsgStream interface {
Start()
Close()
AsProducer(channels []string)
AsConsumer(channels []string, subName string)
Produce(*MsgPack) error
Broadcast(*MsgPack) error
Consume() *MsgPack // message can be consumed exactly once
Seek(mp *MsgPosition) error
}
type MsgStreamFactory interface {
NewMsgStream() *MsgStream
NewTtMsgStream() *MsgStream
}
// Pulsar
type PulsarMsgStreamFactory interface {}
func (pmsf *PulsarMsgStreamFactory) NewMsgStream() *MsgStream
func (pmsf *PulsarMsgStreamFactory) NewTtMsgStream() *MsgStream
// RockMQ
type RmqMsgStreamFactory interface {}
func (rmsf *RmqMsgStreamFactory) NewMsgStream() *MsgStream
func (rmsf *RmqMsgStreamFactory) NewTtMsgStream() *MsgStream
```
```go
// PulsarMsgStream
type PulsarMsgStream struct {
client *pulsar.Client
@ -128,16 +186,17 @@ type PulsarMsgStream struct {
unmarshal *UnmarshalDispatcher
}
func (ms *PulsarMsgStream) CreatePulsarProducers(topics []string)
func (ms *PulsarMsgStream) CreatePulsarConsumers(subname string, topics []string, unmarshal *UnmarshalDispatcher)
func (ms *PulsarMsgStream) SetRepackFunc(repackFunc RepackFunc)
func (ms *PulsarMsgStream) Start() error
func (ms *PulsarMsgStream) Close() error
func (ms *PulsarMsgStream) AsProducer(channels []string)
func (ms *PulsarMsgStream) AsConsumer(channels []string, subName string)
func (ms *PulsarMsgStream) Produce(msgs *MsgPack) error
func (ms *PulsarMsgStream) Broadcast(msgs *MsgPack) error
func (ms *PulsarMsgStream) Consume() (*MsgPack, error)
func (ms *PulsarMsgStream) Start() error
func (ms *PulsarMsgStream) Close() error
func (ms *PulsarMsgStream) Seek(mp *MsgPosition) error
func (ms *PulsarMsgStream) SetRepackFunc(repackFunc RepackFunc)
func NewPulsarMsgStream(ctx context.Context, pulsarAddr string) *PulsarMsgStream
func NewPulsarMsgStream(ctx context.Context, pulsarAddr string, bufferSize int64) *PulsarMsgStream
type PulsarTtMsgStream struct {
@ -151,38 +210,67 @@ type PulsarTtMsgStream struct {
msgPacks []*MsgPack
}
func (ms *PulsarTtMsgStream) CreatePulsarProducers(topics []string)
func (ms *PulsarTtMsgStream) CreatePulsarConsumers(subname string, topics []string, unmarshal *UnmarshalDispatcher)
func (ms *PulsarTtMsgStream) SetRepackFunc(repackFunc RepackFunc)
func (ms *PulsarTtMsgStream) Start() error
func (ms *PulsarTtMsgStream) Close() error
func (ms *PulsarTtMsgStream) AsProducer(channels []string)
func (ms *PulsarTtMsgStream) AsConsumer(channels []string, subName string)
func (ms *PulsarTtMsgStream) Produce(msgs *MsgPack) error
func (ms *PulsarTtMsgStream) Broadcast(msgs *MsgPack) error
func (ms *PulsarTtMsgStream) Consume() *MsgPack //return messages in one time tick
func (ms *PulsarTtMsgStream) Start() error
func (ms *PulsarTtMsgStream) Close() error
func (ms *PulsarTtMsgStream) Seek(mp *MsgPosition) error
func (ms *PulsarTtMsgStream) SetRepackFunc(repackFunc RepackFunc)
func NewPulsarTtMsgStream(ctx context.Context, pulsarAddr string) *PulsarTtMsgStream
```
func NewPulsarTtMsgStream(ctx context.Context, pulsarAddr string, bufferSize int64) *PulsarTtMsgStream
// RmqMsgStream
```go
type MarshalFunc func(*TsMsg) []byte
type UnmarshalFunc func([]byte) *TsMsg
type UnmarshalDispatcher struct {
tempMap map[ReqType]UnmarshalFunc
type RmqMsgStream struct {
client *rockermq.RocksMQ
repackFunc RepackFunc
producers []string
consumers []string
subName string
unmarshal *UnmarshalDispatcher
}
func (dispatcher *MarshalDispatcher) Unmarshal([]byte) *TsMsg
func (dispatcher *MarshalDispatcher) AddMsgTemplate(msgType MsgType, marshal MarshalFunc)
func (dispatcher *MarshalDispatcher) addDefaultMsgTemplates()
func (ms *RmqMsgStream) Start() error
func (ms *RmqMsgStream) Close() error
func (ms *RmqMsgStream) AsProducer(channels []string)
func (ms *RmqMsgStream) AsConsumer(channels []string, subName string)
func (ms *RmqMsgStream) Produce(msgs *MsgPack) error
func (ms *RmqMsgStream) Broadcast(msgs *MsgPack) error
func (ms *RmqMsgStream) Consume() (*MsgPack, error)
func (ms *RmqMsgStream) Seek(mp *MsgPosition) error
func (ms *RmqMsgStream) SetRepackFunc(repackFunc RepackFunc)
func NewUnmarshalDispatcher() *UnmarshalDispatcher
func NewRmqMsgStream(ctx context.Context) *RmqMsgStream
type RmqTtMsgStream struct {
client *rockermq.RocksMQ
repackFunc RepackFunc
producers []string
consumers []string
subName string
unmarshal *UnmarshalDispatcher
}
func (ms *RmqTtMsgStream) Start() error
func (ms *RmqTtMsgStream) Close() error
func (ms *RmqTtMsgStream) AsProducer(channels []string)
func (ms *RmqTtMsgStream) AsConsumer(channels []string, subName string)
func (ms *RmqTtMsgStream) Produce(msgs *MsgPack) error
func (ms *RmqTtMsgStream) Broadcast(msgs *MsgPack) error
func (ms *RmqTtMsgStream) Consume() (*MsgPack, error)
func (ms *RmqTtMsgStream) Seek(mp *MsgPosition) error
func (ms *RmqTtMsgStream) SetRepackFunc(repackFunc RepackFunc)
func NewRmqTtMsgStream(ctx context.Context) *RmqTtMsgStream
```
#### A.4 RocksMQ
RocksMQ is a RocksDB-based messaging/streaming library.

View File

@ -75,7 +75,6 @@ type InsertChannelsMap struct {
insertChannels [][]string // it's a little confusing to use []string as the key of map
insertMsgStreams []msgstream.MsgStream // maybe there's a better way to implement Set, just agilely now
droppedBitMap []int // 0 -> normal, 1 -> dropped
usageHistogram []int // message stream can be closed only when the use count is zero
mtx sync.RWMutex
nodeInstance *NodeImpl
}
@ -94,7 +93,6 @@ func (m *InsertChannelsMap) createInsertMsgStream(collID UniqueID, channels []st
for loc, existedChannels := range m.insertChannels {
if m.droppedBitMap[loc] == 0 && SortedSliceEqual(existedChannels, channels) {
m.collectionID2InsertChannels[collID] = loc
m.usageHistogram[loc]++
return nil
}
}
@ -110,7 +108,6 @@ func (m *InsertChannelsMap) createInsertMsgStream(collID UniqueID, channels []st
stream.Start()
m.insertMsgStreams = append(m.insertMsgStreams, stream)
m.droppedBitMap = append(m.droppedBitMap, 0)
m.usageHistogram = append(m.usageHistogram, 1)
return nil
}
@ -126,14 +123,7 @@ func (m *InsertChannelsMap) closeInsertMsgStream(collID UniqueID) error {
if m.droppedBitMap[loc] != 0 {
return errors.New("insert message stream already closed")
}
if m.usageHistogram[loc] <= 0 {
return errors.New("insert message stream already closed")
}
m.usageHistogram[loc]--
if m.usageHistogram[loc] <= 0 {
m.insertMsgStreams[loc].Close()
}
m.insertMsgStreams[loc].Close()
log.Print("close insert message stream ...")
m.droppedBitMap[loc] = 1
@ -174,28 +164,11 @@ func (m *InsertChannelsMap) getInsertMsgStream(collID UniqueID) (msgstream.MsgSt
return m.insertMsgStreams[loc], nil
}
func (m *InsertChannelsMap) closeAllMsgStream() {
m.mtx.Lock()
defer m.mtx.Unlock()
for _, stream := range m.insertMsgStreams {
stream.Close()
}
m.collectionID2InsertChannels = make(map[UniqueID]int)
m.insertChannels = make([][]string, 0)
m.insertMsgStreams = make([]msgstream.MsgStream, 0)
m.droppedBitMap = make([]int, 0)
m.usageHistogram = make([]int, 0)
}
func newInsertChannelsMap(node *NodeImpl) *InsertChannelsMap {
return &InsertChannelsMap{
collectionID2InsertChannels: make(map[UniqueID]int),
insertChannels: make([][]string, 0),
insertMsgStreams: make([]msgstream.MsgStream, 0),
droppedBitMap: make([]int, 0),
usageHistogram: make([]int, 0),
nodeInstance: node,
}
}

View File

@ -285,7 +285,6 @@ func (node *NodeImpl) Start() error {
func (node *NodeImpl) Stop() error {
node.cancel()
globalInsertChannelsMap.closeAllMsgStream()
node.tsoAllocator.Close()
node.idAllocator.Close()
node.segAssigner.Close()