Support seeking the data segment channel to the break point when the system starts (#5540)

* add DecodeDdOperation

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* reload MsgPosition from etcd and seek when the system starts

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update testcase

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add unittest

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename const string

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix unittest

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* optimize master unittest

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add msg into dataSegmentChannel to test seek

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix seek test

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* roll back distributed/masterservice/masterservice_test.go

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
This commit is contained in:
Cai Yudong 2021-06-02 22:36:41 +08:00 committed by GitHub
parent 176109065e
commit 4018dfc5b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 139 additions and 85 deletions

View File

@ -519,6 +519,31 @@ func (c *Core) setDdMsgSendFlag(b bool) error {
return err
}
// startMsgStreamAndSeek creates a message stream, registers it as a consumer
// of chanName under the subscription subName, seeks it to the message
// positions stored in the meta store under key (when any are stored), and
// finally starts it.
//
// It returns a pointer to the started stream. An error is returned when the
// stream cannot be created, when the stored value cannot be decoded, or when
// the seek itself fails; in the last two cases the underlying error is
// wrapped so callers can inspect it with errors.Is / errors.As, and the
// stream is not started.
func (c *Core) startMsgStreamAndSeek(chanName string, subName string, key string) (*ms.MsgStream, error) {
	stream, err := c.msFactory.NewMsgStream(c.ctx)
	if err != nil {
		return nil, err
	}
	stream.AsConsumer([]string{chanName}, subName)
	log.Debug("AsConsumer: " + chanName + ":" + subName)

	// A failed Load skips the seek entirely and the stream starts without
	// seeking.
	// NOTE(review): every Load error is ignored here, not only "key not
	// found" — confirm that this best-effort behavior is intended.
	msgPosStr, err := c.MetaTable.client.Load(key, 0)
	if err == nil {
		var msgPositions []*ms.MsgPosition
		if err := DecodeMsgPositions(msgPosStr, &msgPositions); err != nil {
			return nil, fmt.Errorf("decode msg positions fail, err %w", err)
		}
		// Only seek when something was actually stored.
		if len(msgPositions) > 0 {
			if err := stream.Seek(msgPositions); err != nil {
				return nil, fmt.Errorf("msg stream seek fail, err %w", err)
			}
			log.Debug("msg stream: " + chanName + ":" + subName + " seek to stored position")
		}
	}

	stream.Start()
	return &stream, nil
}
func (c *Core) setMsgStreams() error {
if Params.PulsarAddress == "" {
return fmt.Errorf("PulsarAddress is empty")
@ -689,20 +714,22 @@ func (c *Core) setMsgStreams() error {
}
// data service will put msg into this channel when create segment
dsStream, _ := c.msFactory.NewMsgStream(c.ctx)
dsChanName := Params.DataServiceSegmentChannel
dsSubName := Params.MsgChannelSubName + "ds"
dsStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, dsSubName)
log.Debug("master AsConsumer: " + Params.DataServiceSegmentChannel + " : " + dsSubName)
dsStream.Start()
c.DataServiceSegmentChan = dsStream.Chan()
dsStream, err := c.startMsgStreamAndSeek(dsChanName, dsSubName, SegInfoMsgEndPosPrefix)
if err != nil {
return err
}
c.DataServiceSegmentChan = (*dsStream).Chan()
// data node will put msg into this channel when flush segment
dnStream, _ := c.msFactory.NewMsgStream(c.ctx)
dnChanName := Params.DataServiceSegmentChannel
dnSubName := Params.MsgChannelSubName + "dn"
dnStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, dnSubName)
log.Debug("master AsConsumer: " + Params.DataServiceSegmentChannel + " : " + dnSubName)
dnStream.Start()
c.DataNodeFlushedSegmentChan = dnStream.Chan()
dnStream, err := c.startMsgStreamAndSeek(dnChanName, dnSubName, FlushedSegMsgEndPosPrefix)
if err != nil {
return err
}
c.DataNodeFlushedSegmentChan = (*dnStream).Chan()
return nil
}

View File

@ -199,11 +199,58 @@ func consumeMsgChan(timeout time.Duration, targetChan <-chan *msgstream.MsgPack)
}
}
// GenSegInfoMsgPack builds a MsgPack that contains exactly one
// SegmentInfoMsg carrying seg. The message uses zero timestamps, a single
// hash value of 0, and a MsgBase of type MsgType_SegmentInfo whose remaining
// fields are left at their zero values.
func GenSegInfoMsgPack(seg *datapb.SegmentInfo) *msgstream.MsgPack {
	infoMsg := &msgstream.SegmentInfoMsg{
		BaseMsg: msgstream.BaseMsg{
			// BeginTimestamp and EndTimestamp stay at zero.
			HashValues: []uint32{0},
		},
		SegmentMsg: datapb.SegmentMsg{
			Base: &commonpb.MsgBase{
				// MsgID, Timestamp and SourceID stay at zero.
				MsgType: commonpb.MsgType_SegmentInfo,
			},
			Segment: seg,
		},
	}
	return &msgstream.MsgPack{Msgs: []msgstream.TsMsg{infoMsg}}
}
// GenFlushedSegMsgPack builds a MsgPack that contains exactly one
// FlushCompletedMsg for the segment segID. The message uses zero timestamps,
// a single hash value of 0, and a MsgBase of type MsgType_SegmentFlushDone
// whose remaining fields are left at their zero values.
func GenFlushedSegMsgPack(segID typeutil.UniqueID) *msgstream.MsgPack {
	flushMsg := &msgstream.FlushCompletedMsg{
		BaseMsg: msgstream.BaseMsg{
			// BeginTimestamp and EndTimestamp stay at zero.
			HashValues: []uint32{0},
		},
		SegmentFlushCompletedMsg: internalpb.SegmentFlushCompletedMsg{
			Base: &commonpb.MsgBase{
				// MsgID, Timestamp and SourceID stay at zero.
				MsgType: commonpb.MsgType_SegmentFlushDone,
			},
			SegmentID: segID,
		},
	}
	return &msgstream.MsgPack{Msgs: []msgstream.TsMsg{flushMsg}}
}
func TestMasterService(t *testing.T) {
const (
dbName = "testDb"
collName = "testColl"
partName = "testPartition"
segID = 1001
)
ctx, cancel := context.WithCancel(context.Background())
@ -278,15 +325,9 @@ func TestMasterService(t *testing.T) {
err = core.SetQueryService(qm)
assert.Nil(t, err)
err = core.Init()
assert.Nil(t, err)
err = core.Start()
assert.Nil(t, err)
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarAddress,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err = msFactory.SetParams(m)
assert.Nil(t, err)
@ -305,6 +346,25 @@ func TestMasterService(t *testing.T) {
ddStream.AsConsumer([]string{Params.DdChannel}, Params.MsgChannelSubName)
ddStream.Start()
// test dataServiceSegmentStream seek
dataNodeSubName := Params.MsgChannelSubName + "dn"
flushedSegStream, _ := msFactory.NewMsgStream(ctx)
flushedSegStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, dataNodeSubName)
flushedSegStream.Start()
msgPack := GenFlushedSegMsgPack(9999)
err = dataServiceSegmentStream.Produce(msgPack)
assert.Nil(t, err)
flushedSegMsgPack := flushedSegStream.Consume()
flushedSegPosStr, _ := EncodeMsgPositions(flushedSegMsgPack.EndPositions)
_, err = etcdCli.Put(ctx, path.Join(Params.MetaRootPath, FlushedSegMsgEndPosPrefix), flushedSegPosStr)
assert.Nil(t, err)
err = core.Init()
assert.Nil(t, err)
err = core.Start()
assert.Nil(t, err)
time.Sleep(time.Second)
t.Run("time tick", func(t *testing.T) {
@ -458,7 +518,7 @@ func TestMasterService(t *testing.T) {
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0)
assert.Nil(t, err)
var ddOp DdOperation
err = json.Unmarshal([]byte(ddOpStr), &ddOp)
err = DecodeDdOperation(ddOpStr, &ddOp)
assert.Nil(t, err)
assert.Equal(t, CreateCollectionDDType, ddOp.Type)
@ -602,7 +662,7 @@ func TestMasterService(t *testing.T) {
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0)
assert.Nil(t, err)
var ddOp DdOperation
err = json.Unmarshal([]byte(ddOpStr), &ddOp)
err = DecodeDdOperation(ddOpStr, &ddOp)
assert.Nil(t, err)
assert.Equal(t, CreatePartitionDDType, ddOp.Type)
@ -665,27 +725,8 @@ func TestMasterService(t *testing.T) {
CollectionID: coll.ID,
PartitionID: part.PartitionID,
}
msgPack := msgstream.MsgPack{}
baseMsg := msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{0},
}
segMsg := &msgstream.SegmentInfoMsg{
BaseMsg: baseMsg,
SegmentMsg: datapb.SegmentMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
Segment: seg,
},
}
msgPack.Msgs = append(msgPack.Msgs, segMsg)
err = dataServiceSegmentStream.Broadcast(&msgPack)
segInfoMsgPack := GenSegInfoMsgPack(seg)
err = dataServiceSegmentStream.Broadcast(segInfoMsgPack)
assert.Nil(t, err)
time.Sleep(time.Second)
@ -821,31 +862,12 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, 1, len(part.SegmentIDs))
seg := &datapb.SegmentInfo{
ID: 1001,
ID: segID,
CollectionID: coll.ID,
PartitionID: part.PartitionID,
}
msgPack := msgstream.MsgPack{}
baseMsg := msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{0},
}
segMsg := &msgstream.SegmentInfoMsg{
BaseMsg: baseMsg,
SegmentMsg: datapb.SegmentMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
Segment: seg,
},
}
msgPack.Msgs = append(msgPack.Msgs, segMsg)
err = dataServiceSegmentStream.Broadcast(&msgPack)
segInfoMsgPack := GenSegInfoMsgPack(seg)
err = dataServiceSegmentStream.Broadcast(segInfoMsgPack)
assert.Nil(t, err)
time.Sleep(time.Second)
@ -853,20 +875,8 @@ func TestMasterService(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, 2, len(part.SegmentIDs))
flushMsg := &msgstream.FlushCompletedMsg{
BaseMsg: baseMsg,
SegmentFlushCompletedMsg: internalpb.SegmentFlushCompletedMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentFlushDone,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
SegmentID: 1001,
},
}
msgPack.Msgs = []msgstream.TsMsg{flushMsg}
err = dataServiceSegmentStream.Broadcast(&msgPack)
flushedSegMsgPack := GenFlushedSegMsgPack(segID)
err = dataServiceSegmentStream.Broadcast(flushedSegMsgPack)
assert.Nil(t, err)
time.Sleep(time.Second)
@ -1007,7 +1017,7 @@ func TestMasterService(t *testing.T) {
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0)
assert.Nil(t, err)
var ddOp DdOperation
err = json.Unmarshal([]byte(ddOpStr), &ddOp)
err = DecodeDdOperation(ddOpStr, &ddOp)
assert.Nil(t, err)
assert.Equal(t, DropPartitionDDType, ddOp.Type)
@ -1081,7 +1091,7 @@ func TestMasterService(t *testing.T) {
ddOpStr, err := core.MetaTable.client.Load(DDOperationPrefix, 0)
assert.Nil(t, err)
var ddOp DdOperation
err = json.Unmarshal([]byte(ddOpStr), &ddOp)
err = DecodeDdOperation(ddOpStr, &ddOp)
assert.Nil(t, err)
assert.Equal(t, DropCollectionDDType, ddOp.Type)

View File

@ -41,8 +41,10 @@ const (
TimestampPrefix = ComponentPrefix + "/timestamp"
MsgStartPositionPrefix = ComponentPrefix + "/msg-start-position"
MsgEndPositionPrefix = ComponentPrefix + "/msg-end-position"
SegInfoMsgStartPosPrefix = ComponentPrefix + "/seg-info-msg-start-position"
SegInfoMsgEndPosPrefix = ComponentPrefix + "/seg-info-msg-end-position"
FlushedSegMsgStartPosPrefix = ComponentPrefix + "/flushed-seg-msg-start-position"
FlushedSegMsgEndPosPrefix = ComponentPrefix + "/flushed-seg-msg-end-position"
DDOperationPrefix = ComponentPrefix + "/dd-operation"
DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send"
@ -735,9 +737,10 @@ func (mt *metaTable) AddSegment(segInfos []*datapb.SegmentInfo, msgStartPos stri
meta[k] = v
}
// AddSegment is invoked from DataService
if msgStartPos != "" && msgEndPos != "" {
meta[MsgStartPositionPrefix] = msgStartPos
meta[MsgEndPositionPrefix] = msgEndPos
meta[SegInfoMsgStartPosPrefix] = msgStartPos
meta[SegInfoMsgEndPosPrefix] = msgEndPos
}
ts, err := mt.client.MultiSave(meta, nil)
@ -803,9 +806,10 @@ func (mt *metaTable) AddIndex(segIdxInfos []*pb.SegmentIndexInfo, msgStartPos st
meta[k] = v
}
// AddIndex is invoked from DataNode flush operation
if msgStartPos != "" && msgEndPos != "" {
meta[MsgStartPositionPrefix] = msgStartPos
meta[MsgEndPositionPrefix] = msgEndPos
meta[FlushedSegMsgStartPosPrefix] = msgStartPos
meta[FlushedSegMsgEndPosPrefix] = msgEndPos
}
ts, err := mt.client.MultiSave(meta, nil)

View File

@ -86,7 +86,12 @@ func EncodeDdOperation(m proto.Message, m1 proto.Message, ddType string) (string
return string(ddOpByte), nil
}
// SegmentIndexInfoEqual return true if 2 SegmentIndexInfo are identical
// DecodeDdOperation parses the JSON text in str into ddOp and returns any
// error reported by the JSON decoder.
func DecodeDdOperation(str string, ddOp *DdOperation) error {
	raw := []byte(str)
	return json.Unmarshal(raw, ddOp)
}
// SegmentIndexInfoEqual returns true if the two SegmentIndexInfos are identical
func SegmentIndexInfoEqual(info1 *etcdpb.SegmentIndexInfo, info2 *etcdpb.SegmentIndexInfo) bool {
return info1.SegmentID == info2.SegmentID &&
info1.FieldID == info2.FieldID &&
@ -106,3 +111,11 @@ func EncodeMsgPositions(msgPositions []*msgstream.MsgPosition) (string, error) {
}
return string(resByte), nil
}
// DecodeMsgPositions parses the JSON text in str into the slice that
// msgPositions points to. An empty string or the literal "null" is treated
// as "nothing stored": the target is left untouched and nil is returned.
// Otherwise the result of the JSON decoder is returned.
func DecodeMsgPositions(str string, msgPositions *[]*msgstream.MsgPosition) error {
	switch str {
	case "", "null":
		return nil
	default:
		return json.Unmarshal([]byte(str), msgPositions)
	}
}