diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index 680e7eadf8..b957e5811c 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -266,17 +266,21 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err func (t *l0CompactionTask) processMetaSaved() bool { err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)) if err == nil { - t.resetSegmentCompacting() - UpdateCompactionSegmentSizeMetrics(t.result.GetSegments()) - log.Info("handleCompactionResult: success to handle l0 compaction result") + return t.processCompleted() } - return err == nil + return false } func (t *l0CompactionTask) processCompleted() bool { - for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() { - t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false) + if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ + PlanID: t.GetPlanID(), + }); err != nil { + return false } + + t.resetSegmentCompacting() + UpdateCompactionSegmentSizeMetrics(t.result.GetSegments()) + log.Info("handleCompactionResult: success to handle l0 compaction result") return true } @@ -292,6 +296,12 @@ func (t *l0CompactionTask) processTimeout() bool { } func (t *l0CompactionTask) processFailed() bool { + if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ + PlanID: t.GetPlanID(), + }); err != nil { + return false + } + t.resetSegmentCompacting() return true } diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index 7237849998..8fb6c800b7 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -50,11 +50,9 @@ func (t *mixCompactionTask) processPipelining() bool { func (t *mixCompactionTask) processMetaSaved() bool { err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)) if err == nil { - t.resetSegmentCompacting() - UpdateCompactionSegmentSizeMetrics(t.result.GetSegments()) - log.Info("handleCompactionResult: success to handle merge compaction result") + return t.processCompleted() } - return err == nil + return false } func (t *mixCompactionTask) processExecuting() bool { @@ -77,11 +75,13 @@ func (t *mixCompactionTask) processExecuting() bool { return false case commonpb.CompactionState_Completed: t.result = result - result := t.result if len(result.GetSegments()) == 0 || len(result.GetSegments()) > 1 { log.Info("illegal compaction results") err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)) - return err == nil + if err != nil { + return false + } + return t.processFailed() } saveSuccess := t.saveSegmentMeta() if !saveSuccess { @@ -160,10 +160,16 @@ func (t *mixCompactionTask) NeedReAssignNodeID() bool { } func (t *mixCompactionTask) processCompleted() bool { - for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() { - t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false) + err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ + PlanID: t.GetPlanID(), + }) + if err == nil { + t.resetSegmentCompacting() + UpdateCompactionSegmentSizeMetrics(t.result.GetSegments()) + log.Info("handleCompactionResult: success to handle merge compaction result") } - return true + + return err == nil } func (t *mixCompactionTask) resetSegmentCompacting() { @@ -206,8 +212,14 @@ func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compa } func (t *mixCompactionTask) processFailed() bool { - t.resetSegmentCompacting() - return true + err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ + PlanID: t.GetPlanID(), + }) + if err == nil { + t.resetSegmentCompacting() + } + + return err == nil } func (t *mixCompactionTask) checkTimeout() bool { diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 1dcbac1873..e568188345 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -570,6 +570,8 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() { Segments: []*datapb.CompactionSegment{{PlanID: 6}}, }, nil).Once() + s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + inTasks := map[int64]CompactionTask{ 1: &mixCompactionTask{ CompactionTask: &datapb.CompactionTask{ @@ -775,6 +777,7 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() { } s.mockSessMgr.EXPECT().GetCompactionPlanResult(UniqueID(111), int64(1)).Return(&compactionResult, nil).Once() + s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) s.handler.submitTask(task) s.handler.doSchedule() diff --git a/internal/datacoord/mock_channel_store.go b/internal/datacoord/mock_channel_store.go index fc7cb51ef3..bbcd292112 100644 --- a/internal/datacoord/mock_channel_store.go +++ b/internal/datacoord/mock_channel_store.go @@ -179,50 +179,6 @@ func (_c *MockRWChannelStore_GetNodeChannelCount_Call) RunAndReturn(run func(int return _c } -// GetNodeChannelsByCollectionID provides a mock function with given fields: collectionID -func (_m *MockRWChannelStore) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string { - ret := _m.Called(collectionID) - - var r0 map[int64][]string - if rf, ok := ret.Get(0).(func(int64) map[int64][]string); ok { - r0 = rf(collectionID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[int64][]string) - } - } - - return r0 -} - -// MockRWChannelStore_GetNodeChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelsByCollectionID' -type MockRWChannelStore_GetNodeChannelsByCollectionID_Call struct { - *mock.Call -} - -// GetNodeChannelsByCollectionID is a helper method to define mock.On call -// - collectionID int64 -func (_e *MockRWChannelStore_Expecter) GetNodeChannelsByCollectionID(collectionID interface{}) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { - return &MockRWChannelStore_GetNodeChannelsByCollectionID_Call{Call: _e.mock.On("GetNodeChannelsByCollectionID", collectionID)} -} - -func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64)) - }) - return _c -} - -func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Return(_a0 map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) RunAndReturn(run func(int64) map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { - _c.Call.Return(run) - return _c -} - // GetNodeChannelsBy provides a mock function with given fields: nodeSelector, channelSelectors func (_m *MockRWChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo { _va := make([]interface{}, len(channelSelectors)) @@ -282,6 +238,50 @@ func (_c *MockRWChannelStore_GetNodeChannelsBy_Call) RunAndReturn(run func(NodeS return _c } +// GetNodeChannelsByCollectionID provides a mock function with given fields: collectionID +func (_m *MockRWChannelStore) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string { + ret := _m.Called(collectionID) + + var r0 map[int64][]string + if rf, ok := ret.Get(0).(func(int64) map[int64][]string); ok { + r0 = rf(collectionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64][]string) + } + } + + return r0 +} + +// MockRWChannelStore_GetNodeChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelsByCollectionID' +type MockRWChannelStore_GetNodeChannelsByCollectionID_Call struct { + *mock.Call +} + +// GetNodeChannelsByCollectionID is a helper method to define mock.On call +// - collectionID int64 +func (_e *MockRWChannelStore_Expecter) GetNodeChannelsByCollectionID(collectionID interface{}) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + return &MockRWChannelStore_GetNodeChannelsByCollectionID_Call{Call: _e.mock.On("GetNodeChannelsByCollectionID", collectionID)} +} + +func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Return(_a0 map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) RunAndReturn(run func(int64) map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + _c.Call.Return(run) + return _c +} + // GetNodes provides a mock function with given fields: func (_m *MockRWChannelStore) GetNodes() []int64 { ret := _m.Called() diff --git a/internal/datacoord/mock_channelmanager.go b/internal/datacoord/mock_channelmanager.go index e8b4ebe897..5239ab6e91 100644 --- a/internal/datacoord/mock_channelmanager.go +++ b/internal/datacoord/mock_channelmanager.go @@ -376,58 +376,6 @@ func (_c *MockChannelManager_GetNodeChannelsByCollectionID_Call) RunAndReturn(ru return _c } -// GetNodeIDByChannelName provides a mock function with given fields: channel -func (_m *MockChannelManager) GetNodeIDByChannelName(channel string) (int64, bool) { - ret := _m.Called(channel) - - var r0 int64 - var r1 bool - if rf, ok := ret.Get(0).(func(string) (int64, bool)); ok { - return rf(channel) - } - if rf, ok := ret.Get(0).(func(string) int64); ok { - r0 = rf(channel) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(string) bool); ok { - r1 = rf(channel) - } else { - r1 = ret.Get(1).(bool) - } - - return r0, r1 -} - -// MockChannelManager_GetNodeIDByChannelName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeIDByChannelName' -type MockChannelManager_GetNodeIDByChannelName_Call struct { - *mock.Call -} - -// GetNodeIDByChannelName is a helper method to define mock.On call -// - channel string -func (_e *MockChannelManager_Expecter) GetNodeIDByChannelName(channel interface{}) *MockChannelManager_GetNodeIDByChannelName_Call { - return &MockChannelManager_GetNodeIDByChannelName_Call{Call: _e.mock.On("GetNodeIDByChannelName", channel)} -} - -func (_c *MockChannelManager_GetNodeIDByChannelName_Call) Run(run func(channel string)) *MockChannelManager_GetNodeIDByChannelName_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string)) - }) - return _c -} - -func (_c *MockChannelManager_GetNodeIDByChannelName_Call) Return(_a0 int64, _a1 bool) *MockChannelManager_GetNodeIDByChannelName_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockChannelManager_GetNodeIDByChannelName_Call) RunAndReturn(run func(string) (int64, bool)) *MockChannelManager_GetNodeIDByChannelName_Call { - _c.Call.Return(run) - return _c -} - // Match provides a mock function with given fields: nodeID, channel func (_m *MockChannelManager) Match(nodeID int64, channel string) bool { ret := _m.Called(nodeID, channel) diff --git a/internal/datacoord/mock_cluster.go b/internal/datacoord/mock_cluster.go index e92ae8ecb3..886de279ab 100644 --- a/internal/datacoord/mock_cluster.go +++ b/internal/datacoord/mock_cluster.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.32.4. DO NOT EDIT. package datacoord diff --git a/internal/datacoord/mock_session_manager.go b/internal/datacoord/mock_session_manager.go index 7fb309e5e1..04942453da 100644 --- a/internal/datacoord/mock_session_manager.go +++ b/internal/datacoord/mock_session_manager.go @@ -264,6 +264,49 @@ func (_c *MockSessionManager_DeleteSession_Call) RunAndReturn(run func(*NodeInfo return _c } +// DropCompactionPlan provides a mock function with given fields: nodeID, req +func (_m *MockSessionManager) DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error { + ret := _m.Called(nodeID, req) + + var r0 error + if rf, ok := ret.Get(0).(func(int64, *datapb.DropCompactionPlanRequest) error); ok { + r0 = rf(nodeID, req) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockSessionManager_DropCompactionPlan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropCompactionPlan' +type MockSessionManager_DropCompactionPlan_Call struct { + *mock.Call +} + +// DropCompactionPlan is a helper method to define mock.On call +// - nodeID int64 +// - req *datapb.DropCompactionPlanRequest +func (_e *MockSessionManager_Expecter) DropCompactionPlan(nodeID interface{}, req interface{}) *MockSessionManager_DropCompactionPlan_Call { + return &MockSessionManager_DropCompactionPlan_Call{Call: _e.mock.On("DropCompactionPlan", nodeID, req)} +} + +func (_c *MockSessionManager_DropCompactionPlan_Call) Run(run func(nodeID int64, req *datapb.DropCompactionPlanRequest)) *MockSessionManager_DropCompactionPlan_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(*datapb.DropCompactionPlanRequest)) + }) + return _c +} + +func (_c *MockSessionManager_DropCompactionPlan_Call) Return(_a0 error) *MockSessionManager_DropCompactionPlan_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSessionManager_DropCompactionPlan_Call) RunAndReturn(run func(int64, *datapb.DropCompactionPlanRequest) error) *MockSessionManager_DropCompactionPlan_Call { + _c.Call.Return(run) + return _c +} + // DropImport provides a mock function with given fields: nodeID, in func (_m *MockSessionManager) DropImport(nodeID int64, in *datapb.DropImportRequest) error { ret := _m.Called(nodeID, in) diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index a1b792c7d9..5ac193a45b 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -323,6 +323,10 @@ func (c *mockDataNodeClient) QuerySlot(ctx context.Context, req *datapb.QuerySlo return &datapb.QuerySlotResponse{Status: merr.Success()}, nil } +func (c *mockDataNodeClient) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + return merr.Success(), nil +} + func (c *mockDataNodeClient) Stop() error { c.state = commonpb.StateCode_Abnormal return nil diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go index db26b378c4..31e7a3eabf 100644 --- a/internal/datacoord/session_manager.go +++ b/internal/datacoord/session_manager.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "github.com/cockroachdb/errors" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -70,6 +71,7 @@ type SessionManager interface { DropImport(nodeID int64, in *datapb.DropImportRequest) error CheckHealth(ctx context.Context) error QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error) + DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error Close() } @@ -547,6 +549,43 @@ func (c *SessionManagerImpl) QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, return resp, nil } +func (c *SessionManagerImpl) DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error { + log := log.With( + zap.Int64("nodeID", nodeID), + zap.Int64("planID", req.GetPlanID()), + ) + ctx, cancel := context.WithTimeout(context.Background(), Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)) + defer cancel() + cli, err := c.getClient(ctx, nodeID) + if err != nil { + if errors.Is(err, merr.ErrNodeNotFound) { + log.Info("node not found, skip dropping compaction plan") + return nil + } + log.Warn("failed to get client", zap.Error(err)) + return err + } + + err = retry.Do(context.Background(), func() error { + ctx, cancel := context.WithTimeout(context.Background(), Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)) + defer cancel() + + resp, err := cli.DropCompactionPlan(ctx, req) + if err := VerifyResponse(resp, err); err != nil { + log.Warn("failed to drop compaction plan", zap.Error(err)) + return err + } + return nil + }) + if err != nil { + log.Warn("failed to drop compaction plan after retry", zap.Error(err)) + return err + } + + log.Info("success to drop compaction plan") + return nil +} + // Close release sessions func (c *SessionManagerImpl) Close() { c.sessions.Lock() diff --git a/internal/datanode/services.go b/internal/datanode/services.go index f35802ca4e..53d81fb620 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -542,3 +542,13 @@ func (node *DataNode) QuerySlot(ctx context.Context, req *datapb.QuerySlotReques NumSlots: Params.DataNodeCfg.SlotCap.GetAsInt64() - int64(node.compactionExecutor.executing.Len()), }, nil } + +func (node *DataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil + } + + node.compactionExecutor.removeTask(req.GetPlanID()) + log.Ctx(ctx).Info("DropCompactionPlans success", zap.Int64("planID", req.GetPlanID())) + return merr.Success(), nil +} diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 97c5ed51c4..848dc3c3ce 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -529,3 +529,28 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { s.False(merr.Ok(status)) }) } + +func (s *DataNodeServicesSuite) TestDropCompactionPlan() { + s.Run("node not healthy", func() { + s.SetupTest() + s.node.UpdateStateCode(commonpb.StateCode_Abnormal) + + ctx := context.Background() + status, err := s.node.DropCompactionPlan(ctx, nil) + s.NoError(err) + s.False(merr.Ok(status)) + s.ErrorIs(merr.Error(status), merr.ErrServiceNotReady) + }) + + s.Run("normal case", func() { + s.SetupTest() + ctx := context.Background() + req := &datapb.DropCompactionPlanRequest{ + PlanID: 1, + } + + status, err := s.node.DropCompactionPlan(ctx, req) + s.NoError(err) + s.True(merr.Ok(status)) + }) +} diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 824f7762bd..10ba96f0c3 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -261,3 +261,9 @@ func (c *Client) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest, op return client.QuerySlot(ctx, req) }) } + +func (c *Client) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*commonpb.Status, error) { + return client.DropCompactionPlan(ctx, req) + }) +} diff --git a/internal/distributed/datanode/client/client_test.go b/internal/distributed/datanode/client/client_test.go index 88d135690c..f46d39bc12 100644 --- a/internal/distributed/datanode/client/client_test.go +++ b/internal/distributed/datanode/client/client_test.go @@ -83,6 +83,9 @@ func Test_NewClient(t *testing.T) { r13, err := client.CheckChannelOperationProgress(ctx, nil) retCheck(retNotNil, r13, err) + + r14, err := client.DropCompactionPlan(ctx, nil) + retCheck(retNotNil, r14, err) } client.(*Client).grpcClient = &mock.GRPCClientBase[datapb.DataNodeClient]{ diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 492b7d490b..9b68853960 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -406,3 +406,7 @@ func (s *Server) DropImport(ctx context.Context, req *datapb.DropImportRequest) func (s *Server) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest) (*datapb.QuerySlotResponse, error) { return s.datanode.QuerySlot(ctx, req) } + +func (s *Server) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) { + return s.datanode.DropCompactionPlan(ctx, req) +} diff --git a/internal/distributed/datanode/service_test.go b/internal/distributed/datanode/service_test.go index 0be3c5a493..655738732c 100644 --- a/internal/distributed/datanode/service_test.go +++ b/internal/distributed/datanode/service_test.go @@ -181,6 +181,10 @@ func (m *MockDataNode) QuerySlot(ctx context.Context, req *datapb.QuerySlotReque return &datapb.QuerySlotResponse{}, m.err } +func (m *MockDataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) { + return m.status, m.err +} + // ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// func Test_NewServer(t *testing.T) { paramtable.Init() @@ -317,6 +321,15 @@ func Test_NewServer(t *testing.T) { assert.NotNil(t, resp) }) + t.Run("DropCompactionPlans", func(t *testing.T) { + server.datanode = &MockDataNode{ + status: &commonpb.Status{}, + } + resp, err := server.DropCompactionPlan(ctx, nil) + assert.NoError(t, err) + assert.NotNil(t, resp) + }) + err = server.Stop() assert.NoError(t, err) } diff --git a/internal/mocks/mock_datanode.go b/internal/mocks/mock_datanode.go index b6dc02ae27..1d758de233 100644 --- a/internal/mocks/mock_datanode.go +++ b/internal/mocks/mock_datanode.go @@ -142,6 +142,61 @@ func (_c *MockDataNode_Compaction_Call) RunAndReturn(run func(context.Context, * return _c } +// DropCompactionPlan provides a mock function with given fields: _a0, _a1 +func (_m *MockDataNode) DropCompactionPlan(_a0 context.Context, _a1 *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) { + ret := _m.Called(_a0, _a1) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *datapb.DropCompactionPlanRequest) (*commonpb.Status, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *datapb.DropCompactionPlanRequest) *commonpb.Status); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *datapb.DropCompactionPlanRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataNode_DropCompactionPlan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropCompactionPlan' +type MockDataNode_DropCompactionPlan_Call struct { + *mock.Call +} + +// DropCompactionPlan is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *datapb.DropCompactionPlanRequest +func (_e *MockDataNode_Expecter) DropCompactionPlan(_a0 interface{}, _a1 interface{}) *MockDataNode_DropCompactionPlan_Call { + return &MockDataNode_DropCompactionPlan_Call{Call: _e.mock.On("DropCompactionPlan", _a0, _a1)} +} + +func (_c *MockDataNode_DropCompactionPlan_Call) Run(run func(_a0 context.Context, _a1 *datapb.DropCompactionPlanRequest)) *MockDataNode_DropCompactionPlan_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*datapb.DropCompactionPlanRequest)) + }) + return _c +} + +func (_c *MockDataNode_DropCompactionPlan_Call) Return(_a0 *commonpb.Status, _a1 error) *MockDataNode_DropCompactionPlan_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataNode_DropCompactionPlan_Call) RunAndReturn(run func(context.Context, *datapb.DropCompactionPlanRequest) (*commonpb.Status, error)) *MockDataNode_DropCompactionPlan_Call { + _c.Call.Return(run) + return _c +} + // DropImport provides a mock function with given fields: _a0, _a1 func (_m *MockDataNode) DropImport(_a0 context.Context, _a1 *datapb.DropImportRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_datanode_client.go b/internal/mocks/mock_datanode_client.go index f16ff8d170..8a244305c8 100644 --- a/internal/mocks/mock_datanode_client.go +++ b/internal/mocks/mock_datanode_client.go @@ -212,6 +212,76 @@ func (_c *MockDataNodeClient_Compaction_Call) RunAndReturn(run func(context.Cont return _c } +// DropCompactionPlan provides a mock function with given fields: ctx, in, opts +func (_m *MockDataNodeClient) DropCompactionPlan(ctx context.Context, in *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *datapb.DropCompactionPlanRequest, ...grpc.CallOption) (*commonpb.Status, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *datapb.DropCompactionPlanRequest, ...grpc.CallOption) *commonpb.Status); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *datapb.DropCompactionPlanRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataNodeClient_DropCompactionPlan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropCompactionPlan' +type MockDataNodeClient_DropCompactionPlan_Call struct { + *mock.Call +} + +// DropCompactionPlan is a helper method to define mock.On call +// - ctx context.Context +// - in *datapb.DropCompactionPlanRequest +// - opts ...grpc.CallOption +func (_e *MockDataNodeClient_Expecter) DropCompactionPlan(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_DropCompactionPlan_Call { + return &MockDataNodeClient_DropCompactionPlan_Call{Call: _e.mock.On("DropCompactionPlan", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockDataNodeClient_DropCompactionPlan_Call) Run(run func(ctx context.Context, in *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption)) *MockDataNodeClient_DropCompactionPlan_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*datapb.DropCompactionPlanRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockDataNodeClient_DropCompactionPlan_Call) Return(_a0 *commonpb.Status, _a1 error) *MockDataNodeClient_DropCompactionPlan_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataNodeClient_DropCompactionPlan_Call) RunAndReturn(run func(context.Context, *datapb.DropCompactionPlanRequest, ...grpc.CallOption) (*commonpb.Status, error)) *MockDataNodeClient_DropCompactionPlan_Call { + _c.Call.Return(run) + return _c +} + // DropImport provides a mock function with given fields: ctx, in, opts func (_m *MockDataNodeClient) DropImport(ctx context.Context, in *datapb.DropImportRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { _va := make([]interface{}, len(opts)) diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index d7a7a3a6ea..e4a533becd 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -129,6 +129,8 @@ service DataNode { rpc DropImport(DropImportRequest) returns(common.Status) {} rpc QuerySlot(QuerySlotRequest) returns(QuerySlotResponse) {} + + rpc DropCompactionPlan(DropCompactionPlanRequest) returns(common.Status) {} } message FlushRequest { @@ -892,3 +894,7 @@ message CompactionTask{ int64 nodeID = 18; schema.CollectionSchema schema = 19; } + +message DropCompactionPlanRequest { + int64 planID = 1; +} diff --git a/internal/util/mock/grpc_datanode_client.go b/internal/util/mock/grpc_datanode_client.go index 6226286bbe..49622ada6f 100644 --- a/internal/util/mock/grpc_datanode_client.go +++ b/internal/util/mock/grpc_datanode_client.go @@ -108,3 +108,7 @@ func (m *GrpcDataNodeClient) DropImport(ctx context.Context, req *datapb.DropImp func (m *GrpcDataNodeClient) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest, opts ...grpc.CallOption) (*datapb.QuerySlotResponse, error) { return &datapb.QuerySlotResponse{}, m.Err } + +func (m *GrpcDataNodeClient) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + return &commonpb.Status{}, m.Err +}