mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-29 18:38:44 +08:00
fix: get compaction failure when datanode is actually alive (#31353)
Don't mark a compaction as failed when GetCompactionPlansResults merely hits an RPC error; the datanode may still be alive. See #31352. Signed-off-by: xiaofanluan <xiaofan.luan@zilliz.com>
This commit is contained in:
parent
d7727dd087
commit
8c43c5b6cb
@ -145,7 +145,11 @@ func (c *compactionPlanHandler) checkResult() {
|
||||
log.Warn("fail to check result", zap.Error(err))
|
||||
return
|
||||
}
|
||||
_ = c.updateCompaction(ts)
|
||||
err = c.updateCompaction(ts)
|
||||
if err != nil {
|
||||
log.Warn("fail to update compaction", zap.Error(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (c *compactionPlanHandler) GetCurrentTS() (Timestamp, error) {
|
||||
@ -522,7 +526,12 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
|
||||
// for DC might add new task while GetCompactionState.
|
||||
executingTasks := c.getTasksByState(executing)
|
||||
timeoutTasks := c.getTasksByState(timeout)
|
||||
planStates := c.sessions.GetCompactionPlansResults()
|
||||
planStates, err := c.sessions.GetCompactionPlansResults()
|
||||
if err != nil {
|
||||
// if there is a data node alive but we failed to get info,
|
||||
log.Warn("failed to get compaction plans from all nodes", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
cachedPlans := []int64{}
|
||||
|
||||
// TODO reduce the lock range
|
||||
|
@ -82,7 +82,7 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() {
|
||||
2: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}}},
|
||||
3: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 3, State: commonpb.CompactionState_Executing}},
|
||||
4: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 4, State: commonpb.CompactionState_Executing}},
|
||||
})
|
||||
}, nil)
|
||||
|
||||
s.mockSessMgr.EXPECT().SyncSegments(int64(100), mock.Anything).Return(nil).Once()
|
||||
{
|
||||
@ -644,7 +644,7 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
|
||||
2: {A: 111, B: &datapb.CompactionPlanResult{PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}}},
|
||||
3: {A: 111, B: &datapb.CompactionPlanResult{PlanID: 3, State: commonpb.CompactionState_Executing}},
|
||||
5: {A: 222, B: &datapb.CompactionPlanResult{PlanID: 5, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 5}}}},
|
||||
})
|
||||
}, nil)
|
||||
|
||||
inPlans := map[int64]*compactionTask{
|
||||
1: {
|
||||
|
@ -443,10 +443,14 @@ func (_c *MockSessionManager_FlushChannels_Call) RunAndReturn(run func(context.C
|
||||
}
|
||||
|
||||
// GetCompactionPlansResults provides a mock function with given fields:
|
||||
func (_m *MockSessionManager) GetCompactionPlansResults() map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult] {
|
||||
func (_m *MockSessionManager) GetCompactionPlansResults() (map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult], error) {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func() (map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult], error)); ok {
|
||||
return rf()
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func() map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
@ -455,7 +459,13 @@ func (_m *MockSessionManager) GetCompactionPlansResults() map[int64]*typeutil.Pa
|
||||
}
|
||||
}
|
||||
|
||||
return r0
|
||||
if rf, ok := ret.Get(1).(func() error); ok {
|
||||
r1 = rf()
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockSessionManager_GetCompactionPlansResults_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCompactionPlansResults'
|
||||
@ -475,12 +485,12 @@ func (_c *MockSessionManager_GetCompactionPlansResults_Call) Run(run func()) *Mo
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockSessionManager_GetCompactionPlansResults_Call) Return(_a0 map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]) *MockSessionManager_GetCompactionPlansResults_Call {
|
||||
_c.Call.Return(_a0)
|
||||
func (_c *MockSessionManager_GetCompactionPlansResults_Call) Return(_a0 map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult], _a1 error) *MockSessionManager_GetCompactionPlansResults_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockSessionManager_GetCompactionPlansResults_Call) RunAndReturn(run func() map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]) *MockSessionManager_GetCompactionPlansResults_Call {
|
||||
func (_c *MockSessionManager_GetCompactionPlansResults_Call) RunAndReturn(run func() (map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult], error)) *MockSessionManager_GetCompactionPlansResults_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
@ -60,7 +60,7 @@ type SessionManager interface {
|
||||
Compaction(ctx context.Context, nodeID int64, plan *datapb.CompactionPlan) error
|
||||
SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequest) error
|
||||
Import(ctx context.Context, nodeID int64, itr *datapb.ImportTaskRequest)
|
||||
GetCompactionPlansResults() map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult]
|
||||
GetCompactionPlansResults() (map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult], error)
|
||||
NotifyChannelOperation(ctx context.Context, nodeID int64, req *datapb.ChannelOperationsRequest) error
|
||||
CheckChannelOperationProgress(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error)
|
||||
AddImportSegment(ctx context.Context, nodeID int64, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error)
|
||||
@ -271,20 +271,19 @@ func (c *SessionManagerImpl) execImport(ctx context.Context, nodeID int64, itr *
|
||||
}
|
||||
|
||||
// GetCompactionPlansResults returns map[planID]*pair[nodeID, *CompactionPlanResult]
|
||||
func (c *SessionManagerImpl) GetCompactionPlansResults() map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult] {
|
||||
wg := sync.WaitGroup{}
|
||||
func (c *SessionManagerImpl) GetCompactionPlansResults() (map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult], error) {
|
||||
ctx := context.Background()
|
||||
errorGroup, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
plans := typeutil.NewConcurrentMap[int64, *typeutil.Pair[int64, *datapb.CompactionPlanResult]]()
|
||||
c.sessions.RLock()
|
||||
for nodeID, s := range c.sessions.data {
|
||||
wg.Add(1)
|
||||
go func(nodeID int64, s *Session) {
|
||||
defer wg.Done()
|
||||
nodeID, s := nodeID, s // https://golang.org/doc/faq#closures_and_goroutines
|
||||
errorGroup.Go(func() error {
|
||||
cli, err := s.GetOrCreateClient(ctx)
|
||||
if err != nil {
|
||||
log.Info("Cannot Create Client", zap.Int64("NodeID", nodeID))
|
||||
return
|
||||
return err
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second))
|
||||
defer cancel()
|
||||
@ -295,22 +294,25 @@ func (c *SessionManagerImpl) GetCompactionPlansResults() map[int64]*typeutil.Pai
|
||||
),
|
||||
})
|
||||
|
||||
if err := merr.CheckRPCCall(resp, err); err != nil {
|
||||
if err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
|
||||
log.Info("Get State failed", zap.Error(err))
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
for _, rst := range resp.GetResults() {
|
||||
// for compatibility issue, before 2.3.4, resp has only logpath
|
||||
// try to parse path and fill logid
|
||||
binlog.CompressCompactionBinlogs(rst.GetSegments())
|
||||
nodeRst := typeutil.NewPair(nodeID, rst)
|
||||
plans.Insert(rst.PlanID, &nodeRst)
|
||||
}
|
||||
}(nodeID, s)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
c.sessions.RUnlock()
|
||||
wg.Wait()
|
||||
|
||||
// wait for all request done
|
||||
if err := errorGroup.Wait(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rst := make(map[int64]*typeutil.Pair[int64, *datapb.CompactionPlanResult])
|
||||
plans.Range(func(planID int64, result *typeutil.Pair[int64, *datapb.CompactionPlanResult]) bool {
|
||||
@ -318,7 +320,7 @@ func (c *SessionManagerImpl) GetCompactionPlansResults() map[int64]*typeutil.Pai
|
||||
return true
|
||||
})
|
||||
|
||||
return rst
|
||||
return rst, nil
|
||||
}
|
||||
|
||||
func (c *SessionManagerImpl) FlushChannels(ctx context.Context, nodeID int64, req *datapb.FlushChannelsRequest) error {
|
||||
|
@ -2816,7 +2816,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
||||
p.CompactionCheckIntervalInSeconds = ParamItem{
|
||||
Key: "dataCoord.compaction.check.interval",
|
||||
Version: "2.0.0",
|
||||
DefaultValue: "10",
|
||||
DefaultValue: "3",
|
||||
}
|
||||
p.CompactionCheckIntervalInSeconds.Init(base.mgr)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user