Clear golint warning in msgstream (#8722)

Signed-off-by: Xiangyu Wang <xiangyu.wang@zilliz.com>
This commit is contained in:
Xiangyu Wang 2021-09-27 23:52:02 +08:00 committed by GitHub
parent 6065bb50c4
commit 99b022fc4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 7 deletions

View File

@ -19,10 +19,19 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// UniqueID is an alias for short
type UniqueID = typeutil.UniqueID
// Timestamp is an alias for short
type Timestamp = typeutil.Timestamp
// IntPrimaryKey is an alias for short
type IntPrimaryKey = typeutil.IntPrimaryKey
// MsgPosition is an alias for short
type MsgPosition = internalpb.MsgPosition
// MessageID is an alias for short
type MessageID = mqclient.MessageID
// MsgPack represents a batch of msg in msgstream
@ -34,6 +43,7 @@ type MsgPack struct {
EndPositions []*MsgPosition
}
// RepackFunc is a function type which used to repack message after hash by primary key
type RepackFunc func(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error)
// MsgStream is an interface that can be used to produce and consume message on message queue

View File

@ -19,6 +19,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
// InsertRepackFunc is used to repack messages after hash by primary key
func InsertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range tsMsgs {
@ -74,6 +75,7 @@ func InsertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
return result, nil
}
// DeleteRepackFunc is used to repack messages after hash by primary key
func DeleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range tsMsgs {
@ -118,6 +120,7 @@ func DeleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
return result, nil
}
// DefaultRepackFunc is used to repack messages after hash by primary key
func DefaultRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range tsMsgs {

View File

@ -17,23 +17,25 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
)
// UnmarshalFunc is an interface that has been implemented by each Msg
type UnmarshalFunc func(interface{}) (TsMsg, error)
// UnmarshalDispatcher is an interface contains method Unmarshal
type UnmarshalDispatcher interface {
Unmarshal(input interface{}, msgType commonpb.MsgType) (TsMsg, error)
AddMsgTemplate(msgType commonpb.MsgType, unmarshalFunc UnmarshalFunc)
}
// UnmarshalDispatcherFactory is a factory to generate an object which implement interface UnmarshalDispatcher
type UnmarshalDispatcherFactory interface {
NewUnmarshalDispatcher() *UnmarshalDispatcher
}
// ProtoUnmarshalDispatcher ant its factory
// ProtoUnmarshalDispatcher is Unmarshal Dispatcher which used for data of proto type
type ProtoUnmarshalDispatcher struct {
TempMap map[commonpb.MsgType]UnmarshalFunc
}
// Unmarshal will forward unmarshal request to msg type specified unmarshal function
func (p *ProtoUnmarshalDispatcher) Unmarshal(input interface{}, msgType commonpb.MsgType) (TsMsg, error) {
unmarshalFunc, ok := p.TempMap[msgType]
if !ok {
@ -42,12 +44,10 @@ func (p *ProtoUnmarshalDispatcher) Unmarshal(input interface{}, msgType commonpb
return unmarshalFunc(input)
}
func (p *ProtoUnmarshalDispatcher) AddMsgTemplate(msgType commonpb.MsgType, unmarshalFunc UnmarshalFunc) {
p.TempMap[msgType] = unmarshalFunc
}
// ProtoUDFactory is a factory to generate ProtoUnmarshalDispatcher object
type ProtoUDFactory struct{}
// NewUnmarshalDispatcher returns an new UnmarshalDispatcher
func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
insertMsg := InsertMsg{}
deleteMsg := DeleteMsg{}