milvus/internal/datanode/broker/rootcoord.go
congqixia cbb350c552
Add broker for datanode grpc operations (#27631)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
2023-10-11 17:03:34 +08:00

115 lines
3.5 KiB
Go

package broker
import (
"context"
"fmt"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type rootCoordBroker struct {
client types.RootCoordClient
}
func (rc *rootCoordBroker) DescribeCollection(ctx context.Context, collectionID typeutil.UniqueID, timestamp typeutil.Timestamp) (*milvuspb.DescribeCollectionResponse, error) {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", collectionID),
zap.Uint64("timestamp", timestamp),
)
req := &milvuspb.DescribeCollectionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
// please do not specify the collection name alone after database feature.
CollectionID: collectionID,
TimeStamp: timestamp,
}
resp, err := rc.client.DescribeCollectionInternal(ctx, req)
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to DescribeCollectionInternal", zap.Error(err))
return nil, err
}
return resp, nil
}
func (rc *rootCoordBroker) ShowPartitions(ctx context.Context, dbName, collectionName string) (map[string]int64, error) {
req := &milvuspb.ShowPartitionsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions),
),
DbName: dbName,
CollectionName: collectionName,
}
log := log.Ctx(ctx).With(
zap.String("dbName", dbName),
zap.String("collectionName", collectionName),
)
resp, err := rc.client.ShowPartitions(ctx, req)
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to get partitions of collection", zap.Error(err))
return nil, err
}
partitionNames := resp.GetPartitionNames()
partitionIDs := resp.GetPartitionIDs()
if len(partitionNames) != len(partitionIDs) {
log.Warn("partition names and ids are unequal",
zap.Int("partitionNameNumber", len(partitionNames)),
zap.Int("partitionIDNumber", len(partitionIDs)))
return nil, fmt.Errorf("partition names and ids are unequal, number of names: %d, number of ids: %d",
len(partitionNames), len(partitionIDs))
}
partitions := make(map[string]int64)
for i := 0; i < len(partitionNames); i++ {
partitions[partitionNames[i]] = partitionIDs[i]
}
return partitions, nil
}
func (rc *rootCoordBroker) AllocTimestamp(ctx context.Context, num uint32) (uint64, uint32, error) {
log := log.Ctx(ctx)
req := &rootcoordpb.AllocTimestampRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
Count: num,
}
resp, err := rc.client.AllocTimestamp(ctx, req)
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to AllocTimestamp", zap.Error(err))
return 0, 0, err
}
return resp.GetTimestamp(), resp.GetCount(), nil
}
func (rc *rootCoordBroker) ReportImport(ctx context.Context, req *rootcoordpb.ImportResult) error {
log := log.Ctx(ctx)
resp, err := rc.client.ReportImport(ctx, req)
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to ReportImport", zap.Error(err))
return err
}
return nil
}