diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 350c48508e..7c344b4ed3 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -696,12 +696,16 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection return ctx.Err() case msgPack, ok := <-stream.Chan(): if !ok { + err = fmt.Errorf("%w: pChannelName=%v, msgID=%v", + ErrReadDeltaMsgFailed, + pChannelName, + position.GetMsgID()) log.Warn("fail to read delta msg", zap.String("pChannelName", pChannelName), zap.ByteString("msgID", position.GetMsgID()), zap.Error(err), ) - return fmt.Errorf("%w: pChannelName=%v, msgID=%v", ErrReadDeltaMsgFailed, pChannelName, position.GetMsgID()) + return err } if msgPack == nil { diff --git a/internal/querynode/segment_loader_test.go b/internal/querynode/segment_loader_test.go index e55d074ac0..3de5b1940f 100644 --- a/internal/querynode/segment_loader_test.go +++ b/internal/querynode/segment_loader_test.go @@ -600,7 +600,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) { mockMsg := &mockMsgID{} mockMsg.On("AtEarliestPosition").Return(true, nil) mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil) - assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg)) + assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, false, mockMsg)) } // test already reach latest position @@ -608,7 +608,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) { mockMsg := &mockMsgID{} mockMsg.On("AtEarliestPosition").Return(false, nil) mockMsg.On("Equal", mock.AnythingOfType("string")).Return(true, nil) - assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg)) + assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, false, mockMsg)) } //test consume after seeking when get last msg successfully @@ -617,7 +617,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) { mockMsg.On("AtEarliestPosition").Return(false, nil) mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil) mockMsg.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, nil) - assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg)) + assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, false, mockMsg)) } //test compare msgID failed when get last msg successfully @@ -626,7 +626,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) { mockMsg.On("AtEarliestPosition").Return(false, nil) mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil) mockMsg.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, errors.New("")) - assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg)) + assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, true, true, false, mockMsg)) } //test consume after seeking when get last msg failed @@ -635,7 +635,15 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) { mockMsg.On("AtEarliestPosition").Return(false, nil) mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil) mockMsg.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, errors.New("")) - assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, false, true, mockMsg)) + assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, false, true, false, mockMsg)) + } + + //test consume after seeking when read stream failed + { + mockMsg := &mockMsgID{} + mockMsg.On("AtEarliestPosition").Return(false, nil) + mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil) + assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, true, false, true, mockMsg)) } //test context timeout when reading stream @@ -646,7 +654,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) { mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil) ctx, cancel := context.WithDeadline(ctx, time.Now().Add(-time.Second)) defer cancel() - assert.ErrorIs(t, testConsumingDeltaMsg(ctx, t, position, true, false, mockMsg), context.DeadlineExceeded) + assert.ErrorIs(t, testConsumingDeltaMsg(ctx, t, position, true, false, false, mockMsg), context.DeadlineExceeded) } } @@ -668,7 +676,7 @@ func testSeekFailWhenConsumingDeltaMsg(ctx context.Context, t *testing.T, positi assert.EqualError(t, ret, errMsg) } -func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstream.MsgPosition, getLastSucc bool, hasData bool, mockMsg *mockMsgID) error { +func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstream.MsgPosition, getLastSucc, hasData, closedStream bool, mockMsg *mockMsgID) error { msgStream := &LoadDeleteMsgStream{} msgStream.On("AsConsumer", mock.AnythingOfTypeArgument("string"), mock.AnythingOfTypeArgument("string")) msgStream.On("Seek", mock.AnythingOfType("string")).Return(nil) @@ -686,6 +694,9 @@ func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstrea deleteMsg2 := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) msgChan <- &msgstream.MsgPack{Msgs: []msgstream.TsMsg{deleteMsg1, deleteMsg2}} } + if closedStream { + close(msgChan) + } msgStream.On("Chan").Return(msgChan) factory := &mockMsgStreamFactory{mockMqStream: msgStream}