mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-03 04:19:18 +08:00
Fix load segment hangs forever (#18814)
Caused if the context is timeout Signed-off-by: yah01 <yang.cen@zilliz.com> Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent
c91bb0b015
commit
63e08606e2
@ -50,6 +50,10 @@ const (
|
||||
requestConcurrencyLevelLimit = 8
|
||||
)
|
||||
|
||||
var (
|
||||
ErrReadDeltaMsgFailed = errors.New("ReadDeltaMsgFailed")
|
||||
)
|
||||
|
||||
// segmentLoader is only responsible for loading the field data from binlog
|
||||
type segmentLoader struct {
|
||||
metaReplica ReplicaInterface
|
||||
@ -689,11 +693,15 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
|
||||
for hasMore {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break
|
||||
return ctx.Err()
|
||||
case msgPack, ok := <-stream.Chan():
|
||||
if !ok {
|
||||
log.Warn("fail to read delta msg", zap.String("pChannelName", pChannelName), zap.Any("msg id", position.GetMsgID()), zap.Error(err))
|
||||
return err
|
||||
log.Warn("fail to read delta msg",
|
||||
zap.String("pChannelName", pChannelName),
|
||||
zap.ByteString("msgID", position.GetMsgID()),
|
||||
zap.Error(err),
|
||||
)
|
||||
return fmt.Errorf("%w: pChannelName=%v, msgID=%v", ErrReadDeltaMsgFailed, pChannelName, position.GetMsgID())
|
||||
}
|
||||
|
||||
if msgPack == nil {
|
||||
|
@ -22,12 +22,14 @@ import (
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
@ -598,7 +600,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) {
|
||||
mockMsg := &mockMsgID{}
|
||||
mockMsg.On("AtEarliestPosition").Return(true, nil)
|
||||
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
|
||||
assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, mockMsg))
|
||||
assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg))
|
||||
}
|
||||
|
||||
// test already reach latest position
|
||||
@ -606,7 +608,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) {
|
||||
mockMsg := &mockMsgID{}
|
||||
mockMsg.On("AtEarliestPosition").Return(false, nil)
|
||||
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(true, nil)
|
||||
assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, mockMsg))
|
||||
assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg))
|
||||
}
|
||||
|
||||
//test consume after seeking when get last msg successfully
|
||||
@ -615,7 +617,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) {
|
||||
mockMsg.On("AtEarliestPosition").Return(false, nil)
|
||||
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
|
||||
mockMsg.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, nil)
|
||||
assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, mockMsg))
|
||||
assert.Nil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg))
|
||||
}
|
||||
|
||||
//test compare msgID failed when get last msg successfully
|
||||
@ -624,7 +626,7 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) {
|
||||
mockMsg.On("AtEarliestPosition").Return(false, nil)
|
||||
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
|
||||
mockMsg.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, errors.New(""))
|
||||
assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, true, mockMsg))
|
||||
assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, true, true, mockMsg))
|
||||
}
|
||||
|
||||
//test consume after seeking when get last msg failed
|
||||
@ -633,7 +635,18 @@ func TestSegmentLoader_testFromDmlCPLoadDelete(t *testing.T) {
|
||||
mockMsg.On("AtEarliestPosition").Return(false, nil)
|
||||
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
|
||||
mockMsg.On("LessOrEqualThan", mock.AnythingOfType("string")).Return(true, errors.New(""))
|
||||
assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, false, mockMsg))
|
||||
assert.NotNil(t, testConsumingDeltaMsg(ctx, t, position, false, true, mockMsg))
|
||||
}
|
||||
|
||||
//test context timeout when reading stream
|
||||
{
|
||||
log.Debug("test context timeout when reading stream")
|
||||
mockMsg := &mockMsgID{}
|
||||
mockMsg.On("AtEarliestPosition").Return(false, nil)
|
||||
mockMsg.On("Equal", mock.AnythingOfType("string")).Return(false, nil)
|
||||
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(-time.Second))
|
||||
defer cancel()
|
||||
assert.ErrorIs(t, testConsumingDeltaMsg(ctx, t, position, true, false, mockMsg), context.DeadlineExceeded)
|
||||
}
|
||||
}
|
||||
|
||||
@ -655,7 +668,7 @@ func testSeekFailWhenConsumingDeltaMsg(ctx context.Context, t *testing.T, positi
|
||||
assert.EqualError(t, ret, errMsg)
|
||||
}
|
||||
|
||||
func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstream.MsgPosition, getLastSucc bool, mockMsg *mockMsgID) error {
|
||||
func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstream.MsgPosition, getLastSucc bool, hasData bool, mockMsg *mockMsgID) error {
|
||||
msgStream := &LoadDeleteMsgStream{}
|
||||
msgStream.On("AsConsumer", mock.AnythingOfTypeArgument("string"), mock.AnythingOfTypeArgument("string"))
|
||||
msgStream.On("Seek", mock.AnythingOfType("string")).Return(nil)
|
||||
@ -666,13 +679,13 @@ func testConsumingDeltaMsg(ctx context.Context, t *testing.T, position *msgstrea
|
||||
msgStream.On("GetLatestMsgID", mock.AnythingOfType("string")).Return(mockMsg, errors.New(""))
|
||||
}
|
||||
|
||||
msgChan := make(chan *msgstream.MsgPack)
|
||||
go func() {
|
||||
msgChan := make(chan *msgstream.MsgPack, 10)
|
||||
if hasData {
|
||||
msgChan <- nil
|
||||
deleteMsg1 := genDeleteMsg(defaultCollectionID+1, schemapb.DataType_Int64, defaultDelLength)
|
||||
deleteMsg2 := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
|
||||
msgChan <- &msgstream.MsgPack{Msgs: []msgstream.TsMsg{deleteMsg1, deleteMsg2}}
|
||||
}()
|
||||
}
|
||||
|
||||
msgStream.On("Chan").Return(msgChan)
|
||||
factory := &mockMsgStreamFactory{mockMqStream: msgStream}
|
||||
|
Loading…
Reference in New Issue
Block a user