Add unit test for case of stream closed (#18848)

Signed-off-by: yah01 <yang.cen@zilliz.com>

Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
yah01 2022-08-26 19:20:55 +08:00 committed by GitHub
parent 2ff8a49929
commit d76d6f42cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 8 deletions

View File

@ -696,12 +696,16 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
return ctx.Err()
case msgPack, ok := <-stream.Chan():
if !ok {
err = fmt.Errorf("%w: pChannelName=%v, msgID=%v",
ErrReadDeltaMsgFailed,
pChannelName,
position.GetMsgID())
log.Warn("fail to read delta msg",
zap.String("pChannelName", pChannelName),
zap.ByteString("msgID", position.GetMsgID()),
zap.Error(err),
)
return fmt.Errorf("%w: pChannelName=%v, msgID=%v", ErrReadDeltaMsgFailed, pChannelName, position.GetMsgID())
return err
}
if msgPack == nil {

View File

@ -600,7 +600,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) {
mockMsg := &mockMsgID{}
mockMsg.On("AtEarliestPosition").Return(true, nil)
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg))
assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, false, mockMsg))
}
// test already reach latest position
@ -608,7 +608,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) {
mockMsg := &mockMsgID{}
mockMsg.On("AtEarliestPosition").Return(false, nil)
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(true, nil)
assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg))
assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, false, mockMsg))
}
//test consume after seeking when get last msg successfully
@ -617,7 +617,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) {
mockMsg.On("AtEarliestPosition").Return(false, nil)
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
mockMsg.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, nil)
assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg))
assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, false, mockMsg))
}
//test compare msgID failed when get last msg successfully
@ -626,7 +626,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) {
mockMsg.On("AtEarliestPosition").Return(false, nil)
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
mockMsg.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, errors.New(""))
assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg))
assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, true, true, false, mockMsg))
}
//test consume after seeking when get last msg failed
@ -635,7 +635,15 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) {
mockMsg.On("AtEarliestPosition").Return(false, nil)
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
mockMsg.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, errors.New(""))
assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, false, true, mockMsg))
assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, false, true, false, mockMsg))
}
//test consume after seeking when read stream failed
{
mockMsg := &mockMsgID{}
mockMsg.On("AtEarliestPosition").Return(false, nil)
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, true, false, true, mockMsg))
}
//test context timeout when reading stream
@ -646,7 +654,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) {
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(-time.Second))
defer cancel()
assert.ErrorIs(t, testConsumingDeltaMsg(ctx, t, position, true, false, mockMsg), context.DeadlineExceeded)
assert.ErrorIs(t, testConsumingDeltaMsg(ctx, t, position, true, false, false, mockMsg), context.DeadlineExceeded)
}
}
@ -668,7 +676,7 @@ func testSeekFailWhenConsumingDeltaMsg(ctx context.Context, t *testing.T, positi
assert.EqualError(t, ret, errMsg)
}
func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstream.MsgPosition, getLastSucc bool, hasData bool, mockMsg *mockMsgID) error {
func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstream.MsgPosition, getLastSucc, hasData, closedStream bool, mockMsg *mockMsgID) error {
msgStream := &LoadDeleteMsgStream{}
msgStream.On("AsConsumer", mock.AnythingOfTypeArgument("string"), mock.AnythingOfTypeArgument("string"))
msgStream.On("Seek", mock.AnythingOfType("string")).Return(nil)
@ -686,6 +694,9 @@ func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstrea
deleteMsg2 := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
msgChan <- &msgstream.MsgPack{Msgs: []msgstream.TsMsg{deleteMsg1, deleteMsg2}}
}
if closedStream {
close(msgChan)
}
msgStream.On("Chan").Return(msgChan)
factory := &mockMsgStreamFactory{mockMqStream: msgStream}