mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Cherry pick from master pr: #37076 Related #36887 DirectFoward streaming delete will cause memory usage explode if the segments number was large. This PR add batching delete API and using it for direct forward implementation. Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
79891f047d
commit
6bc8aba17f
@ -332,3 +332,16 @@ func (c *Client) Delete(ctx context.Context, req *querypb.DeleteRequest, _ ...gr
|
|||||||
return client.Delete(ctx, req)
|
return client.Delete(ctx, req)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteBatch is the API to apply same delete data into multiple segments.
|
||||||
|
// it's basically same as `Delete` but cost less memory pressure.
|
||||||
|
func (c *Client) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest, _ ...grpc.CallOption) (*querypb.DeleteBatchResponse, error) {
|
||||||
|
req = typeutil.Clone(req)
|
||||||
|
commonpbutil.UpdateMsgBase(
|
||||||
|
req.GetBase(),
|
||||||
|
commonpbutil.FillMsgBaseFromClient(c.nodeID),
|
||||||
|
)
|
||||||
|
return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*querypb.DeleteBatchResponse, error) {
|
||||||
|
return client.DeleteBatch(ctx, req)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
@ -108,6 +108,9 @@ func Test_NewClient(t *testing.T) {
|
|||||||
r20, err := client.SearchSegments(ctx, nil)
|
r20, err := client.SearchSegments(ctx, nil)
|
||||||
retCheck(retNotNil, r20, err)
|
retCheck(retNotNil, r20, err)
|
||||||
|
|
||||||
|
r21, err := client.DeleteBatch(ctx, nil)
|
||||||
|
retCheck(retNotNil, r21, err)
|
||||||
|
|
||||||
// stream rpc
|
// stream rpc
|
||||||
client, err := client.QueryStream(ctx, nil)
|
client, err := client.QueryStream(ctx, nil)
|
||||||
retCheck(retNotNil, client, err)
|
retCheck(retNotNil, client, err)
|
||||||
|
@ -383,3 +383,9 @@ func (s *Server) SyncDistribution(ctx context.Context, req *querypb.SyncDistribu
|
|||||||
func (s *Server) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commonpb.Status, error) {
|
func (s *Server) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commonpb.Status, error) {
|
||||||
return s.querynode.Delete(ctx, req)
|
return s.querynode.Delete(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteBatch is the API to apply same delete data into multiple segments.
|
||||||
|
// it's basically same as `Delete` but cost less memory pressure.
|
||||||
|
func (s *Server) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
|
||||||
|
return s.querynode.DeleteBatch(ctx, req)
|
||||||
|
}
|
||||||
|
@ -32,6 +32,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||||
"github.com/milvus-io/milvus/internal/types"
|
"github.com/milvus-io/milvus/internal/types"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -270,6 +271,15 @@ func Test_NewServer(t *testing.T) {
|
|||||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("DeleteBatch", func(t *testing.T) {
|
||||||
|
mockQN.EXPECT().DeleteBatch(mock.Anything, mock.Anything).Return(&querypb.DeleteBatchResponse{
|
||||||
|
Status: merr.Success(),
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
resp, err := server.DeleteBatch(ctx, &querypb.DeleteBatchRequest{})
|
||||||
|
assert.NoError(t, merr.CheckRPCCall(resp, err))
|
||||||
|
})
|
||||||
|
|
||||||
err = server.Stop()
|
err = server.Stop()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
@ -85,6 +85,65 @@ func (_c *MockQueryNode_Delete_Call) RunAndReturn(run func(context.Context, *que
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteBatch provides a mock function with given fields: _a0, _a1
|
||||||
|
func (_m *MockQueryNode) DeleteBatch(_a0 context.Context, _a1 *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
|
||||||
|
ret := _m.Called(_a0, _a1)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for DeleteBatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 *querypb.DeleteBatchResponse
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)); ok {
|
||||||
|
return rf(_a0, _a1)
|
||||||
|
}
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) *querypb.DeleteBatchResponse); ok {
|
||||||
|
r0 = rf(_a0, _a1)
|
||||||
|
} else {
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).(*querypb.DeleteBatchResponse)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if rf, ok := ret.Get(1).(func(context.Context, *querypb.DeleteBatchRequest) error); ok {
|
||||||
|
r1 = rf(_a0, _a1)
|
||||||
|
} else {
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockQueryNode_DeleteBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteBatch'
|
||||||
|
type MockQueryNode_DeleteBatch_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteBatch is a helper method to define mock.On call
|
||||||
|
// - _a0 context.Context
|
||||||
|
// - _a1 *querypb.DeleteBatchRequest
|
||||||
|
func (_e *MockQueryNode_Expecter) DeleteBatch(_a0 interface{}, _a1 interface{}) *MockQueryNode_DeleteBatch_Call {
|
||||||
|
return &MockQueryNode_DeleteBatch_Call{Call: _e.mock.On("DeleteBatch", _a0, _a1)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockQueryNode_DeleteBatch_Call) Run(run func(_a0 context.Context, _a1 *querypb.DeleteBatchRequest)) *MockQueryNode_DeleteBatch_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(context.Context), args[1].(*querypb.DeleteBatchRequest))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockQueryNode_DeleteBatch_Call) Return(_a0 *querypb.DeleteBatchResponse, _a1 error) *MockQueryNode_DeleteBatch_Call {
|
||||||
|
_c.Call.Return(_a0, _a1)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockQueryNode_DeleteBatch_Call) RunAndReturn(run func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)) *MockQueryNode_DeleteBatch_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// GetAddress provides a mock function with given fields:
|
// GetAddress provides a mock function with given fields:
|
||||||
func (_m *MockQueryNode) GetAddress() string {
|
func (_m *MockQueryNode) GetAddress() string {
|
||||||
ret := _m.Called()
|
ret := _m.Called()
|
||||||
|
@ -142,6 +142,80 @@ func (_c *MockQueryNodeClient_Delete_Call) RunAndReturn(run func(context.Context
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteBatch provides a mock function with given fields: ctx, in, opts
|
||||||
|
func (_m *MockQueryNodeClient) DeleteBatch(ctx context.Context, in *querypb.DeleteBatchRequest, opts ...grpc.CallOption) (*querypb.DeleteBatchResponse, error) {
|
||||||
|
_va := make([]interface{}, len(opts))
|
||||||
|
for _i := range opts {
|
||||||
|
_va[_i] = opts[_i]
|
||||||
|
}
|
||||||
|
var _ca []interface{}
|
||||||
|
_ca = append(_ca, ctx, in)
|
||||||
|
_ca = append(_ca, _va...)
|
||||||
|
ret := _m.Called(_ca...)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for DeleteBatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 *querypb.DeleteBatchResponse
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest, ...grpc.CallOption) (*querypb.DeleteBatchResponse, error)); ok {
|
||||||
|
return rf(ctx, in, opts...)
|
||||||
|
}
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest, ...grpc.CallOption) *querypb.DeleteBatchResponse); ok {
|
||||||
|
r0 = rf(ctx, in, opts...)
|
||||||
|
} else {
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).(*querypb.DeleteBatchResponse)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if rf, ok := ret.Get(1).(func(context.Context, *querypb.DeleteBatchRequest, ...grpc.CallOption) error); ok {
|
||||||
|
r1 = rf(ctx, in, opts...)
|
||||||
|
} else {
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockQueryNodeClient_DeleteBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteBatch'
|
||||||
|
type MockQueryNodeClient_DeleteBatch_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteBatch is a helper method to define mock.On call
|
||||||
|
// - ctx context.Context
|
||||||
|
// - in *querypb.DeleteBatchRequest
|
||||||
|
// - opts ...grpc.CallOption
|
||||||
|
func (_e *MockQueryNodeClient_Expecter) DeleteBatch(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryNodeClient_DeleteBatch_Call {
|
||||||
|
return &MockQueryNodeClient_DeleteBatch_Call{Call: _e.mock.On("DeleteBatch",
|
||||||
|
append([]interface{}{ctx, in}, opts...)...)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockQueryNodeClient_DeleteBatch_Call) Run(run func(ctx context.Context, in *querypb.DeleteBatchRequest, opts ...grpc.CallOption)) *MockQueryNodeClient_DeleteBatch_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
variadicArgs := make([]grpc.CallOption, len(args)-2)
|
||||||
|
for i, a := range args[2:] {
|
||||||
|
if a != nil {
|
||||||
|
variadicArgs[i] = a.(grpc.CallOption)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
run(args[0].(context.Context), args[1].(*querypb.DeleteBatchRequest), variadicArgs...)
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockQueryNodeClient_DeleteBatch_Call) Return(_a0 *querypb.DeleteBatchResponse, _a1 error) *MockQueryNodeClient_DeleteBatch_Call {
|
||||||
|
_c.Call.Return(_a0, _a1)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockQueryNodeClient_DeleteBatch_Call) RunAndReturn(run func(context.Context, *querypb.DeleteBatchRequest, ...grpc.CallOption) (*querypb.DeleteBatchResponse, error)) *MockQueryNodeClient_DeleteBatch_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// GetComponentStates provides a mock function with given fields: ctx, in, opts
|
// GetComponentStates provides a mock function with given fields: ctx, in, opts
|
||||||
func (_m *MockQueryNodeClient) GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error) {
|
func (_m *MockQueryNodeClient) GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error) {
|
||||||
_va := make([]interface{}, len(opts))
|
_va := make([]interface{}, len(opts))
|
||||||
|
@ -172,6 +172,10 @@ service QueryNode {
|
|||||||
}
|
}
|
||||||
rpc Delete(DeleteRequest) returns (common.Status) {
|
rpc Delete(DeleteRequest) returns (common.Status) {
|
||||||
}
|
}
|
||||||
|
// DeleteBatch is the API to apply same delete data into multiple segments.
|
||||||
|
// it's basically same as `Delete` but cost less memory pressure.
|
||||||
|
rpc DeleteBatch(DeleteBatchRequest) returns (DeleteBatchResponse) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// --------------------QueryCoord grpc request and response proto------------------
|
// --------------------QueryCoord grpc request and response proto------------------
|
||||||
@ -772,6 +776,25 @@ message DeleteRequest {
|
|||||||
DataScope scope = 8;
|
DataScope scope = 8;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message DeleteBatchRequest {
|
||||||
|
common.MsgBase base = 1;
|
||||||
|
int64 collection_id = 2;
|
||||||
|
int64 partition_id = 3;
|
||||||
|
string vchannel_name = 4;
|
||||||
|
repeated int64 segment_ids = 5;
|
||||||
|
schema.IDs primary_keys = 6;
|
||||||
|
repeated uint64 timestamps = 7;
|
||||||
|
DataScope scope = 8;
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteBatchResponse returns failed/missing segment ids
|
||||||
|
// cannot just using common.Status to handle partial failure logic
|
||||||
|
message DeleteBatchResponse {
|
||||||
|
common.Status status = 1;
|
||||||
|
repeated int64 failed_ids = 2;
|
||||||
|
repeated int64 missing_ids = 3;
|
||||||
|
}
|
||||||
|
|
||||||
message ActivateCheckerRequest {
|
message ActivateCheckerRequest {
|
||||||
common.MsgBase base = 1;
|
common.MsgBase base = 1;
|
||||||
int32 checkerID = 2;
|
int32 checkerID = 2;
|
||||||
|
@ -84,6 +84,65 @@ func (_c *MockQueryNodeServer_Delete_Call) RunAndReturn(run func(context.Context
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteBatch provides a mock function with given fields: _a0, _a1
|
||||||
|
func (_m *MockQueryNodeServer) DeleteBatch(_a0 context.Context, _a1 *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
|
||||||
|
ret := _m.Called(_a0, _a1)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for DeleteBatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 *querypb.DeleteBatchResponse
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)); ok {
|
||||||
|
return rf(_a0, _a1)
|
||||||
|
}
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) *querypb.DeleteBatchResponse); ok {
|
||||||
|
r0 = rf(_a0, _a1)
|
||||||
|
} else {
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).(*querypb.DeleteBatchResponse)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if rf, ok := ret.Get(1).(func(context.Context, *querypb.DeleteBatchRequest) error); ok {
|
||||||
|
r1 = rf(_a0, _a1)
|
||||||
|
} else {
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockQueryNodeServer_DeleteBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteBatch'
|
||||||
|
type MockQueryNodeServer_DeleteBatch_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteBatch is a helper method to define mock.On call
|
||||||
|
// - _a0 context.Context
|
||||||
|
// - _a1 *querypb.DeleteBatchRequest
|
||||||
|
func (_e *MockQueryNodeServer_Expecter) DeleteBatch(_a0 interface{}, _a1 interface{}) *MockQueryNodeServer_DeleteBatch_Call {
|
||||||
|
return &MockQueryNodeServer_DeleteBatch_Call{Call: _e.mock.On("DeleteBatch", _a0, _a1)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockQueryNodeServer_DeleteBatch_Call) Run(run func(_a0 context.Context, _a1 *querypb.DeleteBatchRequest)) *MockQueryNodeServer_DeleteBatch_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(context.Context), args[1].(*querypb.DeleteBatchRequest))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockQueryNodeServer_DeleteBatch_Call) Return(_a0 *querypb.DeleteBatchResponse, _a1 error) *MockQueryNodeServer_DeleteBatch_Call {
|
||||||
|
_c.Call.Return(_a0, _a1)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockQueryNodeServer_DeleteBatch_Call) RunAndReturn(run func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)) *MockQueryNodeServer_DeleteBatch_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// GetComponentStates provides a mock function with given fields: _a0, _a1
|
// GetComponentStates provides a mock function with given fields: _a0, _a1
|
||||||
func (_m *MockQueryNodeServer) GetComponentStates(_a0 context.Context, _a1 *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
|
func (_m *MockQueryNodeServer) GetComponentStates(_a0 context.Context, _a1 *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
|
||||||
ret := _m.Called(_a0, _a1)
|
ret := _m.Called(_a0, _a1)
|
||||||
|
@ -69,6 +69,65 @@ func (_c *MockWorker_Delete_Call) RunAndReturn(run func(context.Context, *queryp
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteBatch provides a mock function with given fields: ctx, req
|
||||||
|
func (_m *MockWorker) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
|
||||||
|
ret := _m.Called(ctx, req)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for DeleteBatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 *querypb.DeleteBatchResponse
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)); ok {
|
||||||
|
return rf(ctx, req)
|
||||||
|
}
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) *querypb.DeleteBatchResponse); ok {
|
||||||
|
r0 = rf(ctx, req)
|
||||||
|
} else {
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).(*querypb.DeleteBatchResponse)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if rf, ok := ret.Get(1).(func(context.Context, *querypb.DeleteBatchRequest) error); ok {
|
||||||
|
r1 = rf(ctx, req)
|
||||||
|
} else {
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockWorker_DeleteBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteBatch'
|
||||||
|
type MockWorker_DeleteBatch_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteBatch is a helper method to define mock.On call
|
||||||
|
// - ctx context.Context
|
||||||
|
// - req *querypb.DeleteBatchRequest
|
||||||
|
func (_e *MockWorker_Expecter) DeleteBatch(ctx interface{}, req interface{}) *MockWorker_DeleteBatch_Call {
|
||||||
|
return &MockWorker_DeleteBatch_Call{Call: _e.mock.On("DeleteBatch", ctx, req)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockWorker_DeleteBatch_Call) Run(run func(ctx context.Context, req *querypb.DeleteBatchRequest)) *MockWorker_DeleteBatch_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(context.Context), args[1].(*querypb.DeleteBatchRequest))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockWorker_DeleteBatch_Call) Return(_a0 *querypb.DeleteBatchResponse, _a1 error) *MockWorker_DeleteBatch_Call {
|
||||||
|
_c.Call.Return(_a0, _a1)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MockWorker_DeleteBatch_Call) RunAndReturn(run func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)) *MockWorker_DeleteBatch_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// GetStatistics provides a mock function with given fields: ctx, req
|
// GetStatistics provides a mock function with given fields: ctx, req
|
||||||
func (_m *MockWorker) GetStatistics(ctx context.Context, req *querypb.GetStatisticsRequest) (*internalpb.GetStatisticsResponse, error) {
|
func (_m *MockWorker) GetStatistics(ctx context.Context, req *querypb.GetStatisticsRequest) (*internalpb.GetStatisticsResponse, error) {
|
||||||
ret := _m.Called(ctx, req)
|
ret := _m.Called(ctx, req)
|
||||||
|
@ -39,6 +39,7 @@ type Worker interface {
|
|||||||
LoadSegments(context.Context, *querypb.LoadSegmentsRequest) error
|
LoadSegments(context.Context, *querypb.LoadSegmentsRequest) error
|
||||||
ReleaseSegments(context.Context, *querypb.ReleaseSegmentsRequest) error
|
ReleaseSegments(context.Context, *querypb.ReleaseSegmentsRequest) error
|
||||||
Delete(ctx context.Context, req *querypb.DeleteRequest) error
|
Delete(ctx context.Context, req *querypb.DeleteRequest) error
|
||||||
|
DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)
|
||||||
SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error)
|
SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error)
|
||||||
QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error)
|
QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error)
|
||||||
QueryStreamSegments(ctx context.Context, req *querypb.QueryRequest, srv streamrpc.QueryStreamServer) error
|
QueryStreamSegments(ctx context.Context, req *querypb.QueryRequest, srv streamrpc.QueryStreamServer) error
|
||||||
@ -141,6 +142,52 @@ func (w *remoteWorker) Delete(ctx context.Context, req *querypb.DeleteRequest) e
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *remoteWorker) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
|
||||||
|
log := log.Ctx(ctx).With(
|
||||||
|
zap.Int64("workerID", req.GetBase().GetTargetID()),
|
||||||
|
)
|
||||||
|
client := w.getClient()
|
||||||
|
resp, err := client.DeleteBatch(ctx, req)
|
||||||
|
if err := merr.CheckRPCCall(resp, err); err != nil {
|
||||||
|
if errors.Is(err, merr.ErrServiceUnimplemented) {
|
||||||
|
log.Warn("invoke legacy querynode DeleteBatch method, fallback to ")
|
||||||
|
return w.splitDeleteBatch(ctx, req)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *remoteWorker) splitDeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
|
||||||
|
sReq := &querypb.DeleteRequest{
|
||||||
|
CollectionId: req.GetCollectionId(),
|
||||||
|
PartitionId: req.GetPartitionId(),
|
||||||
|
VchannelName: req.GetVchannelName(),
|
||||||
|
PrimaryKeys: req.GetPrimaryKeys(),
|
||||||
|
Timestamps: req.GetTimestamps(),
|
||||||
|
Scope: req.GetScope(),
|
||||||
|
}
|
||||||
|
// do fallback without parallel, to protect the mem limit
|
||||||
|
var missingIDs []int64
|
||||||
|
var failedIDs []int64
|
||||||
|
for _, segmentID := range req.GetSegmentIds() {
|
||||||
|
sReq.SegmentId = segmentID
|
||||||
|
err := w.Delete(ctx, sReq)
|
||||||
|
switch {
|
||||||
|
case errors.Is(err, merr.ErrSegmentNotFound):
|
||||||
|
missingIDs = append(missingIDs, segmentID)
|
||||||
|
case err != nil:
|
||||||
|
failedIDs = append(failedIDs, segmentID)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &querypb.DeleteBatchResponse{
|
||||||
|
Status: merr.Success(),
|
||||||
|
FailedIds: failedIDs,
|
||||||
|
MissingIds: missingIDs,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (w *remoteWorker) SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
|
func (w *remoteWorker) SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
|
||||||
client := w.getClient()
|
client := w.getClient()
|
||||||
ret, err := client.SearchSegments(ctx, req)
|
ret, err := client.SearchSegments(ctx, req)
|
||||||
|
@ -193,6 +193,70 @@ func (s *RemoteWorkerSuite) TestDelete() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *RemoteWorkerSuite) TestDeleteBatch() {
|
||||||
|
s.Run("normal_run", func() {
|
||||||
|
defer func() { s.mockClient.ExpectedCalls = nil }()
|
||||||
|
|
||||||
|
s.mockClient.EXPECT().DeleteBatch(mock.Anything, mock.AnythingOfType("*querypb.DeleteBatchRequest")).
|
||||||
|
Return(&querypb.DeleteBatchResponse{Status: merr.Success()}, nil).Once()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
resp, err := s.worker.DeleteBatch(ctx, &querypb.DeleteBatchRequest{
|
||||||
|
SegmentIds: []int64{100, 200},
|
||||||
|
})
|
||||||
|
s.NoError(merr.CheckRPCCall(resp, err))
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("client_return_error", func() {
|
||||||
|
defer func() { s.mockClient.ExpectedCalls = nil }()
|
||||||
|
|
||||||
|
s.mockClient.EXPECT().DeleteBatch(mock.Anything, mock.AnythingOfType("*querypb.DeleteBatchRequest")).
|
||||||
|
Return(nil, merr.WrapErrServiceInternal("mocked")).Once()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
resp, err := s.worker.DeleteBatch(ctx, &querypb.DeleteBatchRequest{
|
||||||
|
SegmentIds: []int64{100, 200},
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Error(merr.CheckRPCCall(resp, err))
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("client_return_fail_status", func() {
|
||||||
|
defer func() { s.mockClient.ExpectedCalls = nil }()
|
||||||
|
|
||||||
|
s.mockClient.EXPECT().DeleteBatch(mock.Anything, mock.AnythingOfType("*querypb.DeleteBatchRequest")).
|
||||||
|
Return(&querypb.DeleteBatchResponse{
|
||||||
|
Status: merr.Status(merr.WrapErrServiceUnavailable("mocked")),
|
||||||
|
}, nil).Once()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
resp, err := s.worker.DeleteBatch(ctx, &querypb.DeleteBatchRequest{
|
||||||
|
SegmentIds: []int64{100, 200},
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Error(merr.CheckRPCCall(resp, err))
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run("batch_delete_unimplemented", func() {
|
||||||
|
defer func() { s.mockClient.ExpectedCalls = nil }()
|
||||||
|
|
||||||
|
s.mockClient.EXPECT().DeleteBatch(mock.Anything, mock.AnythingOfType("*querypb.DeleteBatchRequest")).
|
||||||
|
Return(nil, merr.WrapErrServiceUnimplemented(status.Errorf(codes.Unimplemented, "mocked grpc unimplemented")))
|
||||||
|
s.mockClient.EXPECT().Delete(mock.Anything, mock.Anything).Return(merr.Success(), nil).Times(2)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
resp, err := s.worker.DeleteBatch(ctx, &querypb.DeleteBatchRequest{
|
||||||
|
SegmentIds: []int64{100, 200},
|
||||||
|
})
|
||||||
|
|
||||||
|
s.NoError(merr.CheckRPCCall(resp, err))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (s *RemoteWorkerSuite) TestSearch() {
|
func (s *RemoteWorkerSuite) TestSearch() {
|
||||||
s.Run("normal_run", func() {
|
s.Run("normal_run", func() {
|
||||||
defer func() { s.mockClient.ExpectedCalls = nil }()
|
defer func() { s.mockClient.ExpectedCalls = nil }()
|
||||||
|
@ -19,8 +19,10 @@ package delegator
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/cockroachdb/errors"
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
@ -36,8 +38,10 @@ import (
|
|||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/metrics"
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -276,50 +280,43 @@ func (sd *shardDelegator) forwardStreamingDirect(ctx context.Context, deleteData
|
|||||||
sealed, growing, version := sd.distribution.PinOnlineSegments(partitions...)
|
sealed, growing, version := sd.distribution.PinOnlineSegments(partitions...)
|
||||||
defer sd.distribution.Unpin(version)
|
defer sd.distribution.Unpin(version)
|
||||||
|
|
||||||
for _, item := range group {
|
for _, entry := range sealed {
|
||||||
deleteData := *item
|
entry := entry
|
||||||
for _, entry := range sealed {
|
worker, err := sd.workerManager.GetWorker(ctx, entry.NodeID)
|
||||||
entry := entry
|
if err != nil {
|
||||||
worker, err := sd.workerManager.GetWorker(ctx, entry.NodeID)
|
log.Warn("failed to get worker",
|
||||||
if err != nil {
|
zap.Int64("nodeID", entry.NodeID),
|
||||||
log.Warn("failed to get worker",
|
zap.Error(err),
|
||||||
zap.Int64("nodeID", entry.NodeID),
|
)
|
||||||
zap.Error(err),
|
// skip if node down
|
||||||
)
|
// delete will be processed after loaded again
|
||||||
// skip if node down
|
continue
|
||||||
// delete will be processed after loaded again
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// forward to non level0 segment only
|
|
||||||
segments := lo.Filter(entry.Segments, func(segmentEntry SegmentEntry, _ int) bool {
|
|
||||||
return segmentEntry.Level != datapb.SegmentLevel_L0
|
|
||||||
})
|
|
||||||
|
|
||||||
eg.Go(func() error {
|
|
||||||
offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, func(segmentID int64) (DeleteData, bool) {
|
|
||||||
return deleteData, true
|
|
||||||
}, segments, querypb.DataScope_Historical)...)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
// forward to non level0 segment only
|
||||||
|
segments := lo.Filter(entry.Segments, func(segmentEntry SegmentEntry, _ int) bool {
|
||||||
|
return segmentEntry.Level != datapb.SegmentLevel_L0
|
||||||
|
})
|
||||||
|
|
||||||
if len(growing) > 0 {
|
eg.Go(func() error {
|
||||||
worker, err := sd.workerManager.GetWorker(ctx, paramtable.GetNodeID())
|
offlineSegments.Upsert(sd.applyDeleteBatch(ctx, entry.NodeID, worker, group, segments, querypb.DataScope_Historical)...)
|
||||||
if err != nil {
|
return nil
|
||||||
log.Error("failed to get worker(local)",
|
})
|
||||||
zap.Int64("nodeID", paramtable.GetNodeID()),
|
}
|
||||||
zap.Error(err),
|
|
||||||
)
|
if len(growing) > 0 {
|
||||||
// panic here, local worker shall not have error
|
worker, err := sd.workerManager.GetWorker(ctx, paramtable.GetNodeID())
|
||||||
panic(err)
|
if err != nil {
|
||||||
}
|
log.Error("failed to get worker(local)",
|
||||||
eg.Go(func() error {
|
zap.Int64("nodeID", paramtable.GetNodeID()),
|
||||||
offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, func(segmentID int64) (DeleteData, bool) {
|
zap.Error(err),
|
||||||
return deleteData, true
|
)
|
||||||
}, growing, querypb.DataScope_Streaming)...)
|
// panic here, local worker shall not have error
|
||||||
return nil
|
panic(err)
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
eg.Go(func() error {
|
||||||
|
offlineSegments.Upsert(sd.applyDeleteBatch(ctx, paramtable.GetNodeID(), worker, group, growing, querypb.DataScope_Streaming)...)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
@ -336,3 +333,72 @@ func (sd *shardDelegator) forwardStreamingDirect(ctx context.Context, deleteData
|
|||||||
|
|
||||||
metrics.QueryNodeForwardDeleteCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(forwardDeleteCost.Milliseconds()))
|
metrics.QueryNodeForwardDeleteCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(forwardDeleteCost.Milliseconds()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// applyDeleteBatch handles delete record and apply them to corresponding workers in batch.
|
||||||
|
func (sd *shardDelegator) applyDeleteBatch(ctx context.Context,
|
||||||
|
nodeID int64,
|
||||||
|
worker cluster.Worker,
|
||||||
|
data []*DeleteData,
|
||||||
|
entries []SegmentEntry,
|
||||||
|
scope querypb.DataScope,
|
||||||
|
) []int64 {
|
||||||
|
offlineSegments := typeutil.NewConcurrentSet[int64]()
|
||||||
|
log := sd.getLogger(ctx)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0) * 4)
|
||||||
|
defer pool.Release()
|
||||||
|
|
||||||
|
var futures []*conc.Future[struct{}]
|
||||||
|
for _, delData := range data {
|
||||||
|
delData := delData
|
||||||
|
segmentIDs := lo.Map(entries, func(entry SegmentEntry, _ int) int64 {
|
||||||
|
return entry.SegmentID
|
||||||
|
})
|
||||||
|
future := pool.Submit(func() (struct{}, error) {
|
||||||
|
log.Debug("delegator plan to applyDelete via worker")
|
||||||
|
err := retry.Handle(ctx, func() (bool, error) {
|
||||||
|
if sd.Stopped() {
|
||||||
|
return false, merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing")
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := worker.DeleteBatch(ctx, &querypb.DeleteBatchRequest{
|
||||||
|
Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(nodeID)),
|
||||||
|
CollectionId: sd.collectionID,
|
||||||
|
PartitionId: delData.PartitionID,
|
||||||
|
VchannelName: sd.vchannelName,
|
||||||
|
SegmentIds: segmentIDs,
|
||||||
|
PrimaryKeys: storage.ParsePrimaryKeys2IDs(delData.PrimaryKeys),
|
||||||
|
Timestamps: delData.Timestamps,
|
||||||
|
Scope: scope,
|
||||||
|
})
|
||||||
|
if errors.Is(err, merr.ErrNodeNotFound) {
|
||||||
|
log.Warn("try to delete data on non-exist node")
|
||||||
|
// cancel other request
|
||||||
|
cancel()
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
// grpc/network error
|
||||||
|
if err != nil {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
if len(resp.GetMissingIds()) > 0 {
|
||||||
|
log.Warn("try to delete data of released segment", zap.Int64s("ids", resp.GetMissingIds()))
|
||||||
|
}
|
||||||
|
if len(resp.GetFailedIds()) > 0 {
|
||||||
|
log.Warn("apply delete for segment failed, marking it offline")
|
||||||
|
offlineSegments.Upsert(resp.GetFailedIds()...)
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}, retry.Attempts(10))
|
||||||
|
|
||||||
|
return struct{}{}, err
|
||||||
|
})
|
||||||
|
futures = append(futures, future)
|
||||||
|
}
|
||||||
|
|
||||||
|
conc.AwaitAll(futures...)
|
||||||
|
return offlineSegments.Collect()
|
||||||
|
}
|
||||||
|
@ -35,6 +35,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/internal/storage"
|
"github.com/milvus-io/milvus/internal/storage"
|
||||||
"github.com/milvus-io/milvus/pkg/common"
|
"github.com/milvus-io/milvus/pkg/common"
|
||||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
"github.com/milvus-io/milvus/pkg/util/metric"
|
"github.com/milvus-io/milvus/pkg/util/metric"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||||
@ -243,12 +244,11 @@ func (s *StreamingForwardSuite) TestDirectStreamingForward() {
|
|||||||
deletedSegment := typeutil.NewConcurrentSet[int64]()
|
deletedSegment := typeutil.NewConcurrentSet[int64]()
|
||||||
mockWorker := cluster.NewMockWorker(s.T())
|
mockWorker := cluster.NewMockWorker(s.T())
|
||||||
s.workerManager.EXPECT().GetWorker(mock.Anything, int64(1)).Return(mockWorker, nil)
|
s.workerManager.EXPECT().GetWorker(mock.Anything, int64(1)).Return(mockWorker, nil)
|
||||||
mockWorker.EXPECT().Delete(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, dr *querypb.DeleteRequest) error {
|
mockWorker.EXPECT().DeleteBatch(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, dr *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
|
||||||
s.T().Log(dr.GetSegmentId())
|
deletedSegment.Upsert(dr.GetSegmentIds()...)
|
||||||
deletedSegment.Insert(dr.SegmentId)
|
|
||||||
s.ElementsMatch([]int64{10}, dr.GetPrimaryKeys().GetIntId().GetData())
|
s.ElementsMatch([]int64{10}, dr.GetPrimaryKeys().GetIntId().GetData())
|
||||||
s.ElementsMatch([]uint64{10}, dr.GetTimestamps())
|
s.ElementsMatch([]uint64{10}, dr.GetTimestamps())
|
||||||
return nil
|
return &querypb.DeleteBatchResponse{Status: merr.Success()}, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
delegator.ProcessDelete([]*DeleteData{
|
delegator.ProcessDelete([]*DeleteData{
|
||||||
|
@ -53,6 +53,10 @@ func (w *LocalWorker) Delete(ctx context.Context, req *querypb.DeleteRequest) er
|
|||||||
return merr.CheckRPCCall(status, err)
|
return merr.CheckRPCCall(status, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *LocalWorker) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
|
||||||
|
return w.node.DeleteBatch(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
func (w *LocalWorker) SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
|
func (w *LocalWorker) SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
|
||||||
return w.node.SearchSegments(ctx, req)
|
return w.node.SearchSegments(ctx, req)
|
||||||
}
|
}
|
||||||
|
@ -87,6 +87,22 @@ func (f SegmentIDFilter) SegmentIDs() ([]int64, bool) {
|
|||||||
return []int64{int64(f)}, true
|
return []int64{int64(f)}, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SegmentIDsFilter struct {
|
||||||
|
segmentIDs typeutil.Set[int64]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f SegmentIDsFilter) Filter(segment Segment) bool {
|
||||||
|
return f.segmentIDs.Contain(segment.ID())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f SegmentIDsFilter) SegmentType() (SegmentType, bool) {
|
||||||
|
return commonpb.SegmentState_SegmentStateNone, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f SegmentIDsFilter) SegmentIDs() ([]int64, bool) {
|
||||||
|
return f.segmentIDs.Collect(), true
|
||||||
|
}
|
||||||
|
|
||||||
type SegmentTypeFilter SegmentType
|
type SegmentTypeFilter SegmentType
|
||||||
|
|
||||||
func (f SegmentTypeFilter) Filter(segment Segment) bool {
|
func (f SegmentTypeFilter) Filter(segment Segment) bool {
|
||||||
@ -133,6 +149,12 @@ func WithID(id int64) SegmentFilter {
|
|||||||
return SegmentIDFilter(id)
|
return SegmentIDFilter(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithIDs(ids ...int64) SegmentFilter {
|
||||||
|
return SegmentIDsFilter{
|
||||||
|
segmentIDs: typeutil.NewSet(ids...),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func WithLevel(level datapb.SegmentLevel) SegmentFilter {
|
func WithLevel(level datapb.SegmentLevel) SegmentFilter {
|
||||||
return SegmentFilterFunc(func(segment Segment) bool {
|
return SegmentFilterFunc(func(segment Segment) bool {
|
||||||
return segment.Level() == level
|
return segment.Level() == level
|
||||||
|
@ -19,6 +19,7 @@ package querynodev2
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -47,6 +48,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/metrics"
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||||
"github.com/milvus-io/milvus/pkg/util/hardware"
|
"github.com/milvus-io/milvus/pkg/util/hardware"
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
@ -1426,6 +1428,84 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
|
|||||||
return merr.Success(), nil
|
return merr.Success(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteBatch is the API to apply same delete data into multiple segments.
|
||||||
|
// it's basically same as `Delete` but cost less memory pressure.
|
||||||
|
func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
|
||||||
|
log := log.Ctx(ctx).With(
|
||||||
|
zap.Int64("collectionID", req.GetCollectionId()),
|
||||||
|
zap.String("channel", req.GetVchannelName()),
|
||||||
|
zap.Int64s("segmentIDs", req.GetSegmentIds()),
|
||||||
|
zap.String("scope", req.GetScope().String()),
|
||||||
|
)
|
||||||
|
|
||||||
|
// check node healthy
|
||||||
|
if err := node.lifetime.Add(merr.IsHealthy); err != nil {
|
||||||
|
return &querypb.DeleteBatchResponse{
|
||||||
|
Status: merr.Status(err),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
defer node.lifetime.Done()
|
||||||
|
|
||||||
|
// log.Debug("QueryNode received worker delete detail", zap.Stringer("info", &deleteRequestStringer{DeleteRequest: req}))
|
||||||
|
|
||||||
|
filters := []segments.SegmentFilter{
|
||||||
|
segments.WithIDs(req.GetSegmentIds()...),
|
||||||
|
}
|
||||||
|
|
||||||
|
// do not add filter for Unknown & All scope, for backward cap
|
||||||
|
switch req.GetScope() {
|
||||||
|
case querypb.DataScope_Historical:
|
||||||
|
filters = append(filters, segments.WithType(segments.SegmentTypeSealed))
|
||||||
|
case querypb.DataScope_Streaming:
|
||||||
|
filters = append(filters, segments.WithType(segments.SegmentTypeGrowing))
|
||||||
|
}
|
||||||
|
|
||||||
|
segs := node.manager.Segment.GetBy(filters...)
|
||||||
|
|
||||||
|
hitIDs := lo.Map(segs, func(segment segments.Segment, _ int) int64 {
|
||||||
|
return segment.ID()
|
||||||
|
})
|
||||||
|
// calculate missing ids, continue to delete existing ones.
|
||||||
|
missingIDs := typeutil.NewSet(req.GetSegmentIds()...).Complement(typeutil.NewSet(hitIDs...))
|
||||||
|
if missingIDs.Len() > 0 {
|
||||||
|
log.Warn("Delete batch find missing ids", zap.Int64s("missing_ids", missingIDs.Collect()))
|
||||||
|
}
|
||||||
|
|
||||||
|
pks := storage.ParseIDs2PrimaryKeys(req.GetPrimaryKeys())
|
||||||
|
|
||||||
|
// control the execution batch parallel with P number
|
||||||
|
// maybe it shall be lower in case of heavy CPU usage may impacting search/query
|
||||||
|
pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0))
|
||||||
|
futures := make([]*conc.Future[struct{}], 0, len(segs))
|
||||||
|
errSet := typeutil.NewConcurrentSet[int64]()
|
||||||
|
|
||||||
|
for _, segment := range segs {
|
||||||
|
segment := segment
|
||||||
|
futures = append(futures, pool.Submit(func() (struct{}, error) {
|
||||||
|
// TODO @silverxia, add interface to use same data struct for segment delete
|
||||||
|
// current implementation still copys pks into protobuf(or arrow) struct
|
||||||
|
err := segment.Delete(ctx, pks, req.GetTimestamps())
|
||||||
|
if err != nil {
|
||||||
|
errSet.Insert(segment.ID())
|
||||||
|
log.Warn("segment delete failed",
|
||||||
|
zap.Int64("segmentID", segment.ID()),
|
||||||
|
zap.Error(err))
|
||||||
|
return struct{}{}, err
|
||||||
|
}
|
||||||
|
return struct{}{}, nil
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
// ignore error returned, since error segment is recorded into error set
|
||||||
|
_ = conc.AwaitAll(futures...)
|
||||||
|
|
||||||
|
// return merr.Success(), nil
|
||||||
|
return &querypb.DeleteBatchResponse{
|
||||||
|
Status: merr.Success(),
|
||||||
|
FailedIds: errSet.Collect(),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
type deleteRequestStringer struct {
|
type deleteRequestStringer struct {
|
||||||
*querypb.DeleteRequest
|
*querypb.DeleteRequest
|
||||||
}
|
}
|
||||||
|
@ -130,6 +130,10 @@ func (m *GrpcQueryNodeClient) Delete(ctx context.Context, in *querypb.DeleteRequ
|
|||||||
return &commonpb.Status{}, m.Err
|
return &commonpb.Status{}, m.Err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *GrpcQueryNodeClient) DeleteBatch(ctx context.Context, in *querypb.DeleteBatchRequest, opts ...grpc.CallOption) (*querypb.DeleteBatchResponse, error) {
|
||||||
|
return &querypb.DeleteBatchResponse{}, m.Err
|
||||||
|
}
|
||||||
|
|
||||||
func (m *GrpcQueryNodeClient) Close() error {
|
func (m *GrpcQueryNodeClient) Close() error {
|
||||||
return m.Err
|
return m.Err
|
||||||
}
|
}
|
||||||
|
@ -148,6 +148,10 @@ func (qn *qnServerWrapper) Delete(ctx context.Context, in *querypb.DeleteRequest
|
|||||||
return qn.QueryNode.Delete(ctx, in)
|
return qn.QueryNode.Delete(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (qn *qnServerWrapper) DeleteBatch(ctx context.Context, in *querypb.DeleteBatchRequest, opts ...grpc.CallOption) (*querypb.DeleteBatchResponse, error) {
|
||||||
|
return qn.QueryNode.DeleteBatch(ctx, in)
|
||||||
|
}
|
||||||
|
|
||||||
func WrapQueryNodeServerAsClient(qn types.QueryNode) types.QueryNodeClient {
|
func WrapQueryNodeServerAsClient(qn types.QueryNode) types.QueryNodeClient {
|
||||||
return &qnServerWrapper{
|
return &qnServerWrapper{
|
||||||
QueryNode: qn,
|
QueryNode: qn,
|
||||||
|
@ -246,6 +246,17 @@ func (s *QnWrapperSuite) TestDelete() {
|
|||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *QnWrapperSuite) TestDeleteBatch() {
|
||||||
|
s.qn.EXPECT().DeleteBatch(mock.Anything, mock.Anything).
|
||||||
|
Return(&querypb.DeleteBatchResponse{
|
||||||
|
Status: merr.Status(nil),
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
resp, err := s.client.DeleteBatch(context.Background(), &querypb.DeleteBatchRequest{})
|
||||||
|
err = merr.CheckRPCCall(resp, err)
|
||||||
|
s.NoError(err)
|
||||||
|
}
|
||||||
|
|
||||||
// Race caused by mock parameter check on once
|
// Race caused by mock parameter check on once
|
||||||
/*
|
/*
|
||||||
func (s *QnWrapperSuite) TestQueryStream() {
|
func (s *QnWrapperSuite) TestQueryStream() {
|
||||||
|
Loading…
Reference in New Issue
Block a user