enhance: add create segment message, enable empty segment flush (#37407)

issue: #37172

- add redo interceptor to implement append context refresh. (make new
timetick)
- add create segment handler for flusher.
- make empty segment flushable and directly change it into dropped.
- add create segment message into wal when creating new growing segment.
- make the insert operation into following seq: createSegment -> insert
-> insert -> flushSegment.
- make manual flush into following seq: flushTs -> flushsegment ->
flushsegment -> manualflush.

---------

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2024-11-08 10:16:34 +08:00 committed by GitHub
parent 81879425e1
commit 49657c4690
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
55 changed files with 948 additions and 180 deletions

2
go.mod
View File

@ -18,7 +18,7 @@ require (
github.com/gin-gonic/gin v1.9.1
github.com/go-playground/validator/v10 v10.14.0
github.com/gofrs/flock v0.8.1
github.com/golang/protobuf v1.5.4
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.1.2
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9

View File

@ -33,6 +33,7 @@ packages:
github.com/milvus-io/milvus/internal/streamingnode/server/flusher:
interfaces:
Flusher:
FlushMsgHandler:
github.com/milvus-io/milvus/internal/streamingnode/server/wal:
interfaces:
OpenerBuilder:
@ -44,11 +45,11 @@ packages:
Interceptor:
InterceptorWithReady:
InterceptorBuilder:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector:
interfaces:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector:
interfaces:
SealOperator:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector:
interfaces:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector:
interfaces:
TimeTickSyncOperator:
google.golang.org/grpc:
interfaces:

View File

@ -1013,6 +1013,24 @@ func UpdateIsImporting(segmentID int64, isImporting bool) UpdateOperator {
}
}
// UpdateAsDroppedIfEmptyWhenFlushing updates segment state to Dropped if segment is empty and in Flushing state
// It's used to make a empty flushing segment to be dropped directly.
func UpdateAsDroppedIfEmptyWhenFlushing(segmentID int64) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.Get(segmentID)
if segment == nil {
log.Warn("meta update: update as dropped if empty when flusing failed - segment not found",
zap.Int64("segmentID", segmentID))
return false
}
if segment.GetNumOfRows() == 0 && segment.GetState() == commonpb.SegmentState_Flushing {
log.Info("meta update: update as dropped if empty when flusing", zap.Int64("segmentID", segmentID))
updateSegStateAndPrepareMetrics(segment, commonpb.SegmentState_Dropped, modPack.metricMutation)
}
return true
}
}
// updateSegmentsInfo update segment infos
// will exec all operators, and update all changed segments
func (m *meta) UpdateSegmentsInfo(operators ...UpdateOperator) error {

View File

@ -849,6 +849,20 @@ func TestUpdateSegmentsInfo(t *testing.T) {
UpdateIsImporting(1, true),
)
assert.NoError(t, err)
err = meta.UpdateSegmentsInfo(UpdateAsDroppedIfEmptyWhenFlushing(1))
assert.NoError(t, err)
})
t.Run("update empty segment into flush", func(t *testing.T) {
meta, err := newMemoryMeta()
assert.NoError(t, err)
meta.AddSegment(context.Background(), &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing}})
err = meta.UpdateSegmentsInfo(
UpdateStatusOperator(1, commonpb.SegmentState_Flushing),
UpdateAsDroppedIfEmptyWhenFlushing(1),
)
assert.NoError(t, err)
})
t.Run("update checkpoints and start position of non existed segment", func(t *testing.T) {

View File

@ -553,6 +553,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
AddBinlogsOperator(req.GetSegmentID(), req.GetField2BinlogPaths(), req.GetField2StatslogPaths(), req.GetDeltalogs(), req.GetField2Bm25LogPaths()),
UpdateStartPosition(req.GetStartPositions()),
UpdateCheckPointOperator(req.GetSegmentID(), req.GetCheckPoints()),
UpdateAsDroppedIfEmptyWhenFlushing(req.GetSegmentID()),
)
// Update segment info in memory and meta.

View File

@ -246,13 +246,19 @@ func (s *ServerSuite) TestSaveBinlogPath_SaveDroppedSegment() {
segments := map[int64]commonpb.SegmentState{
0: commonpb.SegmentState_Flushed,
1: commonpb.SegmentState_Sealed,
2: commonpb.SegmentState_Sealed,
}
for segID, state := range segments {
numOfRows := int64(100)
if segID == 2 {
numOfRows = 0
}
info := &datapb.SegmentInfo{
ID: segID,
InsertChannel: "ch1",
State: state,
Level: datapb.SegmentLevel_L1,
NumOfRows: numOfRows,
}
err := s.testServer.meta.AddSegment(context.TODO(), NewSegmentInfo(info))
s.Require().NoError(err)
@ -263,11 +269,14 @@ func (s *ServerSuite) TestSaveBinlogPath_SaveDroppedSegment() {
inSegID int64
inDropped bool
inFlushed bool
numOfRows int64
expectedState commonpb.SegmentState
}{
{"segID=0, flushed to dropped", 0, true, false, commonpb.SegmentState_Dropped},
{"segID=1, sealed to flushing", 1, false, true, commonpb.SegmentState_Flushing},
{"segID=0, flushed to dropped", 0, true, false, 100, commonpb.SegmentState_Dropped},
{"segID=1, sealed to flushing", 1, false, true, 100, commonpb.SegmentState_Flushing},
// empty segment flush should be dropped directly.
{"segID=2, sealed to dropped", 2, false, true, 0, commonpb.SegmentState_Dropped},
}
paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key, "False")
@ -290,7 +299,7 @@ func (s *ServerSuite) TestSaveBinlogPath_SaveDroppedSegment() {
segment := s.testServer.meta.GetSegment(test.inSegID)
s.NotNil(segment)
s.EqualValues(0, len(segment.GetBinlogs()))
s.EqualValues(segment.NumOfRows, 0)
s.EqualValues(segment.NumOfRows, test.numOfRows)
flushing := []commonpb.SegmentState{commonpb.SegmentState_Flushed, commonpb.SegmentState_Flushing}
if lo.Contains(flushing, test.expectedState) {
@ -362,6 +371,7 @@ func (s *ServerSuite) TestSaveBinlogPath_NormalCase() {
segments := map[int64]int64{
0: 0,
1: 0,
2: 0,
}
for segID, collID := range segments {
info := &datapb.SegmentInfo{
@ -446,6 +456,24 @@ func (s *ServerSuite) TestSaveBinlogPath_NormalCase() {
s.EqualValues(segment.DmlPosition.ChannelName, "ch1")
s.EqualValues(segment.DmlPosition.MsgID, []byte{1, 2, 3})
s.EqualValues(segment.NumOfRows, 10)
resp, err = s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
SegmentID: 2,
CollectionID: 0,
Channel: "ch1",
Field2BinlogPaths: []*datapb.FieldBinlog{},
Field2StatslogPaths: []*datapb.FieldBinlog{},
CheckPoints: []*datapb.CheckPoint{},
Flushed: true,
})
s.NoError(err)
s.EqualValues(resp.ErrorCode, commonpb.ErrorCode_Success)
segment = s.testServer.meta.GetSegment(2)
s.NotNil(segment)
s.Equal(commonpb.SegmentState_Dropped, segment.GetState())
}
func (s *ServerSuite) TestFlush_NormalCase() {

View File

@ -129,6 +129,14 @@ func WithNoSyncingTask() SegmentFilter {
type SegmentAction func(info *SegmentInfo)
func SegmentActions(actions ...SegmentAction) SegmentAction {
return func(info *SegmentInfo) {
for _, act := range actions {
act(info)
}
}
}
func UpdateState(state commonpb.SegmentState) SegmentAction {
return func(info *SegmentInfo) {
info.state = state
@ -147,6 +155,14 @@ func UpdateNumOfRows(numOfRows int64) SegmentAction {
}
}
func SetStartPositionIfNil(startPos *msgpb.MsgPosition) SegmentAction {
return func(info *SegmentInfo) {
if info.startPosition == nil {
info.startPosition = startPos
}
}
}
func UpdateBufferedRows(bufferedRows int64) SegmentAction {
return func(info *SegmentInfo) {
info.bufferRows = bufferedRows

View File

@ -89,6 +89,13 @@ func (s *SegmentActionSuite) TestActions() {
action = UpdateNumOfRows(numOfRows)
action(info)
s.Equal(numOfRows, info.NumOfRows())
info = &SegmentInfo{}
actions := SegmentActions(UpdateState(state), UpdateCheckpoint(cp), UpdateNumOfRows(numOfRows))
actions(info)
s.Equal(state, info.State())
s.Equal(cp, info.Checkpoint())
s.Equal(numOfRows, info.NumOfRows())
}
func (s *SegmentActionSuite) TestMergeActions() {

View File

@ -283,7 +283,7 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
flushed,
unflushed,
params.CompactionExecutor,
params.FlushMsgHandler,
params.MsgHandler,
)
if err != nil {
return nil, err

View File

@ -71,7 +71,7 @@ type ddNode struct {
dropMode atomic.Value
compactionExecutor compaction.Executor
flushMsgHandler flusher.FlushMsgHandler
msgHandler flusher.MsgHandler
// for recovery
growingSegInfo map[typeutil.UniqueID]*datapb.SegmentInfo // segmentID
@ -236,6 +236,19 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).
Add(float64(dmsg.GetNumRows()))
fgMsg.DeleteMessages = append(fgMsg.DeleteMessages, dmsg)
case commonpb.MsgType_CreateSegment:
createSegment := msg.(*adaptor.CreateSegmentMessageBody)
logger := log.With(
zap.String("vchannel", ddn.Name()),
zap.Int32("msgType", int32(msg.Type())),
zap.Uint64("timetick", createSegment.CreateSegmentMessage.TimeTick()),
)
logger.Info("receive create segment message")
if err := ddn.msgHandler.HandleCreateSegment(context.Background(), ddn.vChannelName, createSegment.CreateSegmentMessage); err != nil {
logger.Warn("handle create segment message failed", zap.Error(err))
} else {
logger.Info("handle create segment message success")
}
case commonpb.MsgType_FlushSegment:
flushMsg := msg.(*adaptor.FlushMessageBody)
logger := log.With(
@ -244,7 +257,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
zap.Uint64("timetick", flushMsg.FlushMessage.TimeTick()),
)
logger.Info("receive flush message")
if err := ddn.flushMsgHandler.HandleFlush(ddn.vChannelName, flushMsg.FlushMessage); err != nil {
if err := ddn.msgHandler.HandleFlush(ddn.vChannelName, flushMsg.FlushMessage); err != nil {
logger.Warn("handle flush message failed", zap.Error(err))
} else {
logger.Info("handle flush message success")
@ -258,7 +271,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
zap.Uint64("flushTs", manualFlushMsg.ManualFlushMessage.Header().FlushTs),
)
logger.Info("receive manual flush message")
if err := ddn.flushMsgHandler.HandleManualFlush(ddn.vChannelName, manualFlushMsg.ManualFlushMessage); err != nil {
if err := ddn.msgHandler.HandleManualFlush(ddn.vChannelName, manualFlushMsg.ManualFlushMessage); err != nil {
logger.Warn("handle manual flush message failed", zap.Error(err))
} else {
logger.Info("handle manual flush message success")
@ -318,7 +331,7 @@ func (ddn *ddNode) Close() {
}
func newDDNode(ctx context.Context, collID typeutil.UniqueID, vChannelName string, droppedSegmentIDs []typeutil.UniqueID,
sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo, executor compaction.Executor, handler flusher.FlushMsgHandler,
sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo, executor compaction.Executor, handler flusher.MsgHandler,
) (*ddNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(paramtable.Get().DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32())
@ -333,7 +346,7 @@ func newDDNode(ctx context.Context, collID typeutil.UniqueID, vChannelName strin
droppedSegmentIDs: droppedSegmentIDs,
vChannelName: vChannelName,
compactionExecutor: executor,
flushMsgHandler: handler,
msgHandler: handler,
}
dd.dropMode.Store(false)

View File

@ -22,14 +22,19 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/datanode/compaction"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_flusher"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -92,6 +97,56 @@ func TestFlowGraph_DDNode_newDDNode(t *testing.T) {
}
}
func TestFlowGraph_DDNode_OperateFlush(t *testing.T) {
h := mock_flusher.NewMockFlushMsgHandler(t)
h.EXPECT().HandleCreateSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil)
h.EXPECT().HandleFlush(mock.Anything, mock.Anything).Return(nil)
h.EXPECT().HandleManualFlush(mock.Anything, mock.Anything).Return(nil)
ddn := ddNode{
ctx: context.Background(),
collectionID: 1,
vChannelName: "v1",
msgHandler: h,
}
mutableMsg, err := message.NewCreateSegmentMessageBuilderV2().
WithHeader(&message.CreateSegmentMessageHeader{}).
WithBody(&message.CreateSegmentMessageBody{}).
WithVChannel("v1").
BuildMutable()
assert.NoError(t, err)
immutableCreateSegmentMsg := mutableMsg.WithTimeTick(1).IntoImmutableMessage(mock_message.NewMockMessageID(t))
flushMsg, err := message.NewFlushMessageBuilderV2().
WithHeader(&message.FlushMessageHeader{}).
WithBody(&message.FlushMessageBody{}).
WithVChannel("v1").
BuildMutable()
assert.NoError(t, err)
immutableFlushMsg := flushMsg.WithTimeTick(2).IntoImmutableMessage(mock_message.NewMockMessageID(t))
manualFlushMsg, err := message.NewManualFlushMessageBuilderV2().
WithHeader(&message.ManualFlushMessageHeader{}).
WithBody(&message.ManualFlushMessageBody{}).
WithVChannel("v1").
BuildMutable()
assert.NoError(t, err)
immutableManualFlushMsg := manualFlushMsg.WithTimeTick(3).IntoImmutableMessage(mock_message.NewMockMessageID(t))
msg1, err := adaptor.NewCreateSegmentMessageBody(immutableCreateSegmentMsg)
assert.NoError(t, err)
msg2, err := adaptor.NewFlushMessageBody(immutableFlushMsg)
assert.NoError(t, err)
msg3, err := adaptor.NewManualFlushMessageBody(immutableManualFlushMsg)
assert.NoError(t, err)
tsMessages := []msgstream.TsMsg{msg1, msg2, msg3}
var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil)
outputMsgs := ddn.Operate([]Msg{msgStreamMsg})
assert.NotNil(t, outputMsgs)
}
func TestFlowGraph_DDNode_Operate(t *testing.T) {
t.Run("Test DDNode Operate DropCollection Msg", func(t *testing.T) {
// invalid inputs

View File

@ -248,7 +248,8 @@ func (s *storageV1Serializer) serializeMergedPkStats(pack *SyncPack) (*storage.B
return nil, merr.WrapErrSegmentNotFound(pack.segmentID)
}
return s.inCodec.SerializePkStatsList(lo.Map(segment.GetHistory(), func(pks *storage.PkStatistics, _ int) *storage.PrimaryKeyStats {
// Allow to flush empty segment to make streaming service easier to implement rollback transaction.
stats := lo.Map(segment.GetHistory(), func(pks *storage.PkStatistics, _ int) *storage.PrimaryKeyStats {
return &storage.PrimaryKeyStats{
FieldID: s.pkField.GetFieldID(),
MaxPk: pks.MaxPK,
@ -257,7 +258,11 @@ func (s *storageV1Serializer) serializeMergedPkStats(pack *SyncPack) (*storage.B
BF: pks.PkFilter,
PkType: int64(s.pkField.GetDataType()),
}
}), segment.NumOfRows())
})
if len(stats) == 0 {
return nil, nil
}
return s.inCodec.SerializePkStatsList(stats, segment.NumOfRows())
}
func (s *storageV1Serializer) serializeMergedBM25Stats(pack *SyncPack) (map[int64]*storage.Blob, error) {
@ -267,8 +272,9 @@ func (s *storageV1Serializer) serializeMergedBM25Stats(pack *SyncPack) (map[int6
}
stats := segment.GetBM25Stats()
// Allow to flush empty segment to make streaming service easier to implement rollback transaction.
if stats == nil {
return nil, fmt.Errorf("searalize empty bm25 stats")
return nil, nil
}
fieldBytes, numRow, err := stats.Serialize()

View File

@ -49,7 +49,7 @@ type PipelineParams struct {
WriteBufferManager writebuffer.BufferManager
CheckpointUpdater *ChannelCheckpointUpdater
Allocator allocator.Interface
FlushMsgHandler flusher.FlushMsgHandler
MsgHandler flusher.MsgHandler
}
// TimeRange is a range of timestamp contains the min-timestamp and max-timestamp

View File

@ -24,6 +24,8 @@ import (
type BufferManager interface {
// Register adds a WriteBuffer with provided schema & options.
Register(channel string, metacache metacache.MetaCache, opts ...WriteBufferOption) error
// CreateNewGrowingSegment notifies writeBuffer to create a new growing segment.
CreateNewGrowingSegment(ctx context.Context, channel string, partition int64, segmentID int64) error
// SealSegments notifies writeBuffer corresponding to provided channel to seal segments.
// which will cause segment start flush procedure.
SealSegments(ctx context.Context, channel string, segmentIDs []int64) error
@ -156,6 +158,22 @@ func (m *bufferManager) Register(channel string, metacache metacache.MetaCache,
return nil
}
// CreateNewGrowingSegment notifies writeBuffer to create a new growing segment.
func (m *bufferManager) CreateNewGrowingSegment(ctx context.Context, channel string, partitionID int64, segmentID int64) error {
m.mut.RLock()
buf, ok := m.buffers[channel]
m.mut.RUnlock()
if !ok {
log.Ctx(ctx).Warn("write buffer not found when create new growing segment",
zap.String("channel", channel),
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID))
return merr.WrapErrChannelNotFound(channel)
}
buf.CreateNewGrowingSegment(partitionID, segmentID, nil)
return nil
}
// SealSegments call sync segment and change segments state to Flushed.
func (m *bufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error {
m.mut.RLock()

View File

@ -107,6 +107,25 @@ func (s *ManagerSuite) TestFlushSegments() {
})
}
func (s *ManagerSuite) TestCreateNewGrowingSegment() {
manager := s.manager
err := manager.CreateNewGrowingSegment(context.Background(), s.channelName, 1, 1)
s.Error(err)
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false).Once()
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return()
wb, err := NewL0WriteBuffer(s.channelName, s.metacache, s.syncMgr, &writeBufferOption{
idAllocator: s.allocator,
})
s.NoError(err)
s.manager.mut.Lock()
s.manager.buffers[s.channelName] = wb
s.manager.mut.Unlock()
err = manager.CreateNewGrowingSegment(context.Background(), s.channelName, 1, 1)
s.NoError(err)
}
func (s *ManagerSuite) TestBufferData() {
manager := s.manager
s.Run("channel_not_found", func() {

View File

@ -76,6 +76,55 @@ func (_c *MockBufferManager_BufferData_Call) RunAndReturn(run func(string, []*In
return _c
}
// CreateNewGrowingSegment provides a mock function with given fields: ctx, channel, partition, segmentID
func (_m *MockBufferManager) CreateNewGrowingSegment(ctx context.Context, channel string, partition int64, segmentID int64) error {
ret := _m.Called(ctx, channel, partition, segmentID)
if len(ret) == 0 {
panic("no return value specified for CreateNewGrowingSegment")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, int64, int64) error); ok {
r0 = rf(ctx, channel, partition, segmentID)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockBufferManager_CreateNewGrowingSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateNewGrowingSegment'
type MockBufferManager_CreateNewGrowingSegment_Call struct {
*mock.Call
}
// CreateNewGrowingSegment is a helper method to define mock.On call
// - ctx context.Context
// - channel string
// - partition int64
// - segmentID int64
func (_e *MockBufferManager_Expecter) CreateNewGrowingSegment(ctx interface{}, channel interface{}, partition interface{}, segmentID interface{}) *MockBufferManager_CreateNewGrowingSegment_Call {
return &MockBufferManager_CreateNewGrowingSegment_Call{Call: _e.mock.On("CreateNewGrowingSegment", ctx, channel, partition, segmentID)}
}
func (_c *MockBufferManager_CreateNewGrowingSegment_Call) Run(run func(ctx context.Context, channel string, partition int64, segmentID int64)) *MockBufferManager_CreateNewGrowingSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(int64), args[3].(int64))
})
return _c
}
func (_c *MockBufferManager_CreateNewGrowingSegment_Call) Return(_a0 error) *MockBufferManager_CreateNewGrowingSegment_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockBufferManager_CreateNewGrowingSegment_Call) RunAndReturn(run func(context.Context, string, int64, int64) error) *MockBufferManager_CreateNewGrowingSegment_Call {
_c.Call.Return(run)
return _c
}
// DropChannel provides a mock function with given fields: channel
func (_m *MockBufferManager) DropChannel(channel string) {
_m.Called(channel)

View File

@ -107,6 +107,41 @@ func (_c *MockWriteBuffer_Close_Call) RunAndReturn(run func(context.Context, boo
return _c
}
// CreateNewGrowingSegment provides a mock function with given fields: partitionID, segmentID, startPos
func (_m *MockWriteBuffer) CreateNewGrowingSegment(partitionID int64, segmentID int64, startPos *msgpb.MsgPosition) {
_m.Called(partitionID, segmentID, startPos)
}
// MockWriteBuffer_CreateNewGrowingSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateNewGrowingSegment'
type MockWriteBuffer_CreateNewGrowingSegment_Call struct {
*mock.Call
}
// CreateNewGrowingSegment is a helper method to define mock.On call
// - partitionID int64
// - segmentID int64
// - startPos *msgpb.MsgPosition
func (_e *MockWriteBuffer_Expecter) CreateNewGrowingSegment(partitionID interface{}, segmentID interface{}, startPos interface{}) *MockWriteBuffer_CreateNewGrowingSegment_Call {
return &MockWriteBuffer_CreateNewGrowingSegment_Call{Call: _e.mock.On("CreateNewGrowingSegment", partitionID, segmentID, startPos)}
}
func (_c *MockWriteBuffer_CreateNewGrowingSegment_Call) Run(run func(partitionID int64, segmentID int64, startPos *msgpb.MsgPosition)) *MockWriteBuffer_CreateNewGrowingSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64), args[2].(*msgpb.MsgPosition))
})
return _c
}
func (_c *MockWriteBuffer_CreateNewGrowingSegment_Call) Return() *MockWriteBuffer_CreateNewGrowingSegment_Call {
_c.Call.Return()
return _c
}
func (_c *MockWriteBuffer_CreateNewGrowingSegment_Call) RunAndReturn(run func(int64, int64, *msgpb.MsgPosition)) *MockWriteBuffer_CreateNewGrowingSegment_Call {
_c.Call.Return(run)
return _c
}
// DropPartitions provides a mock function with given fields: partitionIDs
func (_m *MockWriteBuffer) DropPartitions(partitionIDs []int64) {
_m.Called(partitionIDs)

View File

@ -37,6 +37,8 @@ const (
type WriteBuffer interface {
// HasSegment checks whether certain segment exists in this buffer.
HasSegment(segmentID int64) bool
// CreateNewGrowingSegment creates a new growing segment in the buffer.
CreateNewGrowingSegment(partitionID int64, segmentID int64, startPos *msgpb.MsgPosition)
// BufferData is the method to buffer dml data msgs.
BufferData(insertMsgs []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error
// FlushTimestamp set flush timestamp for write buffer
@ -520,14 +522,13 @@ func (id *InsertData) batchPkExists(pks []storage.PrimaryKey, tss []uint64, hits
return hits
}
// bufferInsert function InsertMsg into bufferred InsertData and returns primary key field data for future usage.
func (wb *writeBufferBase) bufferInsert(inData *InsertData, startPos, endPos *msgpb.MsgPosition) error {
_, ok := wb.metaCache.GetSegmentByID(inData.segmentID)
func (wb *writeBufferBase) CreateNewGrowingSegment(partitionID int64, segmentID int64, startPos *msgpb.MsgPosition) {
_, ok := wb.metaCache.GetSegmentByID(segmentID)
// new segment
if !ok {
wb.metaCache.AddSegment(&datapb.SegmentInfo{
ID: inData.segmentID,
PartitionID: inData.partitionID,
ID: segmentID,
PartitionID: partitionID,
CollectionID: wb.collectionID,
InsertChannel: wb.channelName,
StartPosition: startPos,
@ -535,14 +536,20 @@ func (wb *writeBufferBase) bufferInsert(inData *InsertData, startPos, endPos *ms
}, func(_ *datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSetWithBatchSize(wb.getEstBatchSize())
}, metacache.NewBM25StatsFactory, metacache.SetStartPosRecorded(false))
log.Info("add growing segment", zap.Int64("segmentID", inData.segmentID), zap.String("channel", wb.channelName))
log.Info("add growing segment", zap.Int64("segmentID", segmentID), zap.String("channel", wb.channelName))
}
}
// bufferInsert function InsertMsg into bufferred InsertData and returns primary key field data for future usage.
func (wb *writeBufferBase) bufferInsert(inData *InsertData, startPos, endPos *msgpb.MsgPosition) error {
wb.CreateNewGrowingSegment(inData.partitionID, inData.segmentID, startPos)
segBuf := wb.getOrCreateBuffer(inData.segmentID)
totalMemSize := segBuf.insertBuffer.Buffer(inData, startPos, endPos)
wb.metaCache.UpdateSegments(metacache.UpdateBufferedRows(segBuf.insertBuffer.rows),
metacache.WithSegmentIDs(inData.segmentID))
wb.metaCache.UpdateSegments(metacache.SegmentActions(
metacache.UpdateBufferedRows(segBuf.insertBuffer.rows),
metacache.SetStartPositionIfNil(startPos),
), metacache.WithSegmentIDs(inData.segmentID))
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(float64(totalMemSize))

View File

@ -0,0 +1,180 @@
// Code generated by mockery v2.46.0. DO NOT EDIT.
package mock_flusher
import (
context "context"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
mock "github.com/stretchr/testify/mock"
)
// MockFlushMsgHandler is an autogenerated mock type for the FlushMsgHandler type
type MockFlushMsgHandler struct {
mock.Mock
}
type MockFlushMsgHandler_Expecter struct {
mock *mock.Mock
}
func (_m *MockFlushMsgHandler) EXPECT() *MockFlushMsgHandler_Expecter {
return &MockFlushMsgHandler_Expecter{mock: &_m.Mock}
}
// HandleCreateSegment provides a mock function with given fields: ctx, vchannel, createSegmentMsg
func (_m *MockFlushMsgHandler) HandleCreateSegment(ctx context.Context, vchannel string, createSegmentMsg message.ImmutableCreateSegmentMessageV2) error {
ret := _m.Called(ctx, vchannel, createSegmentMsg)
if len(ret) == 0 {
panic("no return value specified for HandleCreateSegment")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, message.ImmutableCreateSegmentMessageV2) error); ok {
r0 = rf(ctx, vchannel, createSegmentMsg)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockFlushMsgHandler_HandleCreateSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HandleCreateSegment'
type MockFlushMsgHandler_HandleCreateSegment_Call struct {
*mock.Call
}
// HandleCreateSegment is a helper method to define mock.On call
// - ctx context.Context
// - vchannel string
// - createSegmentMsg message.specializedImmutableMessage[*messagespb.CreateSegmentMessageHeader,*messagespb.CreateSegmentMessageBody]
func (_e *MockFlushMsgHandler_Expecter) HandleCreateSegment(ctx interface{}, vchannel interface{}, createSegmentMsg interface{}) *MockFlushMsgHandler_HandleCreateSegment_Call {
return &MockFlushMsgHandler_HandleCreateSegment_Call{Call: _e.mock.On("HandleCreateSegment", ctx, vchannel, createSegmentMsg)}
}
func (_c *MockFlushMsgHandler_HandleCreateSegment_Call) Run(run func(ctx context.Context, vchannel string, createSegmentMsg message.ImmutableCreateSegmentMessageV2)) *MockFlushMsgHandler_HandleCreateSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(message.ImmutableCreateSegmentMessageV2))
})
return _c
}
func (_c *MockFlushMsgHandler_HandleCreateSegment_Call) Return(_a0 error) *MockFlushMsgHandler_HandleCreateSegment_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockFlushMsgHandler_HandleCreateSegment_Call) RunAndReturn(run func(context.Context, string, message.ImmutableCreateSegmentMessageV2) error) *MockFlushMsgHandler_HandleCreateSegment_Call {
_c.Call.Return(run)
return _c
}
// HandleFlush provides a mock function with given fields: vchannel, flushMsg
func (_m *MockFlushMsgHandler) HandleFlush(vchannel string, flushMsg message.ImmutableFlushMessageV2) error {
ret := _m.Called(vchannel, flushMsg)
if len(ret) == 0 {
panic("no return value specified for HandleFlush")
}
var r0 error
if rf, ok := ret.Get(0).(func(string, message.ImmutableFlushMessageV2) error); ok {
r0 = rf(vchannel, flushMsg)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockFlushMsgHandler_HandleFlush_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HandleFlush'
type MockFlushMsgHandler_HandleFlush_Call struct {
*mock.Call
}
// HandleFlush is a helper method to define mock.On call
// - vchannel string
// - flushMsg message.specializedImmutableMessage[*messagespb.FlushMessageHeader,*messagespb.FlushMessageBody]
func (_e *MockFlushMsgHandler_Expecter) HandleFlush(vchannel interface{}, flushMsg interface{}) *MockFlushMsgHandler_HandleFlush_Call {
return &MockFlushMsgHandler_HandleFlush_Call{Call: _e.mock.On("HandleFlush", vchannel, flushMsg)}
}
func (_c *MockFlushMsgHandler_HandleFlush_Call) Run(run func(vchannel string, flushMsg message.ImmutableFlushMessageV2)) *MockFlushMsgHandler_HandleFlush_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(message.ImmutableFlushMessageV2))
})
return _c
}
func (_c *MockFlushMsgHandler_HandleFlush_Call) Return(_a0 error) *MockFlushMsgHandler_HandleFlush_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockFlushMsgHandler_HandleFlush_Call) RunAndReturn(run func(string, message.ImmutableFlushMessageV2) error) *MockFlushMsgHandler_HandleFlush_Call {
_c.Call.Return(run)
return _c
}
// HandleManualFlush provides a mock function with given fields: vchannel, flushMsg
func (_m *MockFlushMsgHandler) HandleManualFlush(vchannel string, flushMsg message.ImmutableManualFlushMessageV2) error {
ret := _m.Called(vchannel, flushMsg)
if len(ret) == 0 {
panic("no return value specified for HandleManualFlush")
}
var r0 error
if rf, ok := ret.Get(0).(func(string, message.ImmutableManualFlushMessageV2) error); ok {
r0 = rf(vchannel, flushMsg)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockFlushMsgHandler_HandleManualFlush_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HandleManualFlush'
type MockFlushMsgHandler_HandleManualFlush_Call struct {
*mock.Call
}
// HandleManualFlush is a helper method to define mock.On call
// - vchannel string
// - flushMsg message.specializedImmutableMessage[*messagespb.ManualFlushMessageHeader,*messagespb.ManualFlushMessageBody]
func (_e *MockFlushMsgHandler_Expecter) HandleManualFlush(vchannel interface{}, flushMsg interface{}) *MockFlushMsgHandler_HandleManualFlush_Call {
return &MockFlushMsgHandler_HandleManualFlush_Call{Call: _e.mock.On("HandleManualFlush", vchannel, flushMsg)}
}
func (_c *MockFlushMsgHandler_HandleManualFlush_Call) Run(run func(vchannel string, flushMsg message.ImmutableManualFlushMessageV2)) *MockFlushMsgHandler_HandleManualFlush_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(message.ImmutableManualFlushMessageV2))
})
return _c
}
func (_c *MockFlushMsgHandler_HandleManualFlush_Call) Return(_a0 error) *MockFlushMsgHandler_HandleManualFlush_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockFlushMsgHandler_HandleManualFlush_Call) RunAndReturn(run func(string, message.ImmutableManualFlushMessageV2) error) *MockFlushMsgHandler_HandleManualFlush_Call {
_c.Call.Return(run)
return _c
}
// NewMockFlushMsgHandler creates a new instance of MockFlushMsgHandler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockFlushMsgHandler(t interface {
mock.TestingT
Cleanup(func())
}) *MockFlushMsgHandler {
mock := &MockFlushMsgHandler{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -97,6 +97,7 @@ func repackInsertDataForStreamingService(
return nil, err
}
for _, msg := range msgs {
insertRequest := msg.(*msgstream.InsertMsg).InsertRequest
newMsg, err := message.NewInsertMessageBuilderV1().
WithVChannel(channel).
WithHeader(&message.InsertMessageHeader{
@ -104,12 +105,12 @@ func repackInsertDataForStreamingService(
Partitions: []*message.PartitionSegmentAssignment{
{
PartitionId: partitionID,
Rows: uint64(len(rowOffsets)),
Rows: insertRequest.GetNumRows(),
BinarySize: 0, // TODO: current not used, message estimate size is used.
},
},
}).
WithBody(msg.(*msgstream.InsertMsg).InsertRequest).
WithBody(insertRequest).
BuildMutable()
if err != nil {
return nil, err
@ -175,6 +176,7 @@ func repackInsertDataWithPartitionKeyForStreamingService(
return nil, err
}
for _, msg := range msgs {
insertRequest := msg.(*msgstream.InsertMsg).InsertRequest
newMsg, err := message.NewInsertMessageBuilderV1().
WithVChannel(channel).
WithHeader(&message.InsertMessageHeader{
@ -182,12 +184,12 @@ func repackInsertDataWithPartitionKeyForStreamingService(
Partitions: []*message.PartitionSegmentAssignment{
{
PartitionId: partitionIDs[partitionName],
Rows: uint64(len(rowOffsets)),
Rows: insertRequest.GetNumRows(),
BinarySize: 0, // TODO: current not used, message estimate size is used.
},
},
}).
WithBody(msg.(*msgstream.InsertMsg).InsertRequest).
WithBody(insertRequest).
BuildMutable()
if err != nil {
return nil, err

View File

@ -20,22 +20,45 @@ import (
"context"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
func newFlushMsgHandler(wbMgr writebuffer.BufferManager) *flushMsgHandlerImpl {
return &flushMsgHandlerImpl{
func newMsgHandler(wbMgr writebuffer.BufferManager) *msgHandlerImpl {
return &msgHandlerImpl{
wbMgr: wbMgr,
}
}
type flushMsgHandlerImpl struct {
type msgHandlerImpl struct {
wbMgr writebuffer.BufferManager
}
func (impl *flushMsgHandlerImpl) HandleFlush(vchannel string, flushMsg message.ImmutableFlushMessageV2) error {
func (impl *msgHandlerImpl) HandleCreateSegment(ctx context.Context, vchannel string, createSegmentMsg message.ImmutableCreateSegmentMessageV2) error {
body, err := createSegmentMsg.Body()
if err != nil {
return errors.Wrap(err, "failed to get create segment message body")
}
for _, segmentInfo := range body.GetSegments() {
if err := impl.wbMgr.CreateNewGrowingSegment(ctx, vchannel, segmentInfo.GetPartitionId(), segmentInfo.GetSegmentId()); err != nil {
log.Warn("fail to create new growing segment",
zap.String("vchannel", vchannel),
zap.Int64("partition_id", segmentInfo.GetPartitionId()),
zap.Int64("segment_id", segmentInfo.GetSegmentId()))
return err
}
log.Info("create new growing segment",
zap.String("vchannel", vchannel),
zap.Int64("partition_id", segmentInfo.GetPartitionId()),
zap.Int64("segment_id", segmentInfo.GetSegmentId()))
}
return nil
}
func (impl *msgHandlerImpl) HandleFlush(vchannel string, flushMsg message.ImmutableFlushMessageV2) error {
body, err := flushMsg.Body()
if err != nil {
return errors.Wrap(err, "failed to get flush message body")
@ -46,7 +69,7 @@ func (impl *flushMsgHandlerImpl) HandleFlush(vchannel string, flushMsg message.I
return nil
}
func (impl *flushMsgHandlerImpl) HandleManualFlush(vchannel string, flushMsg message.ImmutableManualFlushMessageV2) error {
func (impl *msgHandlerImpl) HandleManualFlush(vchannel string, flushMsg message.ImmutableManualFlushMessageV2) error {
if err := impl.wbMgr.FlushChannel(context.Background(), vchannel, flushMsg.Header().GetFlushTs()); err != nil {
return errors.Wrap(err, "failed to flush channel")
}

View File

@ -45,7 +45,7 @@ func TestFlushMsgHandler_HandleFlush(t *testing.T) {
BuildMutable()
assert.NoError(t, err)
handler := newFlushMsgHandler(wbMgr)
handler := newMsgHandler(wbMgr)
msgID := mock_message.NewMockMessageID(t)
im, err := message.AsImmutableFlushMessageV2(msg.IntoImmutableMessage(msgID))
assert.NoError(t, err)
@ -56,7 +56,7 @@ func TestFlushMsgHandler_HandleFlush(t *testing.T) {
wbMgr = writebuffer.NewMockBufferManager(t)
wbMgr.EXPECT().SealSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil)
handler = newFlushMsgHandler(wbMgr)
handler = newMsgHandler(wbMgr)
err = handler.HandleFlush(vchannel, im)
assert.NoError(t, err)
}
@ -78,7 +78,7 @@ func TestFlushMsgHandler_HandleManualFlush(t *testing.T) {
BuildMutable()
assert.NoError(t, err)
handler := newFlushMsgHandler(wbMgr)
handler := newMsgHandler(wbMgr)
msgID := mock_message.NewMockMessageID(t)
im, err := message.AsImmutableManualFlushMessageV2(msg.IntoImmutableMessage(msgID))
assert.NoError(t, err)
@ -89,7 +89,7 @@ func TestFlushMsgHandler_HandleManualFlush(t *testing.T) {
wbMgr = writebuffer.NewMockBufferManager(t)
wbMgr.EXPECT().FlushChannel(mock.Anything, mock.Anything, mock.Anything).Return(nil)
handler = newFlushMsgHandler(wbMgr)
handler = newMsgHandler(wbMgr)
err = handler.HandleManualFlush(vchannel, im)
assert.NoError(t, err)
}

View File

@ -46,6 +46,6 @@ func getPipelineParams(chunkManager storage.ChunkManager) *util.PipelineParams {
WriteBufferManager: wbMgr,
CheckpointUpdater: cpUpdater,
Allocator: idalloc.NewMAllocator(rsc.IDAllocator()),
FlushMsgHandler: newFlushMsgHandler(wbMgr),
MsgHandler: newMsgHandler(wbMgr),
}
}

View File

@ -16,9 +16,15 @@
package flusher
import "github.com/milvus-io/milvus/pkg/streaming/util/message"
import (
"context"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
type MsgHandler interface {
HandleCreateSegment(ctx context.Context, vchannel string, createSegmentMsg message.ImmutableCreateSegmentMessageV2) error
type FlushMsgHandler interface {
HandleFlush(vchannel string, flushMsg message.ImmutableFlushMessageV2) error
HandleManualFlush(vchannel string, flushMsg message.ImmutableManualFlushMessageV2) error

View File

@ -4,6 +4,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/ddl"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/redo"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
@ -32,6 +33,7 @@ func (b builderAdaptorImpl) Build() (wal.Opener, error) {
}
// Add all interceptor here.
return adaptImplsToOpener(o, []interceptors.InterceptorBuilder{
redo.NewInterceptorBuilder(),
timetick.NewInterceptorBuilder(),
segment.NewInterceptorBuilder(),
ddl.NewInterceptorBuilder(),

View File

@ -4,6 +4,7 @@ import (
"context"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/anypb"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
@ -115,13 +116,20 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage)
metricsGuard.Finish(err)
return nil, err
}
var extra *anypb.Any
if extraAppendResult.Extra != nil {
var err error
if extra, err = anypb.New(extraAppendResult.Extra); err != nil {
panic("unreachable: failed to marshal extra append result")
}
}
// unwrap the messageID if needed.
r := &wal.AppendResult{
MessageID: messageID,
TimeTick: extraAppendResult.TimeTick,
TxnCtx: extraAppendResult.TxnCtx,
Extra: extraAppendResult.Extra,
Extra: extra,
}
metricsGuard.Finish(nil)
return r, nil

View File

@ -0,0 +1,16 @@
package redo
import "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
// NewInterceptorBuilder creates a new redo interceptor builder.
func NewInterceptorBuilder() interceptors.InterceptorBuilder {
return &interceptorBuilder{}
}
// interceptorBuilder is the builder for redo interceptor.
type interceptorBuilder struct{}
// Build creates a new redo interceptor.
func (b *interceptorBuilder) Build(param interceptors.InterceptorBuildParam) interceptors.Interceptor {
return &redoAppendInterceptor{}
}

View File

@ -0,0 +1,35 @@
package redo
import (
"context"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
var (
_ interceptors.Interceptor = (*redoAppendInterceptor)(nil)
ErrRedo = errors.New("redo")
)
// redoAppendInterceptor is an append interceptor to retry the append operation if needed.
// It's useful when the append operation want to refresh the append context (such as timetick belong to the message)
type redoAppendInterceptor struct{}
func (r *redoAppendInterceptor) DoAppend(ctx context.Context, msg message.MutableMessage, append interceptors.Append) (msgID message.MessageID, err error) {
for {
if ctx.Err() != nil {
return nil, ctx.Err()
}
msgID, err = append(ctx, msg)
// If the error is ErrRedo, we should redo the append operation.
if errors.Is(err, ErrRedo) {
continue
}
return msgID, err
}
}
func (r *redoAppendInterceptor) Close() {}

View File

@ -55,8 +55,8 @@ func (s *sealOperationInspectorImpl) TriggerSealWaited(ctx context.Context, pcha
}
}
// RegsiterPChannelManager implements SealInspector.RegsiterPChannelManager.
func (s *sealOperationInspectorImpl) RegsiterPChannelManager(m SealOperator) {
// RegisterPChannelManager implements SealInspector.RegisterPChannelManager.
func (s *sealOperationInspectorImpl) RegisterPChannelManager(m SealOperator) {
_, loaded := s.managers.GetOrInsert(m.Channel().Name, m)
if loaded {
panic("pchannel manager already exists, critical bug in code")

View File

@ -27,7 +27,7 @@ type SealOperationInspector interface {
TriggerSealWaited(ctx context.Context, pchannel string) error
// RegisterPChannelManager registers a pchannel manager.
RegsiterPChannelManager(m SealOperator)
RegisterPChannelManager(m SealOperator)
// UnregisterPChannelManager unregisters a pchannel manager.
UnregisterPChannelManager(m SealOperator)

View File

@ -39,7 +39,7 @@ func TestSealedInspector(t *testing.T) {
return ops.Load()%2 == 0
})
inspector.RegsiterPChannelManager(o)
inspector.RegisterPChannelManager(o)
wg := sync.WaitGroup{}
wg.Add(2)
go func() {

View File

@ -10,19 +10,23 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/policy"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/messagespb"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
var ErrFencedAssign = errors.New("fenced assign")
// newPartitionSegmentManager creates a new partition segment assign manager.
func newPartitionSegmentManager(
wal *syncutil.Future[wal.WAL],
pchannel types.PChannelInfo,
vchannel string,
collectionID int64,
@ -37,6 +41,7 @@ func newPartitionSegmentManager(
zap.String("vchannel", vchannel),
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", paritionID)),
wal: wal,
pchannel: pchannel,
vchannel: vchannel,
collectionID: collectionID,
@ -50,6 +55,7 @@ func newPartitionSegmentManager(
type partitionSegmentManager struct {
mu sync.Mutex
logger *log.MLogger
wal *syncutil.Future[wal.WAL]
pchannel types.PChannelInfo
vchannel string
collectionID int64
@ -72,17 +78,20 @@ func (m *partitionSegmentManager) AssignSegment(ctx context.Context, req *Assign
// So it's just a promise check here.
// If the request time tick is less than the fenced time tick, the assign operation is fenced.
// A special error will be returned to indicate the assign operation is fenced.
// The wal will retry it with new timetick.
if req.TimeTick <= m.fencedAssignTimeTick {
return nil, ErrFencedAssign
}
return m.assignSegment(ctx, req)
}
// SealAllSegmentsAndFenceUntil seals all segments and fence assign until the maximum of timetick or max time tick.
func (m *partitionSegmentManager) SealAllSegmentsAndFenceUntil(timeTick uint64) (sealedSegments []*segmentAllocManager) {
// SealAndFenceSegmentUntil seal all segment that contains the message less than the incoming timetick.
func (m *partitionSegmentManager) SealAndFenceSegmentUntil(timeTick uint64) (sealedSegments []*segmentAllocManager) {
m.mu.Lock()
defer m.mu.Unlock()
// no-op if the incoming time tick is less than the fenced time tick.
if timeTick <= m.fencedAssignTimeTick {
return
}
segmentManagers := m.collectShouldBeSealedWithPolicy(func(segmentMeta *segmentAllocManager) (policy.PolicyName, bool) { return policy.PolicyNameFenced, true })
// fence the assign operation until the incoming time tick or latest assigned timetick.
@ -225,6 +234,27 @@ func (m *partitionSegmentManager) allocNewGrowingSegment(ctx context.Context) (*
if err := merr.CheckRPCCall(resp, err); err != nil {
return nil, errors.Wrap(err, "failed to alloc growing segment at datacoord")
}
msg, err := message.NewCreateSegmentMessageBuilderV2().
WithVChannel(pendingSegment.GetVChannel()).
WithHeader(&message.CreateSegmentMessageHeader{}).
WithBody(&message.CreateSegmentMessageBody{
CollectionId: pendingSegment.GetCollectionID(),
Segments: []*messagespb.CreateSegmentInfo{{
// We only execute one segment creation operation at a time.
// But in future, we need to modify the segment creation operation to support batch creation.
// Because the partition-key based collection may create huge amount of segments at the same time.
PartitionId: pendingSegment.GetPartitionID(),
SegmentId: pendingSegment.GetSegmentID(),
}},
}).BuildMutable()
if err != nil {
return nil, errors.Wrapf(err, "failed to create new segment message, segmentID: %d", pendingSegment.GetSegmentID())
}
// Send CreateSegmentMessage into wal.
msgID, err := m.wal.Get().Append(ctx, msg)
if err != nil {
return nil, errors.Wrapf(err, "failed to send create segment message into wal, segmentID: %d", pendingSegment.GetSegmentID())
}
// Getnerate growing segment limitation.
limitation := policy.GetSegmentLimitationPolicy().GenerateLimitation()
@ -232,13 +262,14 @@ func (m *partitionSegmentManager) allocNewGrowingSegment(ctx context.Context) (*
// Commit it into streaming node meta.
// growing segment can be assigned now.
tx := pendingSegment.BeginModification()
tx.IntoGrowing(&limitation)
tx.IntoGrowing(&limitation, msgID.TimeTick)
if err := tx.Commit(ctx); err != nil {
return nil, errors.Wrapf(err, "failed to commit modification of segment assignment into growing, segmentID: %d", pendingSegment.GetSegmentID())
}
m.logger.Info(
"generate new growing segment",
m.logger.Info("generate new growing segment",
zap.Int64("segmentID", pendingSegment.GetSegmentID()),
zap.String("messageID", msgID.MessageID.String()),
zap.Uint64("timetick", msgID.TimeTick),
zap.String("limitationPolicy", limitation.PolicyName),
zap.Uint64("segmentBinarySize", limitation.SegmentSize),
zap.Any("extraInfo", limitation.ExtraInfo),
@ -280,12 +311,28 @@ func (m *partitionSegmentManager) createNewPendingSegment(ctx context.Context) (
// assignSegment assigns a segment for a assign segment request and return should trigger a seal operation.
func (m *partitionSegmentManager) assignSegment(ctx context.Context, req *AssignSegmentRequest) (*AssignSegmentResult, error) {
// Alloc segment for insert at previous segments.
hitTimeTickTooOld := false
// Alloc segment for insert at allocated segments.
for _, segment := range m.segments {
inserted, ack := segment.AllocRows(ctx, req)
if inserted {
return &AssignSegmentResult{SegmentID: segment.GetSegmentID(), Acknowledge: ack}, nil
result, err := segment.AllocRows(ctx, req)
if err == nil {
return result, nil
}
if errors.IsAny(err, ErrTooLargeInsert) {
// Return error directly.
// If the insert message is too large to hold by single segment, it can not be inserted anymore.
return nil, err
}
if errors.Is(err, ErrTimeTickTooOld) {
hitTimeTickTooOld = true
}
}
// If the timetick is too old for existing segment, it can not be inserted even allocate new growing segment,
// (new growing segment's timetick is always greater than the old gorwing segmet's timetick).
// Return directly to avoid unnecessary growing segment allocation.
if hitTimeTickTooOld {
return nil, ErrTimeTickTooOld
}
// If not inserted, ask a new growing segment to insert.
@ -293,8 +340,5 @@ func (m *partitionSegmentManager) assignSegment(ctx context.Context, req *Assign
if err != nil {
return nil, err
}
if inserted, ack := newGrowingSegment.AllocRows(ctx, req); inserted {
return &AssignSegmentResult{SegmentID: newGrowingSegment.GetSegmentID(), Acknowledge: ack}, nil
}
return nil, status.NewUnrecoverableError("too large insert message, cannot hold in empty growing segment, stats: %+v", req.InsertMetrics)
return newGrowingSegment.AllocRows(ctx, req)
}

View File

@ -7,17 +7,20 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/policy"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// buildNewPartitionManagers builds new partition managers.
func buildNewPartitionManagers(
wal *syncutil.Future[wal.WAL],
pchannel types.PChannelInfo,
rawMetas []*streamingpb.SegmentAssignmentMeta,
collectionInfos []*rootcoordpb.CollectionInfoOnPChannel,
@ -62,6 +65,7 @@ func buildNewPartitionManagers(
}
// otherwise, just create a new manager.
_, ok := managers.GetOrInsert(partition.GetPartitionId(), newPartitionSegmentManager(
wal,
pchannel,
collectionInfo.GetVchannel(),
collectionID,
@ -77,6 +81,7 @@ func buildNewPartitionManagers(
m := &partitionSegmentManagers{
mu: sync.Mutex{},
logger: log.With(zap.Any("pchannel", pchannel)),
wal: wal,
pchannel: pchannel,
managers: managers,
collectionInfos: collectionInfoMap,
@ -91,6 +96,7 @@ type partitionSegmentManagers struct {
mu sync.Mutex
logger *log.MLogger
wal *syncutil.Future[wal.WAL]
pchannel types.PChannelInfo
managers *typeutil.ConcurrentMap[int64, *partitionSegmentManager] // map partitionID to partition manager
collectionInfos map[int64]*rootcoordpb.CollectionInfoOnPChannel // map collectionID to collectionInfo
@ -112,6 +118,7 @@ func (m *partitionSegmentManagers) NewCollection(collectionID int64, vchannel st
m.collectionInfos[collectionID] = newCollectionInfo(collectionID, vchannel, partitionID)
for _, partitionID := range partitionID {
if _, loaded := m.managers.GetOrInsert(partitionID, newPartitionSegmentManager(
m.wal,
m.pchannel,
vchannel,
collectionID,
@ -149,6 +156,7 @@ func (m *partitionSegmentManagers) NewPartition(collectionID int64, partitionID
})
if _, loaded := m.managers.GetOrInsert(partitionID, newPartitionSegmentManager(
m.wal,
m.pchannel,
m.collectionInfos[collectionID].Vchannel,
collectionID,
@ -255,8 +263,8 @@ func (m *partitionSegmentManagers) RemovePartition(collectionID int64, partition
return segments
}
// SealAllSegmentsAndFenceUntil seals all segments and fence assign until timetick.
func (m *partitionSegmentManagers) SealAllSegmentsAndFenceUntil(collectionID int64, timetick uint64) ([]*segmentAllocManager, error) {
// SealAndFenceSegmentUntil seal all segment that contains the message less than the incoming timetick.
func (m *partitionSegmentManagers) SealAndFenceSegmentUntil(collectionID int64, timetick uint64) ([]*segmentAllocManager, error) {
m.mu.Lock()
defer m.mu.Unlock()
@ -278,7 +286,7 @@ func (m *partitionSegmentManagers) SealAllSegmentsAndFenceUntil(collectionID int
zap.Int64("partitionID", partition.PartitionId))
return nil, errors.New("partition not found")
}
newSealedSegments := pm.SealAllSegmentsAndFenceUntil(timetick)
newSealedSegments := pm.SealAndFenceSegmentUntil(timetick)
for _, segment := range newSealedSegments {
segmentIDs = append(segmentIDs, segment.GetSegmentID())
}

View File

@ -39,7 +39,7 @@ func RecoverPChannelSegmentAllocManager(
return nil, errors.Wrap(err, "failed to get pchannel info from rootcoord")
}
metrics := metricsutil.NewSegmentAssignMetrics(pchannel.Name)
managers, waitForSealed := buildNewPartitionManagers(pchannel, rawMetas, resp.GetCollections(), metrics)
managers, waitForSealed := buildNewPartitionManagers(wal, pchannel, rawMetas, resp.GetCollections(), metrics)
// PChannelSegmentAllocManager is the segment assign manager of determined pchannel.
logger := log.With(zap.Any("pchannel", pchannel))
@ -143,8 +143,8 @@ func (m *PChannelSegmentAllocManager) RemovePartition(ctx context.Context, colle
return m.helper.WaitUntilNoWaitSeal(ctx)
}
// SealAllSegmentsAndFenceUntil seals all segments and fence assign until timetick and return the segmentIDs.
func (m *PChannelSegmentAllocManager) SealAllSegmentsAndFenceUntil(ctx context.Context, collectionID int64, timetick uint64) ([]int64, error) {
// SealAndFenceSegmentUntil seal all segment that contains the message less than the incoming timetick.
func (m *PChannelSegmentAllocManager) SealAndFenceSegmentUntil(ctx context.Context, collectionID int64, timetick uint64) ([]int64, error) {
if err := m.checkLifetime(); err != nil {
return nil, err
}
@ -152,7 +152,7 @@ func (m *PChannelSegmentAllocManager) SealAllSegmentsAndFenceUntil(ctx context.C
// All message's timetick less than incoming timetick is all belong to the output sealed segment.
// So the output sealed segment transfer into flush == all message's timetick less than incoming timetick are flushed.
sealedSegments, err := m.managers.SealAllSegmentsAndFenceUntil(collectionID, timetick)
sealedSegments, err := m.managers.SealAndFenceSegmentUntil(collectionID, timetick)
if err != nil {
return nil, err
}

View File

@ -22,6 +22,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/txn"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
@ -32,7 +33,10 @@ func TestSegmentAllocManager(t *testing.T) {
initializeTestState(t)
w := mock_wal.NewMockWAL(t)
w.EXPECT().Append(mock.Anything, mock.Anything).Return(nil, nil)
w.EXPECT().Append(mock.Anything, mock.Anything).Return(&wal.AppendResult{
MessageID: rmq.NewRmqID(1),
TimeTick: 2,
}, nil)
f := syncutil.NewFuture[wal.WAL]()
f.Set(w)
@ -42,8 +46,21 @@ func TestSegmentAllocManager(t *testing.T) {
ctx := context.Background()
// Ask for allocate segment
// Ask for a too old timetick.
result, err := m.AssignSegment(ctx, &AssignSegmentRequest{
CollectionID: 1,
PartitionID: 1,
InsertMetrics: stats.InsertMetrics{
Rows: 100,
BinarySize: 100,
},
TimeTick: 1,
})
assert.Nil(t, result)
assert.ErrorIs(t, err, ErrTimeTickTooOld)
// Ask for allocate segment
result, err = m.AssignSegment(ctx, &AssignSegmentRequest{
CollectionID: 1,
PartitionID: 1,
InsertMetrics: stats.InsertMetrics{
@ -163,7 +180,7 @@ func TestSegmentAllocManager(t *testing.T) {
ts := tsoutil.GetCurrentTime()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
ids, err := m.SealAllSegmentsAndFenceUntil(ctx, 1, ts)
ids, err := m.SealAndFenceSegmentUntil(ctx, 1, ts)
assert.Error(t, err)
assert.ErrorIs(t, err, context.DeadlineExceeded)
assert.Empty(t, ids)
@ -190,14 +207,26 @@ func TestCreateAndDropCollection(t *testing.T) {
initializeTestState(t)
w := mock_wal.NewMockWAL(t)
w.EXPECT().Append(mock.Anything, mock.Anything).Return(nil, nil)
w.EXPECT().Append(mock.Anything, mock.Anything).Return(&wal.AppendResult{
MessageID: rmq.NewRmqID(1),
TimeTick: 1,
}, nil)
f := syncutil.NewFuture[wal.WAL]()
f.Set(w)
m, err := RecoverPChannelSegmentAllocManager(context.Background(), types.PChannelInfo{Name: "v1"}, f)
assert.NoError(t, err)
assert.NotNil(t, m)
inspector.GetSegmentSealedInspector().RegsiterPChannelManager(m)
m.MustSealSegments(context.Background(), stats.SegmentBelongs{
PChannel: "v1",
VChannel: "v1",
CollectionID: 1,
PartitionID: 2,
SegmentID: 4000,
})
inspector.GetSegmentSealedInspector().RegisterPChannelManager(m)
ctx := context.Background()

View File

@ -4,6 +4,7 @@ import (
"context"
"time"
"github.com/cockroachdb/errors"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
@ -19,6 +20,13 @@ import (
const dirtyThreshold = 30 * 1024 * 1024 // 30MB
var (
ErrSegmentNotGrowing = errors.New("segment is not growing")
ErrTimeTickTooOld = errors.New("time tick is too old")
ErrNotEnoughSpace = stats.ErrNotEnoughSpace
ErrTooLargeInsert = stats.ErrTooLargeInsert
)
// newSegmentAllocManagerFromProto creates a new segment assignment meta from proto.
func newSegmentAllocManagerFromProto(
pchannel types.PChannelInfo,
@ -161,14 +169,20 @@ func (s *segmentAllocManager) TxnSem() int32 {
// AllocRows ask for rows from current segment.
// Only growing and not fenced segment can alloc rows.
func (s *segmentAllocManager) AllocRows(ctx context.Context, req *AssignSegmentRequest) (bool, *atomic.Int32) {
func (s *segmentAllocManager) AllocRows(ctx context.Context, req *AssignSegmentRequest) (*AssignSegmentResult, error) {
// if the segment is not growing or reach limit, return false directly.
if s.inner.GetState() != streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_GROWING {
return false, nil
return nil, ErrSegmentNotGrowing
}
inserted := resource.Resource().SegmentAssignStatsManager().AllocRows(s.GetSegmentID(), req.InsertMetrics)
if !inserted {
return false, nil
if req.TimeTick <= s.inner.GetStat().CreateSegmentTimeTick {
// The incoming insert request's timetick is less than the segment's create time tick,
// return ErrTimeTickTooOld and reallocate new timetick.
return nil, ErrTimeTickTooOld
}
err := resource.Resource().SegmentAssignStatsManager().AllocRows(s.GetSegmentID(), req.InsertMetrics)
if err != nil {
return nil, err
}
s.dirtyBytes += req.InsertMetrics.BinarySize
s.ackSem.Inc()
@ -181,7 +195,10 @@ func (s *segmentAllocManager) AllocRows(ctx context.Context, req *AssignSegmentR
// persist stats if too dirty.
s.persistStatsIfTooDirty(ctx)
return inserted, s.ackSem
return &AssignSegmentResult{
SegmentID: s.GetSegmentID(),
Acknowledge: s.ackSem,
}, nil
}
// Snapshot returns the snapshot of the segment assignment meta.
@ -237,7 +254,7 @@ func (m *mutableSegmentAssignmentMeta) IntoPending() {
}
// IntoGrowing transfers the segment assignment meta into growing state.
func (m *mutableSegmentAssignmentMeta) IntoGrowing(limitation *policy.SegmentLimitation) {
func (m *mutableSegmentAssignmentMeta) IntoGrowing(limitation *policy.SegmentLimitation, createSegmentTimeTick uint64) {
if m.modifiedCopy.State != streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_PENDING {
panic("tranfer state to growing from non-pending state")
}
@ -247,6 +264,7 @@ func (m *mutableSegmentAssignmentMeta) IntoGrowing(limitation *policy.SegmentLim
MaxBinarySize: limitation.SegmentSize,
CreateTimestampNanoseconds: now,
LastModifiedTimestampNanoseconds: now,
CreateSegmentTimeTick: createSegmentTimeTick,
}
}

View File

@ -4,10 +4,11 @@ import (
"context"
"time"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/anypb"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/redo"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/manager"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats"
@ -15,6 +16,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/messagespb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -134,7 +136,7 @@ func (impl *segmentInterceptor) handleInsertMessage(ctx context.Context, msg mes
return nil, err
}
// Assign segment for insert message.
// Current implementation a insert message only has one parition, but we need to merge the message for partition-key in future.
// !!! Current implementation a insert message only has one parition, but we need to merge the message for partition-key in future.
header := insertMsg.Header()
for _, partition := range header.GetPartitions() {
result, err := impl.assignManager.Get().AssignSegment(ctx, &manager.AssignSegmentRequest{
@ -147,6 +149,15 @@ func (impl *segmentInterceptor) handleInsertMessage(ctx context.Context, msg mes
TimeTick: msg.TimeTick(),
TxnSession: txn.GetTxnSessionFromContext(ctx),
})
if errors.Is(err, manager.ErrTimeTickTooOld) {
// If current time tick of insert message is too old to alloc segment,
// we just redo it to refresh a new latest timetick.
return nil, redo.ErrRedo
}
if errors.Is(err, manager.ErrTooLargeInsert) {
// Message is too large, so retry operation is unrecoverable, can't be retry at client side.
return nil, status.NewUnrecoverableError("insert too large, binary size: %d", msg.EstimateSize())
}
if err != nil {
return nil, err
}
@ -173,17 +184,24 @@ func (impl *segmentInterceptor) handleManualFlushMessage(ctx context.Context, ms
return nil, err
}
header := maunalFlushMsg.Header()
segmentIDs, err := impl.assignManager.Get().SealAllSegmentsAndFenceUntil(ctx, header.GetCollectionId(), header.GetFlushTs())
segmentIDs, err := impl.assignManager.Get().SealAndFenceSegmentUntil(ctx, header.GetCollectionId(), header.GetFlushTs())
if err != nil {
return nil, status.NewInner("segment seal failure with error: %s", err.Error())
}
// create extra response for manual flush message.
extraResponse, err := anypb.New(&message.ManualFlushExtraResponse{
SegmentIds: segmentIDs,
// Modify the extra response for manual flush message.
utility.ModifyAppendResultExtra(ctx, func(old *message.ManualFlushExtraResponse) *message.ManualFlushExtraResponse {
if old == nil {
return &messagespb.ManualFlushExtraResponse{SegmentIds: segmentIDs}
}
return &messagespb.ManualFlushExtraResponse{SegmentIds: append(old.GetSegmentIds(), segmentIDs...)}
})
if err != nil {
return nil, status.NewInner("create extra response failed with error: %s", err.Error())
if len(segmentIDs) > 0 {
// There's some new segment sealed, we need to retry the manual flush operation refresh the context.
// If we don't refresh the context, the sequence of message in wal will be:
// FlushTsHere -> ManualFlush -> FlushSegment1 -> FlushSegment2 -> FlushSegment3.
// After refresh the context, keep the sequence of the message in the wal with following seq:
// FlushTsHere -> FlushSegment1 -> FlushSegment2 -> FlushSegment3 -> ManualFlush.
return nil, redo.ErrRedo
}
// send the manual flush message.
@ -192,7 +210,6 @@ func (impl *segmentInterceptor) handleManualFlushMessage(ctx context.Context, ms
return nil, err
}
utility.AttachAppendResultExtra(ctx, extraResponse)
return msgID, nil
}
@ -234,7 +251,7 @@ func (impl *segmentInterceptor) recoverPChannelManager(param interceptors.Interc
}
// register the manager into inspector, to do the seal asynchronously
inspector.GetSegmentSealedInspector().RegsiterPChannelManager(pm)
inspector.GetSegmentSealedInspector().RegisterPChannelManager(pm)
impl.assignManager.Set(pm)
impl.logger.Info("recover PChannel Assignment Manager success")
return

View File

@ -60,7 +60,10 @@ type SyncOperationMetrics struct {
// Return true if the segment is assigned.
func (s *SegmentStats) AllocRows(m InsertMetrics) bool {
if m.BinarySize > s.BinaryCanBeAssign() {
s.ReachLimit = true
if s.Insert.BinarySize > 0 {
// if the binary size is not empty, it means the segment cannot hold more data, mark it as reach limit.
s.ReachLimit = true
}
return false
}
@ -74,6 +77,16 @@ func (s *SegmentStats) BinaryCanBeAssign() uint64 {
return s.MaxBinarySize - s.Insert.BinarySize
}
// ShouldBeSealed returns if the segment should be sealed.
func (s *SegmentStats) ShouldBeSealed() bool {
return s.ReachLimit
}
// IsEmpty returns if the segment is empty.
func (s *SegmentStats) IsEmpty() bool {
return s.Insert.Rows == 0
}
// UpdateOnSync updates the stats of segment on sync.
func (s *SegmentStats) UpdateOnSync(f SyncOperationMetrics) {
s.BinLogCounter += f.BinLogCounterIncr

View File

@ -4,12 +4,18 @@ import (
"fmt"
"sync"
"github.com/cockroachdb/errors"
"github.com/pingcap/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
var (
ErrNotEnoughSpace = errors.New("not enough space")
ErrTooLargeInsert = errors.New("insert too large")
)
// StatsManager is the manager of stats.
// It manages the insert stats of all segments, used to check if a segment has enough space to insert or should be sealed.
// If there will be a lock contention, we can optimize it by apply lock per segment.
@ -69,7 +75,7 @@ func (m *StatsManager) RegisterNewGrowingSegment(belongs SegmentBelongs, segment
}
// AllocRows alloc number of rows on current segment.
func (m *StatsManager) AllocRows(segmentID int64, insert InsertMetrics) bool {
func (m *StatsManager) AllocRows(segmentID int64, insert InsertMetrics) error {
m.mu.Lock()
defer m.mu.Unlock()
@ -78,7 +84,8 @@ func (m *StatsManager) AllocRows(segmentID int64, insert InsertMetrics) bool {
if !ok {
panic(fmt.Sprintf("alloc rows on a segment %d that not exist", segmentID))
}
inserted := m.segmentStats[segmentID].AllocRows(insert)
stat := m.segmentStats[segmentID]
inserted := stat.AllocRows(insert)
// update the total stats if inserted.
if inserted {
@ -91,12 +98,17 @@ func (m *StatsManager) AllocRows(segmentID int64, insert InsertMetrics) bool {
m.vchannelStats[info.VChannel] = &InsertMetrics{}
}
m.vchannelStats[info.VChannel].Collect(insert)
return true
return nil
}
// If not inserted, current segment can not hold the message, notify seal manager to do seal the segment.
m.sealNotifier.AddAndNotify(info)
return false
if stat.ShouldBeSealed() {
// notify seal manager to do seal the segment if stat reach the limit.
m.sealNotifier.AddAndNotify(info)
}
if stat.IsEmpty() {
return ErrTooLargeInsert
}
return ErrNotEnoughSpace
}
// SealNotifier returns the seal notifier.

View File

@ -34,23 +34,32 @@ func TestStatsManager(t *testing.T) {
assert.Len(t, m.vchannelStats, 3)
assert.Len(t, m.pchannelStats, 2)
m.RegisterNewGrowingSegment(SegmentBelongs{PChannel: "pchannel2", VChannel: "vchannel3", CollectionID: 2, PartitionID: 5, SegmentID: 7}, 7, createSegmentStats(0, 0, 300))
assert.Len(t, m.segmentStats, 5)
assert.Len(t, m.segmentIndex, 5)
assert.Len(t, m.vchannelStats, 3)
assert.Len(t, m.pchannelStats, 2)
assert.Panics(t, func() {
m.RegisterNewGrowingSegment(SegmentBelongs{PChannel: "pchannel", VChannel: "vchannel", CollectionID: 1, PartitionID: 2, SegmentID: 3}, 3, createSegmentStats(100, 100, 300))
})
shouldBlock(t, m.SealNotifier().WaitChan())
m.AllocRows(3, InsertMetrics{Rows: 50, BinarySize: 50})
err := m.AllocRows(3, InsertMetrics{Rows: 50, BinarySize: 50})
assert.NoError(t, err)
stat := m.GetStatsOfSegment(3)
assert.Equal(t, uint64(150), stat.Insert.BinarySize)
shouldBlock(t, m.SealNotifier().WaitChan())
m.AllocRows(5, InsertMetrics{Rows: 250, BinarySize: 250})
err = m.AllocRows(5, InsertMetrics{Rows: 250, BinarySize: 250})
assert.ErrorIs(t, err, ErrNotEnoughSpace)
<-m.SealNotifier().WaitChan()
infos := m.SealNotifier().Get()
assert.Len(t, infos, 1)
m.AllocRows(6, InsertMetrics{Rows: 150, BinarySize: 150})
err = m.AllocRows(6, InsertMetrics{Rows: 150, BinarySize: 150})
assert.NoError(t, err)
shouldBlock(t, m.SealNotifier().WaitChan())
assert.Equal(t, uint64(250), m.vchannelStats["vchannel3"].BinarySize)
@ -67,17 +76,25 @@ func TestStatsManager(t *testing.T) {
m.UpdateOnSync(1000, SyncOperationMetrics{BinLogCounterIncr: 100})
shouldBlock(t, m.SealNotifier().WaitChan())
m.AllocRows(3, InsertMetrics{Rows: 400, BinarySize: 400})
m.AllocRows(5, InsertMetrics{Rows: 250, BinarySize: 250})
m.AllocRows(6, InsertMetrics{Rows: 400, BinarySize: 400})
err = m.AllocRows(3, InsertMetrics{Rows: 400, BinarySize: 400})
assert.ErrorIs(t, err, ErrNotEnoughSpace)
err = m.AllocRows(5, InsertMetrics{Rows: 250, BinarySize: 250})
assert.ErrorIs(t, err, ErrNotEnoughSpace)
err = m.AllocRows(6, InsertMetrics{Rows: 400, BinarySize: 400})
assert.ErrorIs(t, err, ErrNotEnoughSpace)
<-m.SealNotifier().WaitChan()
infos = m.SealNotifier().Get()
assert.Len(t, infos, 3)
err = m.AllocRows(7, InsertMetrics{Rows: 400, BinarySize: 400})
assert.ErrorIs(t, err, ErrTooLargeInsert)
shouldBlock(t, m.SealNotifier().WaitChan())
m.UnregisterSealedSegment(3)
m.UnregisterSealedSegment(4)
m.UnregisterSealedSegment(5)
m.UnregisterSealedSegment(6)
m.UnregisterSealedSegment(7)
assert.Empty(t, m.segmentStats)
assert.Empty(t, m.vchannelStats)
assert.Empty(t, m.pchannelStats)

View File

@ -59,6 +59,8 @@ func TestSegmentStats(t *testing.T) {
assert.Equal(t, stat.Insert.Rows, uint64(160))
assert.Equal(t, stat.Insert.BinarySize, uint64(320))
assert.True(t, time.Now().After(now))
assert.False(t, stat.IsEmpty())
assert.False(t, stat.ShouldBeSealed())
insert1 = InsertMetrics{
Rows: 100,
@ -68,6 +70,8 @@ func TestSegmentStats(t *testing.T) {
assert.False(t, inserted)
assert.Equal(t, stat.Insert.Rows, uint64(160))
assert.Equal(t, stat.Insert.BinarySize, uint64(320))
assert.False(t, stat.IsEmpty())
assert.True(t, stat.ShouldBeSealed())
stat.UpdateOnSync(SyncOperationMetrics{
BinLogCounterIncr: 4,
@ -76,3 +80,20 @@ func TestSegmentStats(t *testing.T) {
assert.Equal(t, uint64(7), stat.BinLogCounter)
assert.Equal(t, uint64(13), stat.BinLogFileCounter)
}
func TestOversizeAlloc(t *testing.T) {
now := time.Now()
stat := &SegmentStats{
Insert: InsertMetrics{},
MaxBinarySize: 400,
CreateTime: now,
LastModifiedTime: now,
}
// Try to alloc a oversized insert metrics.
inserted := stat.AllocRows(InsertMetrics{
BinarySize: 401,
})
assert.False(t, inserted)
assert.True(t, stat.IsEmpty())
assert.False(t, stat.ShouldBeSealed())
}

View File

@ -202,7 +202,7 @@ func (impl *timeTickAppendInterceptor) appendMsg(
return nil, err
}
utility.AttachAppendResultTimeTick(ctx, msg.TimeTick())
utility.AttachAppendResultTxnContext(ctx, msg.TxnContext())
utility.ReplaceAppendResultTimeTick(ctx, msg.TimeTick())
utility.ReplaceAppendResultTxnContext(ctx, msg.TxnContext())
return msgID, nil
}

View File

@ -2,8 +2,9 @@ package utility
import (
"context"
"reflect"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/reflect/protoreflect"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
@ -20,7 +21,7 @@ var (
type ExtraAppendResult struct {
TimeTick uint64
TxnCtx *message.TxnContext
Extra *anypb.Any
Extra protoreflect.ProtoMessage
}
// NotPersistedHint is the hint of not persisted message.
@ -47,20 +48,29 @@ func WithExtraAppendResult(ctx context.Context, r *ExtraAppendResult) context.Co
return context.WithValue(ctx, extraAppendResultValue, r)
}
// AttachAppendResultExtra set extra to context
func AttachAppendResultExtra(ctx context.Context, extra *anypb.Any) {
// ModifyAppendResultExtra modify extra in context
func ModifyAppendResultExtra[M protoreflect.ProtoMessage](ctx context.Context, modifier func(old M) (new M)) {
result := ctx.Value(extraAppendResultValue)
result.(*ExtraAppendResult).Extra = extra
var old M
if result.(*ExtraAppendResult).Extra != nil {
old = result.(*ExtraAppendResult).Extra.(M)
}
new := modifier(old)
if reflect.ValueOf(new).IsNil() {
result.(*ExtraAppendResult).Extra = nil
return
}
result.(*ExtraAppendResult).Extra = new
}
// AttachAppendResultTimeTick set time tick to context
func AttachAppendResultTimeTick(ctx context.Context, timeTick uint64) {
// ReplaceAppendResultTimeTick set time tick to context
func ReplaceAppendResultTimeTick(ctx context.Context, timeTick uint64) {
result := ctx.Value(extraAppendResultValue)
result.(*ExtraAppendResult).TimeTick = timeTick
}
// AttachAppendResultTxnContext set txn context to context
func AttachAppendResultTxnContext(ctx context.Context, txnCtx *message.TxnContext) {
// ReplaceAppendResultTxnContext set txn context to context
func ReplaceAppendResultTxnContext(ctx context.Context, txnCtx *message.TxnContext) {
result := ctx.Value(extraAppendResultValue)
result.(*ExtraAppendResult).TxnCtx = txnCtx
}

View File

@ -14,7 +14,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/json-iterator/go v1.1.12
github.com/klauspost/compress v1.17.7
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241106083218-5de5d0cfb1c1
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.34.1
github.com/panjf2000/ants/v2 v2.7.2
@ -26,8 +26,6 @@ require (
github.com/shirou/gopsutil/v3 v3.22.9
github.com/sirupsen/logrus v1.9.0
github.com/spaolacci/murmur3 v1.1.0
github.com/spf13/cast v1.3.1
github.com/spf13/viper v1.8.1
github.com/streamnative/pulsarctl v0.5.0
github.com/stretchr/testify v1.9.0
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c
@ -57,6 +55,7 @@ require (
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/apimachinery v0.28.6
)
@ -86,7 +85,6 @@ require (
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/getsentry/sentry-go v0.12.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
@ -99,7 +97,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
@ -107,19 +104,16 @@ require (
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20221217025313-27d3c9f66b6a // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/linkedin/goavro/v2 v2.11.1 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.8 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mtibben/percent v0.2.1 // indirect
@ -128,7 +122,6 @@ require (
github.com/nats-io/nuid v1.0.1 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.3 // indirect
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
@ -144,14 +137,10 @@ require (
github.com/prometheus/procfs v0.9.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/smartystreets/assertions v1.1.0 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/afero v1.6.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stathat/consistent v1.0.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
@ -180,9 +169,7 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.62.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)

View File

@ -335,8 +335,6 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 h1:l5lAOZEym3oK3SQ2HBHWsJUfbNBiTXJDeW2QDxw9AQ0=
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
@ -374,7 +372,6 @@ github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
@ -417,7 +414,6 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/juju/qthttptest v0.1.1/go.mod h1:aTlAv8TYaflIiTDIQYzxnl1QdPjAg8Q8qJMErpKy6A4=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
@ -439,7 +435,6 @@ github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ib
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
@ -493,8 +488,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd h1:x0b0+foTe23sKcVFseR1DE8+BB08EH6ViiRHaz8PEik=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241106083218-5de5d0cfb1c1 h1:GFS5AxKPcEstcfJgMGxRH+l/mKA0kK1sHDOxnOqMnoA=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241106083218-5de5d0cfb1c1/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
@ -507,8 +502,6 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@ -557,8 +550,6 @@ github.com/panjf2000/ants/v2 v2.7.2 h1:2NUt9BaZFO5kQzrieOmK/wdb/tQ/K+QHaxN8sOgD6
github.com/panjf2000/ants/v2 v2.7.2/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ=
github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI=
@ -581,7 +572,6 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
@ -657,9 +647,6 @@ github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.1.0 h1:MkTeG1DMwsrdH7QtLXy5W+fUxWq+vmb6cLmyJ7aRtF0=
github.com/smartystreets/assertions v1.1.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
@ -668,25 +655,17 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY=
github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng=
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSWzOo=
github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA=
github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUqzrY=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spf13/viper v1.8.1 h1:Kq1fyeebqsBfbjZj4EL7gj2IO0mMaiyjYUWcUsl2O44=
github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns=
github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U=
github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@ -708,7 +687,6 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M=
github.com/thoas/go-funk v0.9.1/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
@ -853,7 +831,6 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@ -1002,7 +979,6 @@ golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -1283,8 +1259,6 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.51.1/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU=
gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=

View File

@ -34,6 +34,7 @@ enum MessageType {
CreatePartition = 7;
DropPartition = 8;
ManualFlush = 9;
CreateSegment = 10;
// begin transaction message is only used for transaction, once a begin
// transaction message is received, all messages combined with the
// transaction message cannot be consumed until a CommitTxn message
@ -80,6 +81,18 @@ message FlushMessageBody {
// ManualFlushMessageBody is the body of manual flush message.
message ManualFlushMessageBody {}
// CreateSegmentMessageBody is the body of create segment message.
message CreateSegmentMessageBody {
int64 collection_id = 1;
repeated CreateSegmentInfo segments = 2;
}
// CreateSegmentInfo is the info of create segment.
message CreateSegmentInfo {
int64 partition_id = 1;
int64 segment_id = 2;
}
// BeginTxnMessageBody is the body of begin transaction message.
// Just do nothing now.
message BeginTxnMessageBody {}
@ -138,6 +151,9 @@ message DeleteMessageHeader {
// FlushMessageHeader just nothing.
message FlushMessageHeader {}
// CreateSegmentMessageHeader just nothing.
message CreateSegmentMessageHeader {}
message ManualFlushMessageHeader {
int64 collection_id = 1;
uint64 flush_ts = 2;

View File

@ -432,4 +432,5 @@ message SegmentAssignmentStat {
int64 create_timestamp_nanoseconds = 4;
int64 last_modified_timestamp_nanoseconds = 5;
uint64 binlog_counter = 6;
uint64 create_segment_time_tick = 7; // The timetick of create segment message in wal.
}

View File

@ -139,6 +139,8 @@ func fromMessageToTsMsgV2(msg message.ImmutableMessage) (msgstream.TsMsg, error)
tsMsg, err = NewFlushMessageBody(msg)
case message.MessageTypeManualFlush:
tsMsg, err = NewManualFlushMessageBody(msg)
case message.MessageTypeCreateSegment:
tsMsg, err = NewCreateSegmentMessageBody(msg)
default:
panic("unsupported message type")
}

View File

@ -70,3 +70,21 @@ func TestNewMsgPackFromCreateCollectionMessage(t *testing.T) {
assert.Equal(t, tt, pack.BeginTs)
assert.Equal(t, tt, pack.EndTs)
}
func TestNewMsgPackFromCreateSegmentMessage(t *testing.T) {
id := rmq.NewRmqID(1)
tt := uint64(time.Now().UnixNano())
mutableMsg, err := message.NewCreateSegmentMessageBuilderV2().
WithHeader(&message.CreateSegmentMessageHeader{}).
WithBody(&message.CreateSegmentMessageBody{}).
WithVChannel("v1").
BuildMutable()
assert.NoError(t, err)
immutableCreateSegmentMsg := mutableMsg.WithTimeTick(tt).WithLastConfirmedUseMessageID().IntoImmutableMessage(id)
pack, err := NewMsgPackFromMessage(immutableCreateSegmentMsg)
assert.NoError(t, err)
assert.NotNil(t, pack)
assert.Equal(t, tt, pack.BeginTs)
assert.Equal(t, tt, pack.EndTs)
}

View File

@ -11,6 +11,7 @@ var messageTypeToCommonpbMsgType = map[message.MessageType]commonpb.MsgType{
message.MessageTypeDelete: commonpb.MsgType_Delete,
message.MessageTypeFlush: commonpb.MsgType_FlushSegment,
message.MessageTypeManualFlush: commonpb.MsgType_ManualFlush,
message.MessageTypeCreateSegment: commonpb.MsgType_CreateSegment,
message.MessageTypeCreateCollection: commonpb.MsgType_CreateCollection,
message.MessageTypeDropCollection: commonpb.MsgType_DropCollection,
message.MessageTypeCreatePartition: commonpb.MsgType_CreatePartition,

View File

@ -50,6 +50,30 @@ func (t *tsMsgImpl) SetTs(ts uint64) {
t.ts = ts
}
type CreateSegmentMessageBody struct {
*tsMsgImpl
CreateSegmentMessage message.ImmutableCreateSegmentMessageV2
}
func NewCreateSegmentMessageBody(msg message.ImmutableMessage) (msgstream.TsMsg, error) {
createMsg, err := message.AsImmutableCreateSegmentMessageV2(msg)
if err != nil {
return nil, err
}
return &CreateSegmentMessageBody{
tsMsgImpl: &tsMsgImpl{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: msg.TimeTick(),
EndTimestamp: msg.TimeTick(),
},
ts: msg.TimeTick(),
sz: msg.EstimateSize(),
msgType: MustGetCommonpbMsgTypeFromMessageType(msg.MessageType()),
},
CreateSegmentMessage: createMsg,
}, nil
}
type FlushMessageBody struct {
*tsMsgImpl
FlushMessage message.ImmutableFlushMessageV2

View File

@ -44,6 +44,7 @@ var (
NewDropCollectionMessageBuilderV1 = createNewMessageBuilderV1[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]()
NewCreatePartitionMessageBuilderV1 = createNewMessageBuilderV1[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]()
NewDropPartitionMessageBuilderV1 = createNewMessageBuilderV1[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]()
NewCreateSegmentMessageBuilderV2 = createNewMessageBuilderV2[*CreateSegmentMessageHeader, *CreateSegmentMessageBody]()
NewFlushMessageBuilderV2 = createNewMessageBuilderV2[*FlushMessageHeader, *FlushMessageBody]()
NewManualFlushMessageBuilderV2 = createNewMessageBuilderV2[*ManualFlushMessageHeader, *ManualFlushMessageBody]()
NewBeginTxnMessageBuilderV2 = createNewMessageBuilderV2[*BeginTxnMessageHeader, *BeginTxnMessageBody]()

View File

@ -65,33 +65,20 @@ func (m *messageImpl) WithWALTerm(term int64) MutableMessage {
// WithTimeTick sets the time tick of current message.
func (m *messageImpl) WithTimeTick(tt uint64) MutableMessage {
if m.properties.Exist(messageTimeTick) {
panic("time tick already set in properties of message")
}
m.properties.Set(messageTimeTick, EncodeUint64(tt))
return m
}
// WithLastConfirmed sets the last confirmed message id of current message.
func (m *messageImpl) WithLastConfirmed(id MessageID) MutableMessage {
if m.properties.Exist(messageLastConfirmedIDSameWithMessageID) {
panic("last confirmed message already set in properties of message")
}
if m.properties.Exist(messageLastConfirmed) {
panic("last confirmed message already set in properties of message")
}
m.properties.Delete(messageLastConfirmedIDSameWithMessageID)
m.properties.Set(messageLastConfirmed, id.Marshal())
return m
}
// WithLastConfirmedUseMessageID sets the last confirmed message id of current message to be the same as message id.
func (m *messageImpl) WithLastConfirmedUseMessageID() MutableMessage {
if m.properties.Exist(messageLastConfirmedIDSameWithMessageID) {
panic("last confirmed message already set in properties of message")
}
if m.properties.Exist(messageLastConfirmed) {
panic("last confirmed message already set in properties of message")
}
m.properties.Delete(messageLastConfirmed)
m.properties.Set(messageLastConfirmedIDSameWithMessageID, "")
return m
}

View File

@ -13,6 +13,7 @@ const (
MessageTypeTimeTick MessageType = MessageType(messagespb.MessageType_TimeTick)
MessageTypeInsert MessageType = MessageType(messagespb.MessageType_Insert)
MessageTypeDelete MessageType = MessageType(messagespb.MessageType_Delete)
MessageTypeCreateSegment MessageType = MessageType(messagespb.MessageType_CreateSegment)
MessageTypeFlush MessageType = MessageType(messagespb.MessageType_Flush)
MessageTypeManualFlush MessageType = MessageType(messagespb.MessageType_ManualFlush)
MessageTypeCreateCollection MessageType = MessageType(messagespb.MessageType_CreateCollection)
@ -31,6 +32,7 @@ var messageTypeName = map[MessageType]string{
MessageTypeInsert: "INSERT",
MessageTypeDelete: "DELETE",
MessageTypeFlush: "FLUSH",
MessageTypeCreateSegment: "CREATE_SEGMENT",
MessageTypeManualFlush: "MANUAL_FLUSH",
MessageTypeCreateCollection: "CREATE_COLLECTION",
MessageTypeDropCollection: "DROP_COLLECTION",

View File

@ -22,6 +22,7 @@ type (
CreatePartitionMessageHeader = messagespb.CreatePartitionMessageHeader
DropPartitionMessageHeader = messagespb.DropPartitionMessageHeader
FlushMessageHeader = messagespb.FlushMessageHeader
CreateSegmentMessageHeader = messagespb.CreateSegmentMessageHeader
ManualFlushMessageHeader = messagespb.ManualFlushMessageHeader
BeginTxnMessageHeader = messagespb.BeginTxnMessageHeader
CommitTxnMessageHeader = messagespb.CommitTxnMessageHeader
@ -30,12 +31,13 @@ type (
)
type (
FlushMessageBody = messagespb.FlushMessageBody
ManualFlushMessageBody = messagespb.ManualFlushMessageBody
BeginTxnMessageBody = messagespb.BeginTxnMessageBody
CommitTxnMessageBody = messagespb.CommitTxnMessageBody
RollbackTxnMessageBody = messagespb.RollbackTxnMessageBody
TxnMessageBody = messagespb.TxnMessageBody
FlushMessageBody = messagespb.FlushMessageBody
CreateSegmentMessageBody = messagespb.CreateSegmentMessageBody
ManualFlushMessageBody = messagespb.ManualFlushMessageBody
BeginTxnMessageBody = messagespb.BeginTxnMessageBody
CommitTxnMessageBody = messagespb.CommitTxnMessageBody
RollbackTxnMessageBody = messagespb.RollbackTxnMessageBody
TxnMessageBody = messagespb.TxnMessageBody
)
type (
@ -51,6 +53,7 @@ var messageTypeMap = map[reflect.Type]MessageType{
reflect.TypeOf(&DropCollectionMessageHeader{}): MessageTypeDropCollection,
reflect.TypeOf(&CreatePartitionMessageHeader{}): MessageTypeCreatePartition,
reflect.TypeOf(&DropPartitionMessageHeader{}): MessageTypeDropPartition,
reflect.TypeOf(&CreateSegmentMessageHeader{}): MessageTypeCreateSegment,
reflect.TypeOf(&FlushMessageHeader{}): MessageTypeFlush,
reflect.TypeOf(&ManualFlushMessageHeader{}): MessageTypeManualFlush,
reflect.TypeOf(&BeginTxnMessageHeader{}): MessageTypeBeginTxn,
@ -77,6 +80,7 @@ type (
MutableDropCollectionMessageV1 = specializedMutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]
MutableCreatePartitionMessageV1 = specializedMutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]
MutableDropPartitionMessageV1 = specializedMutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]
MutableCreateSegmentMessageV2 = specializedMutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody]
MutableFlushMessageV2 = specializedMutableMessage[*FlushMessageHeader, *FlushMessageBody]
MutableBeginTxnMessageV2 = specializedMutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody]
MutableCommitTxnMessageV2 = specializedMutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody]
@ -89,6 +93,7 @@ type (
ImmutableDropCollectionMessageV1 = specializedImmutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]
ImmutableCreatePartitionMessageV1 = specializedImmutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]
ImmutableDropPartitionMessageV1 = specializedImmutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]
ImmutableCreateSegmentMessageV2 = specializedImmutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody]
ImmutableFlushMessageV2 = specializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody]
ImmutableManualFlushMessageV2 = specializedImmutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody]
ImmutableBeginTxnMessageV2 = specializedImmutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody]
@ -105,6 +110,7 @@ var (
AsMutableDropCollectionMessageV1 = asSpecializedMutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]
AsMutableCreatePartitionMessageV1 = asSpecializedMutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]
AsMutableDropPartitionMessageV1 = asSpecializedMutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]
AsMutableCreateSegmentMessageV2 = asSpecializedMutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody]
AsMutableFlushMessageV2 = asSpecializedMutableMessage[*FlushMessageHeader, *FlushMessageBody]
AsMutableManualFlushMessageV2 = asSpecializedMutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody]
AsMutableBeginTxnMessageV2 = asSpecializedMutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody]
@ -118,6 +124,7 @@ var (
AsImmutableDropCollectionMessageV1 = asSpecializedImmutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]
AsImmutableCreatePartitionMessageV1 = asSpecializedImmutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]
AsImmutableDropPartitionMessageV1 = asSpecializedImmutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]
AsImmutableCreateSegmentMessageV2 = asSpecializedImmutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody]
AsImmutableFlushMessageV2 = asSpecializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody]
AsImmutableManualFlushMessageV2 = asSpecializedImmutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody]
AsImmutableBeginTxnMessageV2 = asSpecializedImmutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody]