Filter with partition id in GetVChanPosition (#11746)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2021-11-13 08:45:09 +08:00 committed by GitHub
parent 71e814f796
commit d38043f235
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 49 additions and 11 deletions

View File

@ -250,7 +250,7 @@ func (c *ChannelManager) Watch(ch *channel) error {
func (c *ChannelManager) fillChannelPosition(update *ChannelOp) {
for _, ch := range update.Channels {
vchan := c.posProvider.GetVChanPositions(ch.Name, ch.CollectionID, false)
vchan := c.posProvider.GetVChanPositions(ch.Name, ch.CollectionID, allPartitionID, false)
info := &datapb.ChannelWatchInfo{
Vchan: vchan,
StartTs: time.Now().Unix(),

View File

@ -22,7 +22,7 @@ import (
// positionProvider provides vchannel pair related position pairs
type positionProvider interface {
GetVChanPositions(channel string, collectionID UniqueID, seekFromStartPosition bool) *datapb.VchannelInfo
GetVChanPositions(channel string, collectionID UniqueID, paritionID UniqueID, seekFromStartPosition bool) *datapb.VchannelInfo
}
var _ positionProvider = (*dummyPosProvider)(nil)
@ -30,7 +30,7 @@ var _ positionProvider = (*dummyPosProvider)(nil)
type dummyPosProvider struct{}
//GetVChanPositions implements positionProvider
func (dp dummyPosProvider) GetVChanPositions(channel string, collectionID UniqueID, seekFromStartPosition bool) *datapb.VchannelInfo {
func (dp dummyPosProvider) GetVChanPositions(channel string, collectionID UniqueID, paritionID UniqueID, seekFromStartPosition bool) *datapb.VchannelInfo {
return &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: channel,

View File

@ -50,7 +50,10 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
)
const connEtcdMaxRetryTime = 100000
const (
connEtcdMaxRetryTime = 100000
allPartitionID = 0 // paritionID means no filtering
)
var (
// TODO: sunby put to config
@ -781,7 +784,7 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i
}
// GetVChanPositions get vchannel latest postitions with provided dml channel names
func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFromStartPosition bool) *datapb.VchannelInfo {
func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID, seekFromStartPosition bool) *datapb.VchannelInfo {
segments := s.meta.GetSegmentsByChannel(channel)
log.Debug("GetSegmentsByChannel",
zap.Any("collectionID", collectionID),
@ -793,6 +796,11 @@ func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFr
unflushed := make([]*datapb.SegmentInfo, 0)
var seekPosition *internalpb.MsgPosition
for _, s := range segments {
// filter segment with parition id
if partitionID > allPartitionID && s.PartitionID != partitionID {
continue
}
if s.State == commonpb.SegmentState_Flushing || s.State == commonpb.SegmentState_Flushed {
flushed = append(flushed, trimSegmentInfo(s.SegmentInfo))
if seekPosition == nil || (s.DmlPosition.Timestamp < seekPosition.Timestamp) {

View File

@ -1131,6 +1131,12 @@ func TestGetVChannelPos(t *testing.T) {
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
DmlPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
}
err := svr.meta.AddSegment(NewSegmentInfo(s1))
assert.Nil(t, err)
@ -1140,6 +1146,11 @@ func TestGetVChannelPos(t *testing.T) {
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
StartPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
},
DmlPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
@ -1152,35 +1163,54 @@ func TestGetVChannelPos(t *testing.T) {
s3 := &datapb.SegmentInfo{
ID: 3,
CollectionID: 0,
PartitionID: 0,
PartitionID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
StartPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
},
DmlPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{11, 12, 13},
MsgGroup: "",
Timestamp: 0,
},
}
err = svr.meta.AddSegment(NewSegmentInfo(s3))
assert.Nil(t, err)
t.Run("get unexisted channel", func(t *testing.T) {
vchan := svr.GetVChanPositions("chx1", 0, true)
vchan := svr.GetVChanPositions("chx1", 0, allPartitionID, true)
assert.Empty(t, vchan.UnflushedSegments)
assert.Empty(t, vchan.FlushedSegments)
})
t.Run("get existed channel", func(t *testing.T) {
vchan := svr.GetVChanPositions("ch1", 0, true)
vchan := svr.GetVChanPositions("ch1", 0, allPartitionID, true)
assert.EqualValues(t, 1, len(vchan.FlushedSegments))
assert.EqualValues(t, 1, vchan.FlushedSegments[0].ID)
assert.EqualValues(t, 1, len(vchan.UnflushedSegments))
assert.EqualValues(t, 2, len(vchan.UnflushedSegments))
assert.EqualValues(t, 2, vchan.UnflushedSegments[0].ID)
assert.EqualValues(t, []byte{1, 2, 3}, vchan.UnflushedSegments[0].DmlPosition.MsgID)
})
t.Run("empty collection", func(t *testing.T) {
infos := svr.GetVChanPositions("ch0_suffix", 1, true)
infos := svr.GetVChanPositions("ch0_suffix", 1, allPartitionID, true)
assert.EqualValues(t, 1, infos.CollectionID)
assert.EqualValues(t, 0, len(infos.FlushedSegments))
assert.EqualValues(t, 0, len(infos.UnflushedSegments))
assert.EqualValues(t, []byte{8, 9, 10}, infos.SeekPosition.MsgID)
})
t.Run("filter partition", func(t *testing.T) {
infos := svr.GetVChanPositions("ch1", 0, 1, true)
assert.EqualValues(t, 0, infos.CollectionID)
assert.EqualValues(t, 0, len(infos.FlushedSegments))
assert.EqualValues(t, 1, len(infos.UnflushedSegments))
assert.EqualValues(t, []byte{8, 9, 10}, infos.SeekPosition.MsgID)
})
}
func TestGetRecoveryInfo(t *testing.T) {

View File

@ -520,7 +520,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
channels := dresp.GetVirtualChannelNames()
channelInfos := make([]*datapb.VchannelInfo, 0, len(channels))
for _, c := range channels {
channelInfo := s.GetVChanPositions(c, collectionID, false)
channelInfo := s.GetVChanPositions(c, collectionID, partitionID, false)
channelInfos = append(channelInfos, channelInfo)
log.Debug("datacoord append channelInfo in GetRecoveryInfo",
zap.Any("collectionID", collectionID),