Upgrade pulsar-client-go to 0.6.0 (#7909)

MessageID interface of pulsar-client-go adds
4 more methods. So I add these methods into
`MessageID` interface.

Resolves: #7770

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2021-09-15 18:13:49 +08:00 committed by GitHub
parent cda5a5ee35
commit ff8fef6ad0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 73 additions and 1 deletions

2
go.mod
View File

@ -50,7 +50,7 @@ require (
)
replace (
github.com/apache/pulsar-client-go => github.com/apache/pulsar-client-go v0.5.0
github.com/apache/pulsar-client-go => github.com/apache/pulsar-client-go v0.6.0
google.golang.org/grpc => google.golang.org/grpc v1.38.0
github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4
)

7
go.sum
View File

@ -40,6 +40,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4=
github.com/99designs/keyring v1.1.5 h1:wLv7QyzYpFIyMSwOADq1CLTF9KbjbBfcnfmOGJ64aO4=
github.com/99designs/keyring v1.1.5/go.mod h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpHqTozIPqf0=
github.com/AthenZ/athenz v1.10.15 h1:8Bc2W313k/ev/SGokuthNbzpwfg9W3frg3PKq1r943I=
github.com/AthenZ/athenz v1.10.15/go.mod h1:7KMpEuJ9E4+vMCMI3UQJxwWs0RZtQq7YXZ1IteUjdsc=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
@ -59,6 +61,8 @@ github.com/antonmedv/expr v1.8.9 h1:O9stiHmHHww9b4ozhPx7T6BK7fXfOCHJ8ybxf0833zw=
github.com/antonmedv/expr v1.8.9/go.mod h1:5qsM3oLGDND7sDmQGDXHkYfkjYMUX14qsgqmHhwGEk8=
github.com/apache/pulsar-client-go v0.5.0 h1:cM2e6dXBa9OyPtvGHxZB1OlSOWQxsWzu45btBvtmpYo=
github.com/apache/pulsar-client-go v0.5.0/go.mod h1:yj6hIv/EZXf5GgJJ8I3T13Yx9yspj8aF2QrJ5kzuueM=
github.com/apache/pulsar-client-go v0.6.0 h1:yKX7NsmJxR5mL6uIUxTTatNhMFlhurTASSZRJ9IULDg=
github.com/apache/pulsar-client-go v0.6.0/go.mod h1:A1P5VjjljsFKAD13w7/jmU3Dly2gcRvcobiULqQXhz4=
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd h1:P5kM7jcXJ7TaftX0/EMKiSJgvQc/ct+Fw0KMvcH3WuY=
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd/go.mod h1:0UtvvETGDdvXNDCHa8ZQpxl+w3HbdFtfYZvDHLgWGTY=
github.com/apache/thrift v0.14.2 h1:hY4rAyg7Eqbb27GB6gkhUKrRAuc8xRjlNtJq+LseKeY=
@ -71,6 +75,7 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aws/aws-sdk-go v1.30.8/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
@ -249,6 +254,7 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
@ -647,6 +653,7 @@ golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=

View File

@ -14,4 +14,16 @@ package mqclient
type MessageID interface {
// Serialize the message id into a sequence of bytes that can be stored somewhere else
Serialize() []byte
// Get the message ledgerID
LedgerID() int64
// Get the message entryID
EntryID() int64
// Get the message batchIdx
BatchIdx() int32
// Get the message partitionIdx
PartitionIdx() int32
}

View File

@ -21,10 +21,30 @@ type pulsarID struct {
messageID pulsar.MessageID
}
// Check if pulsarID implements pulsar.MessageID and MessageID interface
var _ pulsar.MessageID = &pulsarID{}
var _ MessageID = &pulsarID{}
func (pid *pulsarID) Serialize() []byte {
return pid.messageID.Serialize()
}
func (pid *pulsarID) LedgerID() int64 {
return pid.messageID.LedgerID()
}
func (pid *pulsarID) EntryID() int64 {
return pid.messageID.EntryID()
}
func (pid *pulsarID) BatchIdx() int32 {
return pid.messageID.BatchIdx()
}
func (pid *pulsarID) PartitionIdx() int32 {
return pid.messageID.PartitionIdx()
}
func SerializePulsarMsgID(messageID pulsar.MessageID) []byte {
return messageID.Serialize()
}

View File

@ -27,6 +27,11 @@ func TestPulsarID_Serialize(t *testing.T) {
binary := pid.Serialize()
assert.NotNil(t, binary)
assert.NotZero(t, len(binary))
pid.LedgerID()
pid.EntryID()
pid.BatchIdx()
pid.PartitionIdx()
}
func Test_SerializePulsarMsgID(t *testing.T) {

View File

@ -21,10 +21,33 @@ type rmqID struct {
messageID rocksmq.UniqueID
}
// Check if rmqID implements MessageID interface
var _ MessageID = &rmqID{}
func (rid *rmqID) Serialize() []byte {
return SerializeRmqID(rid.messageID)
}
func (rid *rmqID) LedgerID() int64 {
// TODO
return 0
}
func (rid *rmqID) EntryID() int64 {
// TODO
return 0
}
func (rid *rmqID) BatchIdx() int32 {
// TODO
return 0
}
func (rid *rmqID) PartitionIdx() int32 {
// TODO
return 0
}
func SerializeRmqID(messageID int64) []byte {
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(messageID))

View File

@ -25,6 +25,11 @@ func TestRmqID_Serialize(t *testing.T) {
bin := rid.Serialize()
assert.NotNil(t, bin)
assert.NotZero(t, len(bin))
rid.LedgerID()
rid.EntryID()
rid.BatchIdx()
rid.PartitionIdx()
}
func Test_SerializeRmqID(t *testing.T) {