Fix the segment not found error (#22772)
Signed-off-by: SimFG <bang.fu@zilliz.com>
parent 6b5dfa6db2
commit 4a90490a67
@@ -255,16 +255,15 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResult
 func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionResult) error {
     // Also prepare metric updates.
-    oldSegments, modSegments, newSegment, metricMutation, err := c.meta.PrepareCompleteCompactionMutation(plan.GetSegmentBinlogs(), result)
+    _, modSegments, newSegment, metricMutation, err := c.meta.PrepareCompleteCompactionMutation(plan.GetSegmentBinlogs(), result)
     if err != nil {
         return err
     }
     log := log.With(zap.Int64("planID", plan.GetPlanID()))

-    log.Info("handleCompactionResult: altering metastore after compaction")
-    if err := c.meta.alterMetaStoreAfterCompaction(modSegments, newSegment); err != nil {
-        log.Warn("handleCompactionResult: fail to alter metastore after compaction", zap.Error(err))
-        return fmt.Errorf("fail to alter metastore after compaction, err=%w", err)
+    if err := c.meta.alterMetaStoreAfterCompaction(newSegment, modSegments); err != nil {
+        log.Warn("fail to alert meta store", zap.Error(err))
+        return err
     }

     var nodeID = c.plans[plan.GetPlanID()].dataNodeID
@@ -279,8 +278,8 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
     log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID))
     if err := c.sessions.SyncSegments(nodeID, req); err != nil {
         log.Warn("handleCompactionResult: fail to sync segments with node, reverting metastore",
-            zap.Int64("nodeID", nodeID), zap.String("reason", err.Error()))
-        return c.meta.revertAlterMetaStoreAfterCompaction(oldSegments, newSegment)
+            zap.Int64("nodeID", nodeID), zap.Error(err))
+        return err
     }
     // Apply metrics after successful meta update.
     metricMutation.commit()
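The net effect of the two hunks above is that a failed SyncSegments call no longer triggers a metastore revert: the meta change is committed first and the sync error is simply returned so the request can be retried. A simplified, hypothetical sketch of that control flow (the function names below are illustrative, not the real Milvus APIs):

package main

import (
	"errors"
	"fmt"
)

// handleMergeResult mimics the new ordering: commit the metastore mutation
// first, then best-effort sync to the data node; a sync failure is surfaced
// as-is instead of rolling the metastore back.
func handleMergeResult(alterMetaStore, syncSegments func() error) error {
	if err := alterMetaStore(); err != nil {
		return fmt.Errorf("fail to alter metastore after compaction, err=%w", err)
	}
	if err := syncSegments(); err != nil {
		// no revert here: a later retry of the sync is expected to succeed
		return err
	}
	return nil
}

func main() {
	err := handleMergeResult(
		func() error { return nil },
		func() error { return errors.New("node unreachable") },
	)
	fmt.Println(err) // node unreachable
}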
@@ -286,7 +286,7 @@ func getDeltaLogPath(rootPath string, segmentID typeutil.UniqueID) string {
 func TestCompactionPlanHandler_handleMergeCompactionResult(t *testing.T) {
     mockDataNode := &mocks.DataNode{}
-    mockDataNode.EXPECT().SyncSegments(mock.Anything, mock.Anything).Run(func(ctx context.Context, req *datapb.SyncSegmentsRequest) {}).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil)
+    call := mockDataNode.EXPECT().SyncSegments(mock.Anything, mock.Anything).Run(func(ctx context.Context, req *datapb.SyncSegmentsRequest) {}).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil)

     dataNodeID := UniqueID(111)

@@ -412,6 +412,11 @@ func TestCompactionPlanHandler_handleMergeCompactionResult(t *testing.T) {
     has, err = c.meta.HasSegments([]UniqueID{1, 2, 3})
     require.NoError(t, err)
     require.True(t, has)
+
+    call.Unset()
+    call = mockDataNode.EXPECT().SyncSegments(mock.Anything, mock.Anything).Run(func(ctx context.Context, req *datapb.SyncSegmentsRequest) {}).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil)
+    err = c.handleMergeCompactionResult(plan, compactionResult2)
+    assert.Error(t, err)
 }

 func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
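The test now keeps the mock expectation handle so it can swap the SyncSegments response mid-test with Call.Unset. A minimal stand-alone sketch of that testify pattern, using a hypothetical Pinger mock instead of the generated DataNode mock:

package demo

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
)

// Pinger is a hypothetical mock used only to demonstrate Call.Unset.
type Pinger struct{ mock.Mock }

func (p *Pinger) Ping() error { return p.Called().Error(0) }

func TestSwapExpectation(t *testing.T) {
	p := &Pinger{}

	// First expectation: Ping succeeds.
	call := p.On("Ping").Return(nil)
	assert.NoError(t, p.Ping())

	// Drop the old expectation and register a failing one.
	call.Unset()
	p.On("Ping").Return(assert.AnError)
	assert.Error(t, p.Ping())
}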
@@ -1066,8 +1066,14 @@ func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, parti
     return ret, nil
 }

-func (m *meta) alterMetaStoreAfterCompaction(modSegments []*SegmentInfo, newSegment *SegmentInfo) error {
-    modSegIDs := lo.Map(modSegments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })
+func (m *meta) alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segmentsCompactFrom []*SegmentInfo) error {
+    modInfos := make([]*datapb.SegmentInfo, len(segmentsCompactFrom))
+    for i := range segmentsCompactFrom {
+        modInfos[i] = segmentsCompactFrom[i].SegmentInfo
+    }
+    newSegment := segmentCompactTo.SegmentInfo
+
+    modSegIDs := lo.Map(modInfos, func(segment *datapb.SegmentInfo, _ int) int64 { return segment.GetID() })
     if newSegment.GetNumOfRows() == 0 {
         newSegment.State = commonpb.SegmentState_Dropped
     }
@@ -1080,52 +1086,28 @@ func (m *meta) alterMetaStoreAfterCompaction(modSegments []*SegmentInfo, newSegm
         zap.Int("delta logs", len(newSegment.GetDeltalogs())),
         zap.Int64("compact to segment", newSegment.GetID()))

-    m.Lock()
-    defer m.Unlock()
-
-    modInfos := lo.Map(modSegments, func(item *SegmentInfo, _ int) *datapb.SegmentInfo {
-        return item.SegmentInfo
-    })
-
-    if err := m.catalog.AlterSegmentsAndAddNewSegment(m.ctx, modInfos, newSegment.SegmentInfo); err != nil {
+    err := m.catalog.AlterSegmentsAndAddNewSegment(m.ctx, modInfos, newSegment)
+    if err != nil {
+        log.Warn("fail to alter segments and new segment", zap.Error(err))
         return err
     }

-    for _, s := range modSegments {
-        m.segments.SetSegment(s.GetID(), s)
+    var compactFromIDs []int64
+    for _, v := range segmentsCompactFrom {
+        compactFromIDs = append(compactFromIDs, v.GetID())
     }

-    m.segments.SetSegment(newSegment.GetID(), newSegment)
-
-    return nil
-}
-
-func (m *meta) revertAlterMetaStoreAfterCompaction(oldSegments []*SegmentInfo, removalSegment *SegmentInfo) error {
-    log.Info("meta update: revert metastore after compaction failure",
-        zap.Int64("collectionID", removalSegment.CollectionID),
-        zap.Int64("partitionID", removalSegment.PartitionID),
-        zap.Int64("compactedTo (segment to remove)", removalSegment.ID),
-        zap.Int64s("compactedFrom (segments to add back)", removalSegment.GetCompactionFrom()),
-    )
-
+    log.Info("meta update: alter in memory meta after compaction",
+        zap.Int64("compact to segment ID", segmentCompactTo.GetID()),
+        zap.Int64s("compact from segment IDs", compactFromIDs))
     m.Lock()
     defer m.Unlock()

-    oldSegmentInfos := lo.Map(oldSegments, func(item *SegmentInfo, _ int) *datapb.SegmentInfo {
-        return item.SegmentInfo
-    })
-
-    if err := m.catalog.RevertAlterSegmentsAndAddNewSegment(m.ctx, oldSegmentInfos, removalSegment.SegmentInfo); err != nil {
-        return err
-    }
-
-    for _, s := range oldSegments {
+    for _, s := range segmentsCompactFrom {
         m.segments.SetSegment(s.GetID(), s)
     }

-    if removalSegment.GetNumOfRows() > 0 {
-        m.segments.DropSegment(removalSegment.GetID())
-    }
+    m.segments.SetSegment(segmentCompactTo.GetID(), segmentCompactTo)
+    log.Info("meta update: alter in memory meta after compaction - complete",
+        zap.Int64("compact to segment ID", segmentCompactTo.GetID()),
+        zap.Int64s("compact from segment IDs", compactFromIDs))
     return nil
 }
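The rewritten function leans on lo.Map to pull IDs out of a segment slice. A tiny, self-contained illustration of that call, using a stand-in struct rather than the real SegmentInfo type:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// segment is a stand-in for datacoord's SegmentInfo, used only to show the
// shape of the lo.Map call above.
type segment struct{ ID int64 }

func main() {
	segs := []*segment{{ID: 10}, {ID: 11}, {ID: 12}}

	// Collect the IDs, mirroring modSegIDs := lo.Map(modInfos, ...).
	ids := lo.Map(segs, func(s *segment, _ int) int64 { return s.ID })
	fmt.Println(ids) // [10 11 12]
}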
@@ -22,11 +22,6 @@ import (
     "github.com/cockroachdb/errors"
     "github.com/golang/protobuf/proto"
-    "github.com/samber/lo"
-    "github.com/stretchr/testify/assert"
-    "github.com/stretchr/testify/mock"
-    "github.com/stretchr/testify/require"

     "github.com/milvus-io/milvus-proto/go-api/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/msgpb"
     "github.com/milvus-io/milvus/internal/common"
@@ -35,6 +30,10 @@ import (
     "github.com/milvus-io/milvus/internal/metastore/model"
     "github.com/milvus-io/milvus/internal/mocks"
     "github.com/milvus-io/milvus/internal/proto/datapb"
+    "github.com/samber/lo"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/mock"
+    "github.com/stretchr/testify/require"
 )

 func TestMetaReloadFromKV(t *testing.T) {
@@ -556,14 +555,9 @@ func TestMeta_alterMetaStore(t *testing.T) {
         }},
     }

-    toAlterInfo := lo.Map(toAlter, func(item *datapb.SegmentInfo, _ int) *SegmentInfo {
-        return &SegmentInfo{SegmentInfo: item}
-    })
-
-    err := m.alterMetaStoreAfterCompaction(toAlterInfo, &SegmentInfo{SegmentInfo: newSeg})
-    assert.NoError(t, err)
-
-    err = m.revertAlterMetaStoreAfterCompaction(toAlterInfo, &SegmentInfo{SegmentInfo: newSeg})
+    err := m.alterMetaStoreAfterCompaction(&SegmentInfo{SegmentInfo: newSeg}, lo.Map(toAlter, func(t *datapb.SegmentInfo, _ int) *SegmentInfo {
+        return &SegmentInfo{SegmentInfo: t}
+    }))
     assert.NoError(t, err)
 }
@@ -23,9 +23,7 @@ import (
     "sync"
     "time"

-    "github.com/samber/lo"
-    "go.uber.org/zap"
-
     "github.com/cockroachdb/errors"
     "github.com/milvus-io/milvus-proto/go-api/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/msgpb"
     "github.com/milvus-io/milvus-proto/go-api/schemapb"
@@ -36,6 +34,8 @@ import (
     "github.com/milvus-io/milvus/internal/types"
     "github.com/milvus-io/milvus/internal/util/tsoutil"
     "github.com/milvus-io/milvus/internal/util/typeutil"
+    "github.com/samber/lo"
+    "go.uber.org/zap"
 )

 type (
@@ -64,7 +64,7 @@ type Channel interface {
     listNewSegmentsStartPositions() []*datapb.SegmentStartPosition
     transferNewSegments(segmentIDs []UniqueID)
     updateSegmentPKRange(segID UniqueID, ids storage.FieldData)
-    mergeFlushedSegments(seg *Segment, planID UniqueID, compactedFrom []UniqueID) error
+    mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) error
     hasSegment(segID UniqueID, countFlushed bool) bool
     removeSegments(segID ...UniqueID)
     listCompactedSegmentIDs() map[UniqueID][]UniqueID
@@ -554,8 +554,8 @@ func (c *ChannelMeta) getCollectionSchema(collID UniqueID, ts Timestamp) (*schem
     return c.collSchema, nil
 }

-func (c *ChannelMeta) mergeFlushedSegments(seg *Segment, planID UniqueID, compactedFrom []UniqueID) error {
-    log := log.With(
+func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) error {
+    log := log.Ctx(ctx).With(
         zap.Int64("segment ID", seg.segmentID),
         zap.Int64("collection ID", seg.collectionID),
         zap.Int64("partition ID", seg.partitionID),
@@ -567,21 +567,31 @@ func (c *ChannelMeta) mergeFlushedSegments(seg *Segment, planID UniqueID, compac
         log.Warn("failed to mergeFlushedSegments, collection mismatch",
             zap.Int64("current collection ID", seg.collectionID),
             zap.Int64("expected collection ID", c.collectionID))
-        return fmt.Errorf("failed to mergeFlushedSegments, mismatch collection, ID=%d", seg.collectionID)
+        return errors.Newf("failed to mergeFlushedSegments, mismatch collection, ID=%d", seg.collectionID)
     }

-    compactedFrom = lo.Filter(compactedFrom, func(segID int64, _ int) bool {
-        // which means the segment is the `flushed` state
-        has := c.hasSegment(segID, true) && !c.hasSegment(segID, false)
-        if !has {
-            log.Warn("invalid segment", zap.Int64("segment_id", segID))
+    var inValidSegments []UniqueID
+    for _, ID := range compactedFrom {
+        // no such segments in channel or the segments are unflushed.
+        if !c.hasSegment(ID, true) || c.hasSegment(ID, false) {
+            inValidSegments = append(inValidSegments, ID)
         }
-        return has
-    })
+    }
+
+    if len(inValidSegments) > 0 {
+        log.Warn("no match flushed segments to merge from", zap.Int64s("invalid segmentIDs", inValidSegments))
+        compactedFrom = lo.Without(compactedFrom, inValidSegments...)
+    }

     log.Info("merge flushed segments")
     c.segMu.Lock()
     defer c.segMu.Unlock()
+    select {
+    case <-ctx.Done():
+        log.Warn("the context has been closed", zap.Error(ctx.Err()))
+        return errors.New("invalid context")
+    default:
+    }
     for _, ID := range compactedFrom {
         // the existent of the segments are already checked
         s := c.segments[ID]
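Two patterns carry the weight in the hunk above: invalid compacted-from IDs are dropped with lo.Without instead of failing the whole merge, and an already-cancelled context is rejected with a non-blocking select before any state is mutated. A minimal, self-contained sketch of both, where isFlushed stands in for the real hasSegment checks:

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/samber/lo"
)

// merge filters out IDs that fail the validity check, then bails out early
// if the caller's context is already done.
func merge(ctx context.Context, compactedFrom []int64, isFlushed func(int64) bool) ([]int64, error) {
	var invalid []int64
	for _, id := range compactedFrom {
		if !isFlushed(id) {
			invalid = append(invalid, id)
		}
	}
	if len(invalid) > 0 {
		compactedFrom = lo.Without(compactedFrom, invalid...)
	}

	// Non-blocking check: proceed immediately unless ctx is cancelled.
	select {
	case <-ctx.Done():
		return nil, errors.New("invalid context")
	default:
	}
	return compactedFrom, nil
}

func main() {
	flushed := map[int64]bool{1: true, 3: true}
	ids, err := merge(context.Background(), []int64{1, 2, 3}, func(id int64) bool { return flushed[id] })
	fmt.Println(ids, err) // [1 3] <nil>
}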
@@ -671,7 +671,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
             require.False(t, channel.hasSegment(3, true))

             // tests start
-            err := channel.mergeFlushedSegments(test.inSeg, 100, test.inCompactedFrom)
+            err := channel.mergeFlushedSegments(context.Background(), test.inSeg, 100, test.inCompactedFrom)
             if test.isValid {
                 assert.NoError(t, err)
             } else {
@@ -31,10 +31,11 @@ const (
 )

 type compactionExecutor struct {
-    executing sync.Map // planID to compactor
-    completed sync.Map // planID to CompactionResult
-    taskCh    chan compactor
-    dropped   sync.Map // vchannel dropped
+    executing          sync.Map // planID to compactor
+    completedCompactor sync.Map // planID to compactor
+    completed          sync.Map // planID to CompactionResult
+    taskCh             chan compactor
+    dropped            sync.Map // vchannel dropped
 }

 func newCompactionExecutor() *compactionExecutor {
@@ -57,6 +58,13 @@ func (c *compactionExecutor) toCompleteState(task compactor) {
     task.complete()
     c.executing.Delete(task.getPlanID())
 }
+
+func (c *compactionExecutor) injectDone(planID UniqueID) {
+    c.completed.Delete(planID)
+    task, loaded := c.completedCompactor.LoadAndDelete(planID)
+    if loaded {
+        task.(compactor).injectDone()
+    }
+}

 // These two func are bounded for waitGroup
 func (c *compactionExecutor) executeWithState(task compactor) {
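The new injectDone relies on sync.Map's atomic LoadAndDelete, so the completed compactor is fetched and removed in a single step and its cleanup runs at most once even under concurrent calls. A minimal stand-alone sketch of that pattern:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var completed sync.Map
	completed.Store(int64(1), "compactor for plan 1")

	// Fetch-and-remove atomically; the cleanup side runs only when the
	// entry actually existed.
	if task, loaded := completed.LoadAndDelete(int64(1)); loaded {
		fmt.Println("releasing", task) // runs once
	}
	if _, loaded := completed.LoadAndDelete(int64(1)); !loaded {
		fmt.Println("already released, nothing to do") // second call is a no-op
	}
}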
@@ -89,6 +97,7 @@ func (c *compactionExecutor) executeTask(task compactor) {
         )
     } else {
         c.completed.Store(task.getPlanID(), result)
+        c.completedCompactor.Store(task.getPlanID(), task)
     }

     log.Info("end to execute compaction", zap.Int64("planID", task.getPlanID()))
@@ -119,7 +128,7 @@ func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string
     // remove all completed plans for vChannelName
     c.completed.Range(func(key interface{}, value interface{}) bool {
         if value.(*datapb.CompactionResult).GetChannel() == vChannelName {
-            c.completed.Delete(key.(UniqueID))
+            c.injectDone(key.(UniqueID))
             log.Info("remove compaction results for dropped channel",
                 zap.String("channel", vChannelName),
                 zap.Int64("planID", key.(UniqueID)))
@@ -142,6 +142,10 @@ func (mc *mockCompactor) complete() {
     mc.done <- struct{}{}
 }

+func (mc *mockCompactor) injectDone() {
+
+}
+
 func (mc *mockCompactor) compact() (*datapb.CompactionResult, error) {
     if !mc.isvalid {
         return nil, errStart
@@ -59,6 +59,7 @@ type iterator = storage.Iterator
 type compactor interface {
     complete()
     compact() (*datapb.CompactionResult, error)
+    injectDone()
     stop()
     getPlanID() UniqueID
     getCollection() UniqueID
@@ -84,6 +85,7 @@ type compactionTask struct {
     done         chan struct{}
     tr           *timerecord.TimeRecorder
     chunkManager storage.ChunkManager
+    inject       *taskInjection
 }

 // check if compactionTask implements compactor
@@ -123,6 +125,7 @@ func (t *compactionTask) complete() {
 func (t *compactionTask) stop() {
     t.cancel()
     <-t.done
+    t.injectDone()
 }

 func (t *compactionTask) getPlanID() UniqueID {
@@ -555,7 +558,12 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
             statsLog.LogPath = blobPath
         }
     })
-    defer close(ti.injectOver)
+    defer func() {
+        // the injection will be closed if fail to compact
+        if t.inject == nil {
+            close(ti.injectOver)
+        }
+    }()

     t.injectFlush(ti, segIDs...)
     <-ti.Injected()
@@ -668,12 +676,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
         Channel: t.plan.GetChannel(),
     }

-    uninjectStart := time.Now()
-    ti.injectDone(true)
-    uninjectEnd := time.Now()
-    defer func() {
-        log.Info("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart))))
-    }()
+    t.inject = ti

     log.Info("compaction done",
         zap.Int64("planID", t.plan.GetPlanID()),
@@ -690,6 +693,15 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
     return pack, nil
 }

+func (t *compactionTask) injectDone() {
+    if t.inject != nil {
+        uninjectStart := time.Now()
+        t.inject.injectDone(true)
+        uninjectEnd := time.Now()
+        log.Info("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart))))
+    }
+}
+
 // TODO copy maybe expensive, but this seems to be the only convinent way.
 func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}, numRows int64) (storage.FieldData, error) {
     var rst storage.FieldData
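The compaction task now keeps the injection handle on the struct and releases it later through injectDone(), guarded by a nil check so the call is safe whether or not the task ever got that far. A hypothetical, simplified sketch of that "store the handle, release it explicitly later" pattern:

package main

import "fmt"

// task stores a cleanup handle when its work succeeds and releases it later;
// the nil guard makes the release safe even when the handle was never set.
type task struct {
	cleanup func()
}

func (t *task) run(ok bool) {
	done := func() { fmt.Println("injection released") }
	if !ok {
		done() // release immediately on failure
		return
	}
	t.cleanup = done // keep it for a later, explicit release
}

func (t *task) injectDone() {
	if t.cleanup != nil {
		t.cleanup()
		t.cleanup = nil
	}
}

func main() {
	t := &task{}
	t.run(true)
	t.injectDone() // prints once
	t.injectDone() // safe no-op
}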
@@ -20,6 +20,7 @@ import (
     "context"
     "fmt"
     "math"
+    "sync"
     "testing"
     "time"

@@ -760,6 +761,11 @@ func TestCompactorInterfaceMethods(t *testing.T) {
             assert.Equal(t, int64(4), result.GetNumOfRows())
             assert.NotEmpty(t, result.InsertLogs)
             assert.NotEmpty(t, result.Field2StatslogPaths)
+
+            assert.Equal(t, 0, mockfm.injectCount())
+            task.injectDone()
+            time.Sleep(500 * time.Millisecond)
+            assert.Equal(t, 1, mockfm.injectCount())
         }
     })

@@ -843,6 +849,11 @@ func TestCompactorInterfaceMethods(t *testing.T) {
         assert.Equal(t, int64(2), result.GetNumOfRows())
         assert.NotEmpty(t, result.InsertLogs)
         assert.NotEmpty(t, result.Field2StatslogPaths)
+
+        assert.Equal(t, 0, mockfm.injectCount())
+        task.injectDone()
+        time.Sleep(500 * time.Millisecond)
+        assert.Equal(t, 1, mockfm.injectCount())
     })
 }

@@ -851,6 +862,10 @@ type mockFlushManager struct {
     returnError      bool
     recordFlushedSeg bool
     flushedSegIDs    []UniqueID
+    injectOverCount  struct {
+        sync.RWMutex
+        value int
+    }
 }

 var _ flushManager = (*mockFlushManager)(nil)
@@ -878,9 +893,18 @@ func (mfm *mockFlushManager) injectFlush(injection *taskInjection, segments ...U
         //injection.injected <- struct{}{}
         close(injection.injected)
         <-injection.injectOver
+        mfm.injectOverCount.Lock()
+        defer mfm.injectOverCount.Unlock()
+        mfm.injectOverCount.value++
     }()
 }

+func (mfm *mockFlushManager) injectCount() int {
+    mfm.injectOverCount.RLock()
+    defer mfm.injectOverCount.RUnlock()
+    return mfm.injectOverCount.value
+}
+
 func (mfm *mockFlushManager) notifyAllFlushed() {}

 func (mfm *mockFlushManager) startDropping() {}
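The mock counts injections behind an embedded sync.RWMutex so the test goroutine and the assertions do not race. A stand-alone version of that counter shape:

package main

import (
	"fmt"
	"sync"
)

// counter mirrors the embedded-RWMutex counter used by the mock flush
// manager: writers take the write lock, readers take the read lock.
type counter struct {
	sync.RWMutex
	value int
}

func (c *counter) inc() {
	c.Lock()
	defer c.Unlock()
	c.value++
}

func (c *counter) get() int {
	c.RLock()
	defer c.RUnlock()
	return c.value
}

func main() {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); c.inc() }()
	}
	wg.Wait()
	fmt.Println(c.get()) // 10
}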
@@ -20,6 +20,9 @@ import (
     "sync"
     "time"

+    "github.com/samber/lo"
+    "golang.org/x/exp/maps"
+
     "go.uber.org/zap"

     "github.com/milvus-io/milvus/internal/log"
@@ -96,6 +99,8 @@ func (mt *mergedTimeTickerSender) isClosed() bool {

 func (mt *mergedTimeTickerSender) work() {
     defer mt.wg.Done()
+    var sids []int64
+    var isDiffTs bool
     lastTs := uint64(0)
     for {
         mt.cond.L.Lock()
@@ -107,26 +112,28 @@ func (mt *mergedTimeTickerSender) work() {
         mt.cond.L.Unlock()

         mt.mu.Lock()
-        if mt.ts != lastTs {
-            var sids []int64
+        isDiffTs = mt.ts != lastTs
+        if isDiffTs {
             for sid := range mt.segmentIDs {
                 sids = append(sids, sid)
             }
-
+            // we will reset the timer but not the segmentIDs, since if we sent the timetick fail we may block forever due to flush stuck
             lastTs = mt.ts
             mt.lastSent = time.Now()
-
-            if err := mt.send(mt.ts, sids); err != nil {
-                log.Error("send hard time tick failed", zap.Error(err))
-                mt.mu.Unlock()
-                continue
-            }
-
             mt.segmentIDs = make(map[int64]struct{})
-
         }
         mt.mu.Unlock()
+
+        if isDiffTs {
+            if err := mt.send(lastTs, sids); err != nil {
+                log.Error("send hard time tick failed", zap.Error(err))
+                mt.mu.Lock()
+                maps.Copy(mt.segmentIDs, lo.SliceToMap(sids, func(t int64) (int64, struct{}) {
+                    return t, struct{}{}
+                }))
+                mt.mu.Unlock()
+            }
+        }
     }
 }
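On a failed send, the patch puts the segment IDs back into the pending set with maps.Copy over a map built by lo.SliceToMap. A small stand-alone sketch of that combination:

package main

import (
	"fmt"

	"github.com/samber/lo"
	"golang.org/x/exp/maps"
)

func main() {
	// Pending segment IDs, keyed for cheap membership checks.
	pending := map[int64]struct{}{7: {}}

	// IDs whose time tick failed to send and must be re-queued.
	failed := []int64{3, 5, 8}

	maps.Copy(pending, lo.SliceToMap(failed, func(id int64) (int64, struct{}) {
		return id, struct{}{}
	}))

	fmt.Println(len(pending)) // 4
}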
@@ -359,7 +359,6 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac
             PlanID: k.(UniqueID),
             Result: v.(*datapb.CompactionResult),
         })
-        node.compactionExecutor.completed.Delete(k)
         return true
     })

@@ -392,27 +391,31 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
         return status, nil
     }

-    getChannel := func() (int64, Channel) {
-        for _, segmentFrom := range req.GetCompactedFrom() {
-            channel, err := node.flowgraphManager.getChannel(segmentFrom)
-            if err != nil {
-                log.Warn("invalid segmentID", zap.Int64("segment_from", segmentFrom), zap.Error(err))
-                continue
-            }
-            return segmentFrom, channel
-        }
-        return 0, nil
-    }
-    oneSegment, channel := getChannel()
-    if channel == nil {
-        log.Warn("no available channel")
-        status.ErrorCode = commonpb.ErrorCode_Success
-        return status, nil
-    }
-
-    ds, ok := node.flowgraphManager.getFlowgraphService(channel.getChannelName(oneSegment))
-    if !ok {
-        status.Reason = fmt.Sprintf("failed to find flow graph service, segmentID: %d", oneSegment)
+    var (
+        oneSegment int64
+        channel    Channel
+        err        error
+        ds         *dataSyncService
+        ok         bool
+    )
+
+    for _, fromSegment := range req.GetCompactedFrom() {
+        channel, err = node.flowgraphManager.getChannel(fromSegment)
+        if err != nil {
+            log.Ctx(ctx).Warn("fail to get the channel", zap.Int64("segment", fromSegment), zap.Error(err))
+            continue
+        }
+        ds, ok = node.flowgraphManager.getFlowgraphService(channel.getChannelName(fromSegment))
+        if !ok {
+            log.Ctx(ctx).Warn("fail to find flow graph service", zap.Int64("segment", fromSegment))
+            continue
+        }
+        oneSegment = fromSegment
+        break
+    }
+    if oneSegment == 0 {
+        log.Ctx(ctx).Warn("no valid segment, maybe the request is a retry")
+        status.ErrorCode = commonpb.ErrorCode_Success
         return status, nil
     }
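The rewritten SyncSegments walks the compacted-from IDs, keeps the first one whose channel and flowgraph lookups both succeed, and treats "nothing valid" as success so a retried request (whose segments were already merged away) does not fail the RPC. A hypothetical, simplified sketch of that selection loop:

package main

import (
	"errors"
	"fmt"
)

// pickSegment returns the first candidate that passes the lookup; the
// boolean is false when no candidate is usable.
func pickSegment(compactedFrom []int64, lookup func(int64) error) (int64, bool) {
	var picked int64
	for _, id := range compactedFrom {
		if err := lookup(id); err != nil {
			fmt.Println("skip segment", id, ":", err)
			continue
		}
		picked = id
		break
	}
	return picked, picked != 0
}

func main() {
	known := map[int64]bool{30: true}
	lookup := func(id int64) error {
		if !known[id] {
			return errors.New("segment not found")
		}
		return nil
	}

	if id, ok := pickSegment([]int64{10, 20, 30}, lookup); ok {
		fmt.Println("use segment", id) // use segment 30
	} else {
		fmt.Println("no valid segment, treat as an already-synced retry")
	}
}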
@@ -425,7 +428,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
         numRows: req.GetNumOfRows(),
     }

-    err := channel.InitPKstats(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime())
+    err = channel.InitPKstats(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime())
     if err != nil {
         status.Reason = fmt.Sprintf("init pk stats fail, err=%s", err.Error())
         return status, nil
@@ -434,11 +437,11 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
     // block all flow graph so it's safe to remove segment
     ds.fg.Blockall()
     defer ds.fg.Unblock()
-    if err := channel.mergeFlushedSegments(targetSeg, req.GetPlanID(), req.GetCompactedFrom()); err != nil {
+    if err := channel.mergeFlushedSegments(ctx, targetSeg, req.GetPlanID(), req.GetCompactedFrom()); err != nil {
         status.Reason = err.Error()
         return status, nil
     }

+    node.compactionExecutor.injectDone(req.GetPlanID())
     status.ErrorCode = commonpb.ErrorCode_Success
     return status, nil
 }
@@ -164,7 +164,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() {
             mu.Unlock()
             return true
         })
-        s.Assert().Equal(0, cnt)
+        s.Assert().Equal(1, cnt)
     })

     s.Run("unhealthy", func() {
@@ -652,13 +652,23 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
             CompactedTo: 102,
             NumOfRows:   100,
         }
-        status, err := s.node.SyncSegments(s.ctx, req)
+        cancelCtx, cancel := context.WithCancel(context.Background())
+        cancel()
+        status, err := s.node.SyncSegments(cancelCtx, req)
         s.Assert().NoError(err)
+        s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
+
+        status, err = s.node.SyncSegments(s.ctx, req)
+        s.Assert().NoError(err)
         s.Assert().Equal(commonpb.ErrorCode_Success, status.GetErrorCode())

         s.Assert().True(fg.channel.hasSegment(req.CompactedTo, true))
         s.Assert().False(fg.channel.hasSegment(req.CompactedFrom[0], true))
         s.Assert().False(fg.channel.hasSegment(req.CompactedFrom[1], true))

         status, err = s.node.SyncSegments(s.ctx, req)
         s.Assert().NoError(err)
         s.Assert().Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
     })

     s.Run("valid request numRows=0", func() {
@@ -109,7 +109,6 @@ type DataCoordCatalog interface {
     AlterSegment(ctx context.Context, newSegment *datapb.SegmentInfo, oldSegment *datapb.SegmentInfo) error
     SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error
     DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error
-    RevertAlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, removalSegment *datapb.SegmentInfo) error

     MarkChannelAdded(ctx context.Context, channel string) error
     MarkChannelDeleted(ctx context.Context, channel string) error