Update: Wrap common grpc calls datacoord client (#24311)

Signed-off-by: Preetham <kamidipreetham@gmail.com>
This commit is contained in:
Preetham 2023-05-23 09:13:25 +05:30 committed by GitHub
parent d16e18fd34
commit d8a3b9d07e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -125,46 +125,38 @@ func (c *Client) Register() error {
return nil
}
// GetComponentStates calls DataCoord GetComponentStates services
func (c *Client) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
func wrapGrpcCall[T any](ctx context.Context, c *Client, call func(coordClient datapb.DataCoordClient) (*T, error)) (*T, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
return call(client)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.ComponentStates), err
return ret.(*T), err
}
// GetComponentStates calls DataCoord GetComponentStates services
func (c *Client) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*milvuspb.ComponentStates, error) {
return client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
})
}
// GetTimeTickChannel return the name of time tick channel.
func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*milvuspb.StringResponse, error) {
return client.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{})
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.StringResponse), err
}
// GetStatisticsChannel return the name of statistics channel.
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
ret, err := c.grpcClient.Call(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*milvuspb.StringResponse, error) {
return client.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.StringResponse), err
}
// Flush flushes a collection's data
@ -174,25 +166,18 @@ func (c *Client) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.FlushResponse, error) {
return client.Flush(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.FlushResponse), err
}
// AssignSegmentID applies allocations for specified Coolection/Partition and related Channel Name(Virtial Channel)
//
// ctx is the context to control request deadline and cancellation
// req contains the requester's info(id and role) and the list of Assignment Request,
// which coontains the specified collection, partitaion id, the related VChannel Name and row count it needs
// which contains the specified collection, partitaion id, the related VChannel Name and row count it needs
//
// response struct `AssignSegmentIDResponse` contains the the assignment result for each request
// response struct `AssignSegmentIDResponse` contains the assignment result for each request
// error is returned only when some communication issue occurs
// if some error occurs in the process of `AssignSegmentID`, it will be recorded and returned in `Status` field of response
//
@ -200,16 +185,9 @@ func (c *Client) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
// if the VChannel is newly used, `WatchDmlChannels` will be invoked to notify a `DataNode`(selected by policy) to watch it
// if there is anything make the allocation impossible, the response will not contain the corresponding result
func (c *Client) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.AssignSegmentIDResponse, error) {
return client.AssignSegmentID(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.AssignSegmentIDResponse), err
}
// GetSegmentStates requests segment state information
@ -229,16 +207,9 @@ func (c *Client) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentSta
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.GetSegmentStatesResponse, error) {
return client.GetSegmentStates(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.GetSegmentStatesResponse), err
}
// GetInsertBinlogPaths requests binlog paths for specified segment
@ -257,16 +228,9 @@ func (c *Client) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.GetInsertBinlogPathsResponse, error) {
return client.GetInsertBinlogPaths(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.GetInsertBinlogPathsResponse), err
}
// GetCollectionStatistics requests collection statistics
@ -285,16 +249,9 @@ func (c *Client) GetCollectionStatistics(ctx context.Context, req *datapb.GetCol
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.GetCollectionStatisticsResponse, error) {
return client.GetCollectionStatistics(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.GetCollectionStatisticsResponse), err
}
// GetPartitionStatistics requests partition statistics
@ -313,31 +270,17 @@ func (c *Client) GetPartitionStatistics(ctx context.Context, req *datapb.GetPart
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.GetPartitionStatisticsResponse, error) {
return client.GetPartitionStatistics(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.GetPartitionStatisticsResponse), err
}
// GetSegmentInfoChannel DEPRECATED
// legacy api to get SegmentInfo Channel name
func (c *Client) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*milvuspb.StringResponse, error) {
return client.GetSegmentInfoChannel(ctx, &datapb.GetSegmentInfoChannelRequest{})
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.StringResponse), err
}
// GetSegmentInfo requests segment info
@ -353,16 +296,9 @@ func (c *Client) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.GetSegmentInfoResponse, error) {
return client.GetSegmentInfo(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.GetSegmentInfoResponse), err
}
// SaveBinlogPaths updates segments binlogs(including insert binlogs, stats logs and delta logs)
@ -386,16 +322,9 @@ func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.Call(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.SaveBinlogPaths(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*commonpb.Status), err
}
// GetRecoveryInfo request segment recovery info of collection/partition
@ -411,16 +340,9 @@ func (c *Client) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.GetRecoveryInfoResponse, error) {
return client.GetRecoveryInfo(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.GetRecoveryInfoResponse), err
}
// GetRecoveryInfoV2 request segment recovery info of collection/partitions
@ -436,16 +358,9 @@ func (c *Client) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.GetRecoveryInfoResponseV2, error) {
return client.GetRecoveryInfoV2(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.GetRecoveryInfoResponseV2), err
}
// GetFlushedSegments returns flushed segment list of requested collection/parition
@ -463,16 +378,9 @@ func (c *Client) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedS
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.GetFlushedSegmentsResponse, error) {
return client.GetFlushedSegments(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.GetFlushedSegmentsResponse), err
}
// GetSegmentsByStates returns segment list of requested collection/partition and segment states
@ -489,16 +397,9 @@ func (c *Client) GetSegmentsByStates(ctx context.Context, req *datapb.GetSegment
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.GetSegmentsByStatesResponse, error) {
return client.GetSegmentsByStates(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.GetSegmentsByStatesResponse), err
}
// ShowConfigurations gets specified configurations para of DataCoord
@ -508,17 +409,9 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*internalpb.ShowConfigurationsResponse, error) {
return client.ShowConfigurations(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*internalpb.ShowConfigurationsResponse), err
}
// GetMetrics gets all metrics of datacoord
@ -528,100 +421,51 @@ func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*milvuspb.GetMetricsResponse, error) {
return client.GetMetrics(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.GetMetricsResponse), err
}
// ManualCompaction triggers a compaction for a collection
func (c *Client) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*milvuspb.ManualCompactionResponse, error) {
return client.ManualCompaction(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.ManualCompactionResponse), err
}
// GetCompactionState gets the state of a compaction
func (c *Client) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*milvuspb.GetCompactionStateResponse, error) {
return client.GetCompactionState(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.GetCompactionStateResponse), err
}
// GetCompactionStateWithPlans gets the state of a compaction by plan
func (c *Client) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*milvuspb.GetCompactionPlansResponse, error) {
return client.GetCompactionStateWithPlans(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.GetCompactionPlansResponse), err
}
// WatchChannels notifies DataCoord to watch vchannels of a collection
func (c *Client) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.WatchChannelsResponse, error) {
return client.WatchChannels(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.WatchChannelsResponse), err
}
// GetFlushState gets the flush state of multiple segments
func (c *Client) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*milvuspb.GetFlushStateResponse, error) {
return client.GetFlushState(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.GetFlushStateResponse), err
}
// GetFlushAllState checks if all DML messages before `FlushAllTs` have been flushed.
func (c *Client) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAllStateRequest) (*milvuspb.GetFlushAllStateResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*milvuspb.GetFlushAllStateResponse, error) {
return client.GetFlushAllState(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.GetFlushAllStateResponse), err
}
// DropVirtualChannel drops virtual channel in datacoord.
@ -631,16 +475,9 @@ func (c *Client) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.DropVirtualChannelResponse, error) {
return client.DropVirtualChannel(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.DropVirtualChannelResponse), err
}
// SetSegmentState sets the state of a given segment.
@ -650,16 +487,9 @@ func (c *Client) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStat
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.SetSegmentStateResponse, error) {
return client.SetSegmentState(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.SetSegmentStateResponse), err
}
// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
@ -669,16 +499,9 @@ func (c *Client) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*da
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.ImportTaskResponse, error) {
return client.Import(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.ImportTaskResponse), err
}
// UpdateSegmentStatistics is the client side caller of UpdateSegmentStatistics.
@ -688,16 +511,9 @@ func (c *Client) UpdateSegmentStatistics(ctx context.Context, req *datapb.Update
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.UpdateSegmentStatistics(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*commonpb.Status), err
}
// UpdateChannelCheckpoint updates channel checkpoint in dataCoord.
@ -707,16 +523,9 @@ func (c *Client) UpdateChannelCheckpoint(ctx context.Context, req *datapb.Update
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.UpdateChannelCheckpoint(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*commonpb.Status), err
}
// SaveImportSegment is the DataCoord client side code for SaveImportSegment call.
@ -726,16 +535,9 @@ func (c *Client) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSe
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.SaveImportSegment(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*commonpb.Status), err
}
func (c *Client) UnsetIsImportingState(ctx context.Context, req *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) {
@ -744,16 +546,9 @@ func (c *Client) UnsetIsImportingState(ctx context.Context, req *datapb.UnsetIsI
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.UnsetIsImportingState(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*commonpb.Status), err
}
func (c *Client) MarkSegmentsDropped(ctx context.Context, req *datapb.MarkSegmentsDroppedRequest) (*commonpb.Status, error) {
@ -762,167 +557,83 @@ func (c *Client) MarkSegmentsDropped(ctx context.Context, req *datapb.MarkSegmen
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.MarkSegmentsDropped(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*commonpb.Status), err
}
// BroadcastAlteredCollection is the DataCoord client side code for BroadcastAlteredCollection call.
func (c *Client) BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.BroadcastAlteredCollection(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*commonpb.Status), err
}
func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*milvuspb.CheckHealthResponse, error) {
return client.CheckHealth(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.CheckHealthResponse), err
}
func (c *Client) GcConfirm(ctx context.Context, req *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.GcConfirmResponse, error) {
return client.GcConfirm(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.GcConfirmResponse), err
}
// CreateIndex sends the build index request to IndexCoord.
func (c *Client) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.CreateIndex(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*commonpb.Status), err
}
// GetIndexState gets the index states from IndexCoord.
func (c *Client) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStateResponse, error) {
return client.GetIndexState(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*indexpb.GetIndexStateResponse), err
}
// GetSegmentIndexState gets the index states from IndexCoord.
func (c *Client) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegmentIndexStateRequest) (*indexpb.GetSegmentIndexStateResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetSegmentIndexStateResponse, error) {
return client.GetSegmentIndexState(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*indexpb.GetSegmentIndexStateResponse), err
}
// GetIndexInfos gets the index file paths from IndexCoord.
func (c *Client) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoRequest) (*indexpb.GetIndexInfoResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexInfoResponse, error) {
return client.GetIndexInfos(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*indexpb.GetIndexInfoResponse), err
}
// DescribeIndex describe the index info of the collection.
func (c *Client) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.DescribeIndexResponse, error) {
return client.DescribeIndex(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*indexpb.DescribeIndexResponse), err
}
// GetIndexStatistics get the statistics of the index.
func (c *Client) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexStatisticsRequest) (*indexpb.GetIndexStatisticsResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStatisticsResponse, error) {
return client.GetIndexStatistics(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*indexpb.GetIndexStatisticsResponse), err
}
// GetIndexBuildProgress describe the progress of the index.
func (c *Client) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetIndexBuildProgressRequest) (*indexpb.GetIndexBuildProgressResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexBuildProgressResponse, error) {
return client.GetIndexBuildProgress(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*indexpb.GetIndexBuildProgressResponse), err
}
// DropIndex sends the drop index request to IndexCoord.
func (c *Client) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.DropIndex(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*commonpb.Status), err
}