diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 2e323ad01f..6ebf11064b 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -666,6 +666,7 @@ func (s *Server) GetVChanPositions(vchans []vchannel, seekFromStartPosition bool } } + // use collection start position when segment position is not found if seekPosition == nil { coll := s.meta.GetCollection(vchan.CollectionID) if coll != nil { diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 65dfe0e277..78fa589daf 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -1038,7 +1038,24 @@ func TestGetVChannelPos(t *testing.T) { svr.meta.AddCollection(&datapb.CollectionInfo{ ID: 0, Schema: schema, + StartPositions: []*commonpb.KeyDataPair{ + { + Key: "ch1", + Data: []byte{8, 9, 10}, + }, + }, }) + svr.meta.AddCollection(&datapb.CollectionInfo{ + ID: 1, + Schema: schema, + StartPositions: []*commonpb.KeyDataPair{ + { + Key: "ch0", + Data: []byte{8, 9, 10}, + }, + }, + }) + s1 := &datapb.SegmentInfo{ ID: 1, CollectionID: 0, @@ -1102,6 +1119,21 @@ func TestGetVChannelPos(t *testing.T) { assert.EqualValues(t, 2, pair[0].UnflushedSegments[0].ID) assert.EqualValues(t, []byte{1, 2, 3}, pair[0].UnflushedSegments[0].DmlPosition.MsgID) }) + + t.Run("empty collection", func(t *testing.T) { + infos, err := svr.GetVChanPositions([]vchannel{ + { + CollectionID: 1, + DmlChannel: "ch0_suffix", + }, + }, true) + assert.Nil(t, err) + assert.EqualValues(t, 1, len(infos)) + assert.EqualValues(t, 1, infos[0].CollectionID) + assert.EqualValues(t, 0, len(infos[0].FlushedSegments)) + assert.EqualValues(t, 0, len(infos[0].UnflushedSegments)) + assert.EqualValues(t, []byte{8, 9, 10}, infos[0].SeekPosition.MsgID) + }) } func TestGetRecoveryInfo(t *testing.T) {