2021-06-22 10:42:07 +08:00
|
|
|
package datacoord
|
2021-05-25 15:35:37 +08:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"strconv"
|
2021-05-28 09:55:21 +08:00
|
|
|
"sync/atomic"
|
2021-05-25 15:35:37 +08:00
|
|
|
|
2021-09-22 19:33:54 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/util/trace"
|
|
|
|
|
2021-05-25 15:35:37 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
2021-09-01 10:13:15 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
2021-05-25 15:35:37 +08:00
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
2021-09-06 11:12:42 +08:00
|
|
|
// checks whether server in Healthy State
|
2021-05-28 09:55:21 +08:00
|
|
|
func (s *Server) isClosed() bool {
|
2021-06-29 10:46:13 +08:00
|
|
|
return atomic.LoadInt64(&s.isServing) != ServerStateHealthy
|
2021-05-25 15:35:37 +08:00
|
|
|
}
|
|
|
|
|
2021-09-06 11:12:42 +08:00
|
|
|
// GetTimeTickChannel legacy API, returns time tick channel name
|
2021-05-25 15:35:37 +08:00
|
|
|
func (s *Server) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
|
|
|
return &milvuspb.StringResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
},
|
|
|
|
Value: Params.TimeTickChannelName,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2021-09-06 11:12:42 +08:00
|
|
|
// GetStatisticsChannel legacy API, returns statistics channel name
|
2021-05-25 15:35:37 +08:00
|
|
|
func (s *Server) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
|
|
|
return &milvuspb.StringResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
},
|
|
|
|
Value: Params.StatisticsChannelName,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2021-09-06 11:12:42 +08:00
|
|
|
// Flush notify segment to flush
|
|
|
|
// this api only guarantees all the segments requested is sealed
|
|
|
|
// these segments will be flushed only after the Flush policy is fulfilled
|
2021-06-23 16:56:11 +08:00
|
|
|
func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
|
2021-07-28 11:43:22 +08:00
|
|
|
log.Debug("receive flush request", zap.Int64("dbID", req.GetDbID()), zap.Int64("collectionID", req.GetCollectionID()))
|
2021-09-22 19:33:54 +08:00
|
|
|
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "DataCoord-Flush")
|
|
|
|
defer sp.Finish()
|
2021-06-23 16:56:11 +08:00
|
|
|
resp := &datapb.FlushResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
Reason: "",
|
|
|
|
},
|
|
|
|
DbID: 0,
|
|
|
|
CollectionID: 0,
|
|
|
|
SegmentIDs: []int64{},
|
2021-05-28 09:55:21 +08:00
|
|
|
}
|
|
|
|
if s.isClosed() {
|
2021-06-23 16:56:11 +08:00
|
|
|
resp.Status.Reason = serverNotServingErrMsg
|
2021-05-28 09:55:21 +08:00
|
|
|
return resp, nil
|
2021-05-25 15:35:37 +08:00
|
|
|
}
|
2021-06-23 16:56:11 +08:00
|
|
|
sealedSegments, err := s.segmentManager.SealAllSegments(ctx, req.CollectionID)
|
|
|
|
if err != nil {
|
2021-07-28 11:43:22 +08:00
|
|
|
resp.Status.Reason = fmt.Sprintf("failed to flush %d, %s", req.CollectionID, err)
|
2021-05-28 09:55:21 +08:00
|
|
|
return resp, nil
|
2021-05-25 15:35:37 +08:00
|
|
|
}
|
2021-07-12 11:03:52 +08:00
|
|
|
log.Debug("flush response with segments", zap.Any("segments", sealedSegments))
|
2021-06-23 16:56:11 +08:00
|
|
|
resp.Status.ErrorCode = commonpb.ErrorCode_Success
|
|
|
|
resp.DbID = req.GetDbID()
|
|
|
|
resp.CollectionID = req.GetCollectionID()
|
|
|
|
resp.SegmentIDs = sealedSegments
|
|
|
|
return resp, nil
|
2021-05-25 15:35:37 +08:00
|
|
|
}
|
|
|
|
|
2021-09-06 11:12:42 +08:00
|
|
|
// AssignSegmentID applies for segment ids and make allocation for records
|
2021-05-25 15:35:37 +08:00
|
|
|
func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
|
2021-05-28 09:55:21 +08:00
|
|
|
if s.isClosed() {
|
2021-05-25 15:35:37 +08:00
|
|
|
return &datapb.AssignSegmentIDResponse{
|
|
|
|
Status: &commonpb.Status{
|
2021-06-22 18:24:08 +08:00
|
|
|
Reason: serverNotServingErrMsg,
|
2021-05-25 15:35:37 +08:00
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
},
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
assigns := make([]*datapb.SegmentIDAssignment, 0, len(req.SegmentIDRequests))
|
|
|
|
|
|
|
|
for _, r := range req.SegmentIDRequests {
|
2021-07-28 11:43:22 +08:00
|
|
|
log.Debug("handle assign segment request",
|
2021-06-08 19:25:37 +08:00
|
|
|
zap.Int64("collectionID", r.GetCollectionID()),
|
|
|
|
zap.Int64("partitionID", r.GetPartitionID()),
|
|
|
|
zap.String("channelName", r.GetChannelName()),
|
|
|
|
zap.Uint32("count", r.GetCount()))
|
|
|
|
|
2021-07-07 14:02:01 +08:00
|
|
|
if coll := s.meta.GetCollection(r.CollectionID); coll == nil {
|
2021-06-21 17:28:03 +08:00
|
|
|
if err := s.loadCollectionFromRootCoord(ctx, r.CollectionID); err != nil {
|
|
|
|
log.Error("load collection from rootcoord error",
|
2021-05-25 15:35:37 +08:00
|
|
|
zap.Int64("collectionID", r.CollectionID),
|
|
|
|
zap.Error(err))
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
2021-07-23 21:58:33 +08:00
|
|
|
|
2021-07-12 11:03:52 +08:00
|
|
|
s.cluster.Watch(r.ChannelName, r.CollectionID)
|
2021-05-29 10:47:29 +08:00
|
|
|
|
2021-07-23 21:58:33 +08:00
|
|
|
allocations, err := s.segmentManager.AllocSegment(ctx,
|
2021-05-25 15:35:37 +08:00
|
|
|
r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count))
|
|
|
|
if err != nil {
|
2021-07-23 21:58:33 +08:00
|
|
|
log.Warn("failed to alloc segment", zap.Any("request", r), zap.Error(err))
|
2021-05-25 15:35:37 +08:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2021-07-23 21:58:33 +08:00
|
|
|
log.Debug("Assign segment success", zap.Any("assignments", allocations))
|
2021-06-08 19:25:37 +08:00
|
|
|
|
2021-07-23 21:58:33 +08:00
|
|
|
for _, allocation := range allocations {
|
|
|
|
result := &datapb.SegmentIDAssignment{
|
|
|
|
SegID: allocation.SegmentID,
|
|
|
|
ChannelName: r.ChannelName,
|
|
|
|
Count: uint32(allocation.NumOfRows),
|
|
|
|
CollectionID: r.CollectionID,
|
|
|
|
PartitionID: r.PartitionID,
|
|
|
|
ExpireTime: allocation.ExpireTime,
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
Reason: "",
|
|
|
|
},
|
|
|
|
}
|
|
|
|
assigns = append(assigns, result)
|
2021-05-25 15:35:37 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return &datapb.AssignSegmentIDResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
},
|
|
|
|
SegIDAssignments: assigns,
|
|
|
|
}, nil
|
|
|
|
}
|
2021-05-26 19:06:56 +08:00
|
|
|
|
2021-09-06 11:12:42 +08:00
|
|
|
// GetSegmentStates returns segments state
|
2021-05-25 15:35:37 +08:00
|
|
|
func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
|
|
|
|
resp := &datapb.GetSegmentStatesResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
},
|
|
|
|
}
|
2021-05-28 09:55:21 +08:00
|
|
|
if s.isClosed() {
|
2021-06-22 18:24:08 +08:00
|
|
|
resp.Status.Reason = serverNotServingErrMsg
|
2021-05-25 15:35:37 +08:00
|
|
|
return resp, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, segmentID := range req.SegmentIDs {
|
|
|
|
state := &datapb.SegmentStateInfo{
|
|
|
|
Status: &commonpb.Status{},
|
|
|
|
SegmentID: segmentID,
|
|
|
|
}
|
2021-07-07 14:02:01 +08:00
|
|
|
segmentInfo := s.meta.GetSegment(segmentID)
|
|
|
|
if segmentInfo == nil {
|
2021-05-25 15:35:37 +08:00
|
|
|
state.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
2021-07-28 11:43:22 +08:00
|
|
|
state.Status.Reason = fmt.Sprintf("failed to get segment %d", segmentID)
|
2021-05-25 15:35:37 +08:00
|
|
|
} else {
|
|
|
|
state.Status.ErrorCode = commonpb.ErrorCode_Success
|
2021-06-15 11:06:42 +08:00
|
|
|
state.State = segmentInfo.GetState()
|
|
|
|
state.StartPosition = segmentInfo.GetStartPosition()
|
2021-05-25 15:35:37 +08:00
|
|
|
}
|
|
|
|
resp.States = append(resp.States, state)
|
|
|
|
}
|
|
|
|
resp.Status.ErrorCode = commonpb.ErrorCode_Success
|
|
|
|
|
|
|
|
return resp, nil
|
|
|
|
}
|
|
|
|
|
2021-09-06 11:12:42 +08:00
|
|
|
// GetInsertBinlogPaths returns binlog paths info for requested segments
|
2021-05-25 15:35:37 +08:00
|
|
|
func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
|
|
|
|
resp := &datapb.GetInsertBinlogPathsResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
},
|
|
|
|
}
|
2021-06-09 15:02:48 +08:00
|
|
|
if s.isClosed() {
|
2021-06-22 18:24:08 +08:00
|
|
|
resp.Status.Reason = serverNotServingErrMsg
|
2021-06-09 15:02:48 +08:00
|
|
|
return resp, nil
|
|
|
|
}
|
2021-08-19 13:00:12 +08:00
|
|
|
segment := s.meta.GetSegment(req.GetSegmentID())
|
|
|
|
if segment == nil {
|
|
|
|
resp.Status.Reason = "segment not found"
|
2021-05-25 15:35:37 +08:00
|
|
|
return resp, nil
|
|
|
|
}
|
2021-08-19 13:00:12 +08:00
|
|
|
binlogs := segment.GetBinlogs()
|
|
|
|
fids := make([]UniqueID, 0, len(binlogs))
|
|
|
|
paths := make([]*internalpb.StringList, 0, len(binlogs))
|
|
|
|
for _, field := range binlogs {
|
|
|
|
fids = append(fids, field.GetFieldID())
|
|
|
|
paths = append(paths, &internalpb.StringList{Values: field.GetBinlogs()})
|
2021-05-25 15:35:37 +08:00
|
|
|
}
|
|
|
|
resp.Status.ErrorCode = commonpb.ErrorCode_Success
|
|
|
|
resp.FieldIDs = fids
|
|
|
|
resp.Paths = paths
|
|
|
|
return resp, nil
|
|
|
|
}
|
|
|
|
|
2021-09-06 11:12:42 +08:00
|
|
|
// GetCollectionStatistics returns statistics for collection
|
|
|
|
// for now only row count is returned
|
2021-05-25 15:35:37 +08:00
|
|
|
func (s *Server) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
|
|
|
|
resp := &datapb.GetCollectionStatisticsResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
},
|
|
|
|
}
|
2021-06-09 15:02:48 +08:00
|
|
|
if s.isClosed() {
|
2021-06-22 18:24:08 +08:00
|
|
|
resp.Status.Reason = serverNotServingErrMsg
|
2021-06-09 15:02:48 +08:00
|
|
|
return resp, nil
|
|
|
|
}
|
2021-07-07 14:02:01 +08:00
|
|
|
nums := s.meta.GetNumRowsOfCollection(req.CollectionID)
|
2021-05-25 15:35:37 +08:00
|
|
|
resp.Status.ErrorCode = commonpb.ErrorCode_Success
|
|
|
|
resp.Stats = append(resp.Stats, &commonpb.KeyValuePair{Key: "row_count", Value: strconv.FormatInt(nums, 10)})
|
|
|
|
return resp, nil
|
|
|
|
}
|
|
|
|
|
2021-09-06 11:12:42 +08:00
|
|
|
// GetPartitionStatistics return statistics for parition
|
|
|
|
// for now only row count is returned
|
2021-05-25 15:35:37 +08:00
|
|
|
func (s *Server) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) {
|
|
|
|
resp := &datapb.GetPartitionStatisticsResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
},
|
|
|
|
}
|
2021-06-09 15:02:48 +08:00
|
|
|
if s.isClosed() {
|
2021-06-22 18:24:08 +08:00
|
|
|
resp.Status.Reason = serverNotServingErrMsg
|
2021-06-09 15:02:48 +08:00
|
|
|
return resp, nil
|
|
|
|
}
|
2021-07-07 14:02:01 +08:00
|
|
|
nums := s.meta.GetNumRowsOfPartition(req.CollectionID, req.PartitionID)
|
2021-05-25 15:35:37 +08:00
|
|
|
resp.Status.ErrorCode = commonpb.ErrorCode_Success
|
|
|
|
resp.Stats = append(resp.Stats, &commonpb.KeyValuePair{Key: "row_count", Value: strconv.FormatInt(nums, 10)})
|
|
|
|
return resp, nil
|
|
|
|
}
|
|
|
|
|
2021-09-06 11:12:42 +08:00
|
|
|
// GetSegmentInfoChannel legacy API, returns segment info statistics channel
|
2021-05-25 15:35:37 +08:00
|
|
|
func (s *Server) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
|
|
|
return &milvuspb.StringResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
},
|
|
|
|
Value: Params.SegmentInfoChannelName,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2021-09-06 11:12:42 +08:00
|
|
|
// GetSegmentInfo returns segment info requested, status, row count, etc included
|
2021-05-25 15:35:37 +08:00
|
|
|
func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
|
|
|
|
resp := &datapb.GetSegmentInfoResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
},
|
|
|
|
}
|
2021-05-28 09:55:21 +08:00
|
|
|
if s.isClosed() {
|
2021-06-22 18:24:08 +08:00
|
|
|
resp.Status.Reason = serverNotServingErrMsg
|
2021-05-25 15:35:37 +08:00
|
|
|
return resp, nil
|
|
|
|
}
|
|
|
|
infos := make([]*datapb.SegmentInfo, 0, len(req.SegmentIDs))
|
|
|
|
for _, id := range req.SegmentIDs {
|
2021-07-07 14:02:01 +08:00
|
|
|
info := s.meta.GetSegment(id)
|
|
|
|
if info == nil {
|
2021-07-28 11:43:22 +08:00
|
|
|
resp.Status.Reason = fmt.Sprintf("failed to get segment %d", id)
|
2021-05-25 15:35:37 +08:00
|
|
|
return resp, nil
|
|
|
|
}
|
2021-07-12 17:24:25 +08:00
|
|
|
infos = append(infos, info.SegmentInfo)
|
2021-05-25 15:35:37 +08:00
|
|
|
}
|
|
|
|
resp.Status.ErrorCode = commonpb.ErrorCode_Success
|
|
|
|
resp.Infos = infos
|
|
|
|
return resp, nil
|
|
|
|
}
|
|
|
|
|
2021-09-06 11:12:42 +08:00
|
|
|
// SaveBinlogPaths update segment related binlog path
|
|
|
|
// works for Checkpoints and Flush
|
2021-05-25 15:35:37 +08:00
|
|
|
func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
|
|
|
|
resp := &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
}
|
2021-05-28 09:55:21 +08:00
|
|
|
if s.isClosed() {
|
2021-06-22 18:24:08 +08:00
|
|
|
resp.Reason = serverNotServingErrMsg
|
2021-05-25 15:35:37 +08:00
|
|
|
return resp, nil
|
|
|
|
}
|
2021-07-28 11:43:22 +08:00
|
|
|
log.Debug("receive SaveBinlogPaths request",
|
2021-06-08 19:25:37 +08:00
|
|
|
zap.Int64("collectionID", req.GetCollectionID()),
|
2021-06-18 16:02:05 +08:00
|
|
|
zap.Int64("segmentID", req.GetSegmentID()),
|
|
|
|
zap.Any("checkpoints", req.GetCheckPoints()))
|
2021-05-25 15:35:37 +08:00
|
|
|
|
2021-06-04 11:45:45 +08:00
|
|
|
// set segment to SegmentState_Flushing and save binlogs and checkpoints
|
2021-08-19 13:00:12 +08:00
|
|
|
err := s.meta.UpdateFlushSegmentsInfo(req.GetSegmentID(), req.GetFlushed(),
|
|
|
|
req.GetField2BinlogPaths(), req.GetCheckPoints(), req.GetStartPositions())
|
2021-05-25 15:35:37 +08:00
|
|
|
if err != nil {
|
2021-07-28 11:43:22 +08:00
|
|
|
log.Error("save binlog and checkpoints failed",
|
2021-06-08 19:25:37 +08:00
|
|
|
zap.Int64("segmentID", req.GetSegmentID()),
|
|
|
|
zap.Error(err))
|
2021-05-25 15:35:37 +08:00
|
|
|
resp.Reason = err.Error()
|
2021-05-27 18:45:24 +08:00
|
|
|
return resp, nil
|
2021-05-25 15:35:37 +08:00
|
|
|
}
|
2021-07-28 11:43:22 +08:00
|
|
|
log.Debug("flush segment with meta", zap.Int64("id", req.SegmentID),
|
2021-08-19 13:00:12 +08:00
|
|
|
zap.Any("meta", req.GetField2BinlogPaths()))
|
2021-05-25 15:35:37 +08:00
|
|
|
|
2021-06-04 11:45:45 +08:00
|
|
|
if req.Flushed {
|
|
|
|
s.segmentManager.DropSegment(ctx, req.SegmentID)
|
|
|
|
s.flushCh <- req.SegmentID
|
|
|
|
}
|
2021-05-25 15:35:37 +08:00
|
|
|
resp.ErrorCode = commonpb.ErrorCode_Success
|
|
|
|
return resp, nil
|
|
|
|
}
|
2021-05-28 09:55:21 +08:00
|
|
|
|
2021-06-02 15:11:17 +08:00
|
|
|
// todo deprecated rpc
|
2021-05-28 09:55:21 +08:00
|
|
|
func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
2021-06-02 15:11:17 +08:00
|
|
|
resp := &internalpb.ComponentStates{
|
|
|
|
State: &internalpb.ComponentInfo{
|
|
|
|
NodeID: Params.NodeID,
|
2021-06-21 18:22:13 +08:00
|
|
|
Role: "datacoord",
|
2021-06-02 15:11:17 +08:00
|
|
|
StateCode: 0,
|
|
|
|
},
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
|
|
Reason: "",
|
|
|
|
},
|
|
|
|
}
|
|
|
|
state := atomic.LoadInt64(&s.isServing)
|
|
|
|
switch state {
|
2021-06-29 10:46:13 +08:00
|
|
|
case ServerStateInitializing:
|
2021-06-02 15:11:17 +08:00
|
|
|
resp.State.StateCode = internalpb.StateCode_Initializing
|
2021-06-29 10:46:13 +08:00
|
|
|
case ServerStateHealthy:
|
2021-06-02 15:11:17 +08:00
|
|
|
resp.State.StateCode = internalpb.StateCode_Healthy
|
|
|
|
default:
|
|
|
|
resp.State.StateCode = internalpb.StateCode_Abnormal
|
|
|
|
}
|
|
|
|
return resp, nil
|
2021-05-28 09:55:21 +08:00
|
|
|
}
|
|
|
|
|
2021-09-06 11:12:42 +08:00
|
|
|
// GetRecoveryInfo get recovery info for segment
|
2021-06-07 09:47:36 +08:00
|
|
|
func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
|
|
|
|
collectionID := req.GetCollectionID()
|
|
|
|
partitionID := req.GetPartitionID()
|
2021-07-28 11:43:22 +08:00
|
|
|
log.Info("receive get recovery info request",
|
2021-06-08 19:25:37 +08:00
|
|
|
zap.Int64("collectionID", collectionID),
|
2021-06-07 09:47:36 +08:00
|
|
|
zap.Int64("partitionID", partitionID))
|
|
|
|
resp := &datapb.GetRecoveryInfoResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
},
|
|
|
|
}
|
2021-06-09 15:02:48 +08:00
|
|
|
if s.isClosed() {
|
2021-06-22 18:24:08 +08:00
|
|
|
resp.Status.Reason = serverNotServingErrMsg
|
2021-06-09 15:02:48 +08:00
|
|
|
return resp, nil
|
|
|
|
}
|
2021-06-07 09:47:36 +08:00
|
|
|
segmentIDs := s.meta.GetSegmentsOfPartition(collectionID, partitionID)
|
|
|
|
segment2Binlogs := make(map[UniqueID][]*datapb.FieldBinlog)
|
2021-09-07 11:35:18 +08:00
|
|
|
segmentsNumOfRows := make(map[UniqueID]int64)
|
2021-06-07 09:47:36 +08:00
|
|
|
for _, id := range segmentIDs {
|
2021-07-07 14:02:01 +08:00
|
|
|
segment := s.meta.GetSegment(id)
|
|
|
|
if segment == nil {
|
2021-07-28 11:43:22 +08:00
|
|
|
errMsg := fmt.Sprintf("failed to get segment %d", id)
|
2021-07-07 14:02:01 +08:00
|
|
|
log.Error(errMsg)
|
|
|
|
resp.Status.Reason = errMsg
|
2021-06-15 19:23:55 +08:00
|
|
|
return resp, nil
|
|
|
|
}
|
|
|
|
if segment.State != commonpb.SegmentState_Flushed && segment.State != commonpb.SegmentState_Flushing {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2021-08-19 13:00:12 +08:00
|
|
|
binlogs := segment.GetBinlogs()
|
2021-06-07 09:47:36 +08:00
|
|
|
field2Binlog := make(map[UniqueID][]string)
|
2021-08-19 13:00:12 +08:00
|
|
|
for _, field := range binlogs {
|
|
|
|
field2Binlog[field.GetFieldID()] = append(field2Binlog[field.GetFieldID()], field.GetBinlogs()...)
|
2021-06-07 09:47:36 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
for f, paths := range field2Binlog {
|
|
|
|
fieldBinlogs := &datapb.FieldBinlog{
|
|
|
|
FieldID: f,
|
|
|
|
Binlogs: paths,
|
|
|
|
}
|
|
|
|
segment2Binlogs[id] = append(segment2Binlogs[id], fieldBinlogs)
|
|
|
|
}
|
2021-09-07 11:35:18 +08:00
|
|
|
|
|
|
|
segmentsNumOfRows[id] = segment.NumOfRows
|
2021-06-07 09:47:36 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
binlogs := make([]*datapb.SegmentBinlogs, 0, len(segment2Binlogs))
|
|
|
|
for segmentID, fieldBinlogs := range segment2Binlogs {
|
|
|
|
sbl := &datapb.SegmentBinlogs{
|
|
|
|
SegmentID: segmentID,
|
2021-09-07 11:35:18 +08:00
|
|
|
NumOfRows: segmentsNumOfRows[segmentID],
|
2021-06-07 09:47:36 +08:00
|
|
|
FieldBinlogs: fieldBinlogs,
|
|
|
|
}
|
|
|
|
binlogs = append(binlogs, sbl)
|
|
|
|
}
|
|
|
|
|
2021-06-21 17:28:03 +08:00
|
|
|
dresp, err := s.rootCoordClient.DescribeCollection(s.ctx, &milvuspb.DescribeCollectionRequest{
|
2021-06-07 09:47:36 +08:00
|
|
|
Base: &commonpb.MsgBase{
|
|
|
|
MsgType: commonpb.MsgType_DescribeCollection,
|
|
|
|
SourceID: Params.NodeID,
|
|
|
|
},
|
|
|
|
CollectionID: collectionID,
|
|
|
|
})
|
|
|
|
if err = VerifyResponse(dresp, err); err != nil {
|
2021-07-28 11:43:22 +08:00
|
|
|
log.Error("get collection info from master failed",
|
2021-06-07 09:47:36 +08:00
|
|
|
zap.Int64("collectionID", collectionID),
|
|
|
|
zap.Error(err))
|
|
|
|
|
|
|
|
resp.Status.Reason = err.Error()
|
|
|
|
return resp, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
channels := dresp.GetVirtualChannelNames()
|
|
|
|
vchans := make([]vchannel, 0, len(channels))
|
|
|
|
for _, c := range channels {
|
|
|
|
vchans = append(vchans, vchannel{
|
|
|
|
CollectionID: collectionID,
|
|
|
|
DmlChannel: c,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2021-06-15 19:23:55 +08:00
|
|
|
channelInfos, err := s.GetVChanPositions(vchans, false)
|
2021-06-07 09:47:36 +08:00
|
|
|
if err != nil {
|
2021-07-28 11:43:22 +08:00
|
|
|
log.Error("get channel positions failed",
|
2021-06-08 19:25:37 +08:00
|
|
|
zap.Strings("channels", channels),
|
2021-06-07 09:47:36 +08:00
|
|
|
zap.Error(err))
|
|
|
|
resp.Status.Reason = err.Error()
|
|
|
|
return resp, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
resp.Binlogs = binlogs
|
|
|
|
resp.Channels = channelInfos
|
|
|
|
resp.Status.ErrorCode = commonpb.ErrorCode_Success
|
|
|
|
return resp, nil
|
|
|
|
}
|
2021-07-02 11:16:20 +08:00
|
|
|
|
2021-09-06 11:12:42 +08:00
|
|
|
// GetFlushedSegments returns all segment matches provided criterion and in State Flushed
|
|
|
|
// If requested partition id < 0, ignores the partition id filter
|
2021-07-02 11:16:20 +08:00
|
|
|
func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
|
2021-09-06 11:12:42 +08:00
|
|
|
resp := &datapb.GetFlushedSegmentsResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
},
|
|
|
|
}
|
2021-07-02 11:16:20 +08:00
|
|
|
collectionID := req.GetCollectionID()
|
|
|
|
partitionID := req.GetPartitionID()
|
2021-09-06 11:12:42 +08:00
|
|
|
log.Debug("GetFlushedSegment",
|
|
|
|
zap.Int64("collectionID", collectionID),
|
|
|
|
zap.Int64("partitionID", partitionID))
|
|
|
|
if s.isClosed() {
|
|
|
|
resp.Status.Reason = serverNotServingErrMsg
|
|
|
|
return resp, nil
|
|
|
|
}
|
2021-07-02 11:16:20 +08:00
|
|
|
var segmentIDs []UniqueID
|
|
|
|
if partitionID < 0 {
|
|
|
|
segmentIDs = s.meta.GetSegmentsOfCollection(collectionID)
|
|
|
|
} else {
|
|
|
|
segmentIDs = s.meta.GetSegmentsOfPartition(collectionID, partitionID)
|
|
|
|
}
|
|
|
|
ret := make([]UniqueID, 0, len(segmentIDs))
|
|
|
|
for _, id := range segmentIDs {
|
2021-07-07 14:02:01 +08:00
|
|
|
s := s.meta.GetSegment(id)
|
|
|
|
if s == nil || s.GetState() != commonpb.SegmentState_Flushed {
|
2021-07-02 11:16:20 +08:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
ret = append(ret, id)
|
|
|
|
}
|
2021-09-06 11:12:42 +08:00
|
|
|
resp.Segments = ret
|
|
|
|
resp.Status.ErrorCode = commonpb.ErrorCode_Success
|
|
|
|
return resp, nil
|
2021-07-02 11:16:20 +08:00
|
|
|
}
|
2021-09-01 10:13:15 +08:00
|
|
|
|
2021-09-06 11:12:42 +08:00
|
|
|
// GetMetrics returns DataCoord metrics info
|
|
|
|
// it may include SystemMetrics, Topology metrics, etc.
|
2021-09-01 10:13:15 +08:00
|
|
|
func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
|
|
|
log.Debug("DataCoord.GetMetrics",
|
|
|
|
zap.Int64("node_id", Params.NodeID),
|
|
|
|
zap.String("req", req.Request))
|
|
|
|
|
|
|
|
if s.isClosed() {
|
|
|
|
log.Warn("DataCoord.GetMetrics failed",
|
|
|
|
zap.Int64("node_id", Params.NodeID),
|
|
|
|
zap.String("req", req.Request),
|
|
|
|
zap.Error(errDataCoordIsUnhealthy(Params.NodeID)))
|
|
|
|
|
|
|
|
return &milvuspb.GetMetricsResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
Reason: msgDataCoordIsUnhealthy(Params.NodeID),
|
|
|
|
},
|
|
|
|
Response: "",
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
metricType, err := metricsinfo.ParseMetricType(req.Request)
|
|
|
|
if err != nil {
|
|
|
|
log.Warn("DataCoord.GetMetrics failed to parse metric type",
|
|
|
|
zap.Int64("node_id", Params.NodeID),
|
|
|
|
zap.String("req", req.Request),
|
|
|
|
zap.Error(err))
|
|
|
|
|
|
|
|
return &milvuspb.GetMetricsResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
Reason: err.Error(),
|
|
|
|
},
|
|
|
|
Response: "",
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Debug("DataCoord.GetMetrics",
|
|
|
|
zap.String("metric_type", metricType))
|
|
|
|
|
|
|
|
if metricType == metricsinfo.SystemInfoMetrics {
|
2021-09-03 17:15:26 +08:00
|
|
|
ret, err := s.metricsCacheManager.GetSystemInfoMetrics()
|
|
|
|
if err == nil && ret != nil {
|
|
|
|
return ret, nil
|
|
|
|
}
|
|
|
|
log.Debug("failed to get system info metrics from cache, recompute instead",
|
|
|
|
zap.Error(err))
|
|
|
|
|
2021-09-01 10:13:15 +08:00
|
|
|
metrics, err := s.getSystemInfoMetrics(ctx, req)
|
|
|
|
|
|
|
|
log.Debug("DataCoord.GetMetrics",
|
|
|
|
zap.Int64("node_id", Params.NodeID),
|
|
|
|
zap.String("req", req.Request),
|
|
|
|
zap.String("metric_type", metricType),
|
|
|
|
zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
|
|
|
|
zap.Error(err))
|
|
|
|
|
2021-09-03 17:15:26 +08:00
|
|
|
s.metricsCacheManager.UpdateSystemInfoMetrics(metrics)
|
|
|
|
|
2021-09-01 10:13:15 +08:00
|
|
|
return metrics, err
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Debug("DataCoord.GetMetrics failed, request metric type is not implemented yet",
|
|
|
|
zap.Int64("node_id", Params.NodeID),
|
|
|
|
zap.String("req", req.Request),
|
|
|
|
zap.String("metric_type", metricType))
|
|
|
|
|
|
|
|
return &milvuspb.GetMetricsResponse{
|
|
|
|
Status: &commonpb.Status{
|
|
|
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
|
|
|
Reason: metricsinfo.MsgUnimplementedMetric,
|
|
|
|
},
|
|
|
|
Response: "",
|
|
|
|
}, nil
|
|
|
|
}
|