// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
	"context"
	"fmt"
	"math/rand"
	"strconv"
	"sync"

	"github.com/samber/lo"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"

	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
	"github.com/milvus-io/milvus/internal/common"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/util/commonpbutil"
	"github.com/milvus-io/milvus/internal/util/errorutil"
	"github.com/milvus-io/milvus/internal/util/logutil"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/paramtable"
	"github.com/milvus-io/milvus/internal/util/segmentutil"
	"github.com/milvus-io/milvus/internal/util/trace"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"
)
// isClosed checks whether the server is closed, i.e. not in Healthy state.
func (s *Server) isClosed() bool {
	return s.stateCode.Load() != commonpb.StateCode_Healthy
}

// GetTimeTickChannel is a legacy API that returns the time tick channel name.
func (s *Server) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		Value: Params.CommonCfg.DataCoordTimeTick.GetValue(),
	}, nil
}

// GetStatisticsChannel is a legacy API that returns the statistics channel name.
func (s *Server) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "no statistics channel",
		},
	}, nil
}

// Flush notifies the segments of a collection to flush.
// This API only guarantees that the requested segments are sealed;
// they will be flushed only after the flush policy is fulfilled.
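// Illustrative example (values are hypothetical): a Flush on collection 100 may return
// SegmentIDs=[11, 12] for segments sealed by this call, and FlushSegmentIDs=[7, 8] for
// segments of the collection that were already flushing or flushed.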
func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
	log.Info("receive flush request", zap.Int64("dbID", req.GetDbID()), zap.Int64("collectionID", req.GetCollectionID()))
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "DataCoord-Flush")
	defer sp.Finish()
	resp := &datapb.FlushResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "",
		},
		DbID:         0,
		CollectionID: 0,
		SegmentIDs:   []int64{},
	}
	if s.isClosed() {
		resp.Status.Reason = serverNotServingErrMsg
		return resp, nil
	}

	// generate a timestamp timeOfSeal, all data before timeOfSeal is guaranteed to be sealed or flushed
	ts, err := s.allocator.allocTimestamp(ctx)
	if err != nil {
		log.Warn("unable to alloc timestamp", zap.Error(err))
	}
	timeOfSeal, _ := tsoutil.ParseTS(ts)

	sealedSegmentIDs, err := s.segmentManager.SealAllSegments(ctx, req.GetCollectionID(), req.GetSegmentIDs())
	if err != nil {
		resp.Status.Reason = fmt.Sprintf("failed to flush %d, %s", req.CollectionID, err)
		return resp, nil
	}

	sealedSegmentsIDDict := make(map[UniqueID]bool)
	for _, sealedSegmentID := range sealedSegmentIDs {
		sealedSegmentsIDDict[sealedSegmentID] = true
	}

	segments := s.meta.GetSegmentsOfCollection(req.GetCollectionID())
	flushSegmentIDs := make([]UniqueID, 0, len(segments))
	for _, segment := range segments {
		if segment != nil &&
			(segment.GetState() == commonpb.SegmentState_Flushed ||
				segment.GetState() == commonpb.SegmentState_Flushing) &&
			!sealedSegmentsIDDict[segment.GetID()] {
			flushSegmentIDs = append(flushSegmentIDs, segment.GetID())
		}
	}

	log.Info("flush response with segments",
		zap.Int64("collectionID", req.GetCollectionID()),
		zap.Int64s("sealSegments", sealedSegmentIDs),
		zap.Int64s("flushSegments", flushSegmentIDs),
		zap.Time("timeOfSeal", timeOfSeal))
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
	resp.DbID = req.GetDbID()
	resp.CollectionID = req.GetCollectionID()
	resp.SegmentIDs = sealedSegmentIDs
	resp.TimeOfSeal = timeOfSeal.Unix()
	resp.FlushSegmentIDs = flushSegmentIDs
	return resp, nil
}

// AssignSegmentID applies for segment IDs and makes allocations for the requested records.
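// Illustrative example (values are hypothetical): a SegmentIDRequest asking for Count=2000
// rows on channel "by-dev-rootcoord-dml_0" may be answered with one SegmentIDAssignment of
// 2000 rows in segment 42, usable by the caller until the assignment's ExpireTime.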
func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
	if s.isClosed() {
		return &datapb.AssignSegmentIDResponse{
			Status: &commonpb.Status{
				Reason:    serverNotServingErrMsg,
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
			},
		}, nil
	}

	assigns := make([]*datapb.SegmentIDAssignment, 0, len(req.SegmentIDRequests))
	for _, r := range req.SegmentIDRequests {
		log.Info("handle assign segment request",
			zap.Int64("collectionID", r.GetCollectionID()),
			zap.Int64("partitionID", r.GetPartitionID()),
			zap.String("channelName", r.GetChannelName()),
			zap.Uint32("count", r.GetCount()),
			zap.Bool("isImport", r.GetIsImport()),
			zap.Int64("import task ID", r.GetImportTaskID()))

		// Load the collection info from Root Coordinator, if it is not found in server meta.
		// Note: this request wouldn't be received if collection didn't exist.
		_, err := s.handler.GetCollection(ctx, r.GetCollectionID())
		if err != nil {
			log.Warn("cannot get collection schema", zap.Error(err))
		}

		// Add the channel to cluster for watching.
		s.cluster.Watch(r.ChannelName, r.CollectionID)

		segmentAllocations := make([]*Allocation, 0)
		if r.GetIsImport() {
			// Have segment manager allocate and return the segment allocation info.
			segAlloc, err := s.segmentManager.allocSegmentForImport(ctx,
				r.GetCollectionID(), r.GetPartitionID(), r.GetChannelName(), int64(r.GetCount()), r.GetImportTaskID())
			if err != nil {
				log.Warn("failed to alloc segment for import", zap.Any("request", r), zap.Error(err))
				continue
			}
			segmentAllocations = append(segmentAllocations, segAlloc)
		} else {
			// Have segment manager allocate and return the segment allocation info.
			segAlloc, err := s.segmentManager.AllocSegment(ctx,
				r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count))
			if err != nil {
				log.Warn("failed to alloc segment", zap.Any("request", r), zap.Error(err))
				continue
			}
			segmentAllocations = append(segmentAllocations, segAlloc...)
		}

		log.Info("success to assign segments", zap.Int64("collectionID", r.GetCollectionID()), zap.Any("assignments", segmentAllocations))

		for _, allocation := range segmentAllocations {
			result := &datapb.SegmentIDAssignment{
				SegID:        allocation.SegmentID,
				ChannelName:  r.ChannelName,
				Count:        uint32(allocation.NumOfRows),
				CollectionID: r.CollectionID,
				PartitionID:  r.PartitionID,
				ExpireTime:   allocation.ExpireTime,
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_Success,
					Reason:    "",
				},
			}
			assigns = append(assigns, result)
		}
	}
	return &datapb.AssignSegmentIDResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		SegIDAssignments: assigns,
	}, nil
}

// GetSegmentStates returns the states of the requested segments.
func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
	resp := &datapb.GetSegmentStatesResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}
	if s.isClosed() {
		resp.Status.Reason = serverNotServingErrMsg
		return resp, nil
	}
	for _, segmentID := range req.SegmentIDs {
		state := &datapb.SegmentStateInfo{
			SegmentID: segmentID,
		}
		segmentInfo := s.meta.GetSegment(segmentID)
		if segmentInfo == nil {
			state.State = commonpb.SegmentState_NotExist
		} else {
			state.State = segmentInfo.GetState()
			state.StartPosition = segmentInfo.GetStartPosition()
		}
		resp.States = append(resp.States, state)
	}
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
	return resp, nil
}

// GetInsertBinlogPaths returns binlog paths info for the requested segment.
func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
	resp := &datapb.GetInsertBinlogPathsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}
	if s.isClosed() {
		resp.Status.Reason = serverNotServingErrMsg
		return resp, nil
	}
	segment := s.meta.GetSegment(req.GetSegmentID())
	if segment == nil {
		resp.Status.Reason = "segment not found"
		return resp, nil
	}
	binlogs := segment.GetBinlogs()
	fids := make([]UniqueID, 0, len(binlogs))
	paths := make([]*internalpb.StringList, 0, len(binlogs))
	for _, field := range binlogs {
		fids = append(fids, field.GetFieldID())
		binlogs := field.GetBinlogs()
		p := make([]string, 0, len(binlogs))
		for _, log := range binlogs {
			p = append(p, log.GetLogPath())
		}
		paths = append(paths, &internalpb.StringList{Values: p})
	}
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
	resp.FieldIDs = fids
	resp.Paths = paths
	return resp, nil
}

// GetCollectionStatistics returns statistics for a collection.
// For now only the row count is returned.
func (s *Server) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Info("received request to get collection statistics")
	resp := &datapb.GetCollectionStatisticsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}
	if s.isClosed() {
		resp.Status.Reason = serverNotServingErrMsg
		return resp, nil
	}
	nums := s.meta.GetNumRowsOfCollection(req.CollectionID)
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
	resp.Stats = append(resp.Stats, &commonpb.KeyValuePair{Key: "row_count", Value: strconv.FormatInt(nums, 10)})
	logutil.Logger(ctx).Info("success to get collection statistics", zap.Any("response", resp))
	return resp, nil
}

// GetPartitionStatistics returns statistics for the requested partitions.
// If the partition ID list is empty, statistics for all partitions of the collection are returned.
// For now only the row count is returned.
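// Illustrative example (count is hypothetical, format derived from the implementation below):
// the aggregated count is returned as a key/value pair, e.g. Stats=[{Key: "row_count", Value: "12345"}].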
func (s *Server) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) {
	resp := &datapb.GetPartitionStatisticsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}
	if s.isClosed() {
		resp.Status.Reason = serverNotServingErrMsg
		return resp, nil
	}
	nums := int64(0)
	if len(req.GetPartitionIDs()) == 0 {
		nums = s.meta.GetNumRowsOfCollection(req.CollectionID)
	}
	for _, partID := range req.GetPartitionIDs() {
		num := s.meta.GetNumRowsOfPartition(req.CollectionID, partID)
		nums += num
	}
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
	resp.Stats = append(resp.Stats, &commonpb.KeyValuePair{Key: "row_count", Value: strconv.FormatInt(nums, 10)})
	logutil.Logger(ctx).Info("success to get partition statistics", zap.Any("response", resp))
	return resp, nil
}

// GetSegmentInfoChannel is a legacy API that returns the segment info statistics channel name.
func (s *Server) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		Value: Params.CommonCfg.DataCoordSegmentInfo.GetValue(),
	}, nil
}

// GetSegmentInfo returns the requested segment info, including state, row count, binlogs, etc.
// Called by: QueryCoord, DataNode, IndexCoord, Proxy.
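// Note (derived from the implementation below): when IncludeUnHealthy is set, a segment that
// has already been compacted away is still returned, with its compaction child's delta logs
// and DML position merged into a cloned copy of the info; the response also attaches one
// channel checkpoint per distinct insert channel of the returned segments.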
func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
	resp := &datapb.GetSegmentInfoResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}
	if s.isClosed() {
		resp.Status.Reason = serverNotServingErrMsg
		return resp, nil
	}
	infos := make([]*datapb.SegmentInfo, 0, len(req.GetSegmentIDs()))
	channelCPs := make(map[string]*internalpb.MsgPosition)
	for _, id := range req.SegmentIDs {
		var info *SegmentInfo
		if req.IncludeUnHealthy {
			info = s.meta.GetSegmentUnsafe(id)

			if info == nil {
				log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id))
				resp.Status.Reason = msgSegmentNotFound(id)
				return resp, nil
			}

			child := s.meta.GetCompactionTo(id)
			clonedInfo := info.Clone()
			if child != nil {
				clonedInfo.Deltalogs = append(clonedInfo.Deltalogs, child.GetDeltalogs()...)
				clonedInfo.DmlPosition = child.GetDmlPosition()
			}
			segmentutil.ReCalcRowCount(info.SegmentInfo, clonedInfo.SegmentInfo)
			infos = append(infos, clonedInfo.SegmentInfo)
		} else {
			info = s.meta.GetSegment(id)
			if info == nil {
				resp.Status.Reason = msgSegmentNotFound(id)
				return resp, nil
			}
			clonedInfo := info.Clone()
			segmentutil.ReCalcRowCount(info.SegmentInfo, clonedInfo.SegmentInfo)
			infos = append(infos, clonedInfo.SegmentInfo)
		}
		vchannel := info.InsertChannel
		if _, ok := channelCPs[vchannel]; vchannel != "" && !ok {
			channelCPs[vchannel] = s.meta.GetChannelCheckpoint(vchannel)
		}
	}
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
	resp.Infos = infos
	resp.ChannelCheckpoint = channelCPs
	return resp, nil
}

// SaveBinlogPaths updates segment-related binlog paths.
// It works for both checkpoint updates and flushes.
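// Summary of the flow below: validate that the source node watches the segment's channel
// (skipped for data import), persist the binlog/statslog/deltalog paths and checkpoints in
// meta, and, when the request marks the segment as flushed, drop it from the segment manager,
// notify the flush channel, and optionally trigger a single compaction.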
func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
	resp := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}

	if s.isClosed() {
		resp.Reason = serverNotServingErrMsg
		return resp, nil
	}

	log.Info("receive SaveBinlogPaths request",
		zap.Int64("nodeID", req.GetBase().GetSourceID()),
		zap.Int64("collectionID", req.GetCollectionID()),
		zap.Int64("segmentID", req.GetSegmentID()),
		zap.Bool("isFlush", req.GetFlushed()),
		zap.Bool("isDropped", req.GetDropped()),
		zap.Any("startPositions", req.GetStartPositions()),
		zap.Any("checkpoints", req.GetCheckPoints()))

	// validate
	nodeID := req.GetBase().GetSourceID()
	segmentID := req.GetSegmentID()
	segment := s.meta.GetSegment(segmentID)
	if segment == nil {
		log.Error("failed to get segment", zap.Int64("segmentID", segmentID))
		failResponseWithCode(resp, commonpb.ErrorCode_SegmentNotFound, fmt.Sprintf("failed to get segment %d", segmentID))
		return resp, nil
	}
	// No need to check import channel--node matching in data import case.
	if !req.GetImporting() {
		channel := segment.GetInsertChannel()
		if !s.channelManager.Match(nodeID, channel) {
			failResponse(resp, fmt.Sprintf("channel %s is not watched on node %d", channel, nodeID))
			resp.ErrorCode = commonpb.ErrorCode_MetaFailed
			log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID))
			return resp, nil
		}
	}

	if req.GetDropped() {
		s.segmentManager.DropSegment(ctx, segment.GetID())
	}

	// Set segment to SegmentState_Flushing. Also save binlogs and checkpoints.
	err := s.meta.UpdateFlushSegmentsInfo(
		req.GetSegmentID(),
		req.GetFlushed(),
		req.GetDropped(),
		req.GetImporting(),
		req.GetField2BinlogPaths(),
		req.GetField2StatslogPaths(),
		req.GetDeltalogs(),
		req.GetCheckPoints(),
		req.GetStartPositions())
	if err != nil {
		log.Error("save binlog and checkpoints failed",
			zap.Int64("segmentID", req.GetSegmentID()),
			zap.Error(err))
		resp.Reason = err.Error()
		return resp, nil
	}

	log.Info("flush segment with meta", zap.Int64("segment id", req.SegmentID),
		zap.Any("meta", req.GetField2BinlogPaths()))

	if req.GetFlushed() {
		s.segmentManager.DropSegment(ctx, req.SegmentID)
		s.flushCh <- req.SegmentID

		if !req.Importing && Params.DataCoordCfg.EnableCompaction.GetAsBool() {
			err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(),
				segmentID, segment.GetInsertChannel())
			if err != nil {
				log.Warn("failed to trigger single compaction", zap.Int64("segment ID", segmentID))
			} else {
				log.Info("compaction triggered for segment", zap.Int64("segment ID", segmentID))
			}
		}
	}
	resp.ErrorCode = commonpb.ErrorCode_Success
	return resp, nil
}

// DropVirtualChannel notifies DataCoord that a vchannel has been dropped,
// and carries the remaining data logs and checkpoints to be updated.
func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
	resp := &datapb.DropVirtualChannelResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}
	if s.isClosed() {
		resp.Status.Reason = serverNotServingErrMsg
		return resp, nil
	}
	channel := req.GetChannelName()
	log.Info("receive DropVirtualChannel request",
		zap.String("channel name", channel))

	// validate
	nodeID := req.GetBase().GetSourceID()
	if !s.channelManager.Match(nodeID, channel) {
		failResponse(resp.Status, fmt.Sprintf("channel %s is not watched on node %d", channel, nodeID))
		resp.Status.ErrorCode = commonpb.ErrorCode_MetaFailed
		log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID))
		return resp, nil
	}

	segments := make([]*SegmentInfo, 0, len(req.GetSegments()))
	for _, seg2Drop := range req.GetSegments() {
		info := &datapb.SegmentInfo{
			ID:            seg2Drop.GetSegmentID(),
			CollectionID:  seg2Drop.GetCollectionID(),
			InsertChannel: channel,
			Binlogs:       seg2Drop.GetField2BinlogPaths(),
			Statslogs:     seg2Drop.GetField2StatslogPaths(),
			Deltalogs:     seg2Drop.GetDeltalogs(),
			StartPosition: seg2Drop.GetStartPosition(),
			DmlPosition:   seg2Drop.GetCheckPoint(),
			NumOfRows:     seg2Drop.GetNumOfRows(),
		}
		segment := NewSegmentInfo(info)
		segments = append(segments, segment)
	}

	err := s.meta.UpdateDropChannelSegmentInfo(channel, segments)
	if err != nil {
		log.Error("Update Drop Channel segment info failed", zap.String("channel", channel), zap.Error(err))
		resp.Status.Reason = err.Error()
		return resp, nil
	}

	log.Info("DropVChannel plan to remove", zap.String("channel", channel))
	err = s.channelManager.Release(nodeID, channel)
	if err != nil {
		log.Warn("DropVChannel failed to ReleaseAndRemove", zap.String("channel", channel), zap.Error(err))
	}
	s.segmentManager.DropSegmentsOfChannel(ctx, channel)

	// no compaction triggered in Drop procedure
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
	return resp, nil
}

// SetSegmentState resets the state of the given segment.
func (s *Server) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest) (*datapb.SetSegmentStateResponse, error) {
	if s.isClosed() {
		return &datapb.SetSegmentStateResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    serverNotServingErrMsg,
			},
		}, nil
	}
	err := s.meta.SetState(req.GetSegmentId(), req.GetNewState())
	if err != nil {
		log.Error("failed to update segment state in dataCoord meta",
			zap.Int64("segment ID", req.SegmentId),
			zap.String("to state", req.GetNewState().String()))
		return &datapb.SetSegmentStateResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
	return &datapb.SetSegmentStateResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}, nil
}

// GetStateCode returns the current state code of the server, or Abnormal if no state has been set.
func (s *Server) GetStateCode() commonpb.StateCode {
	code := s.stateCode.Load()
	if code == nil {
		return commonpb.StateCode_Abnormal
	}
	return code.(commonpb.StateCode)
}

// GetComponentStates returns DataCoord's current state
func (s *Server) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
	nodeID := common.NotRegisteredID
	if s.session != nil && s.session.Registered() {
		nodeID = s.session.ServerID // or Params.NodeID
	}
	code := s.GetStateCode()
	resp := &milvuspb.ComponentStates{
		State: &milvuspb.ComponentInfo{
			// NodeID: Params.NodeID, // will race with Server.Register()
			NodeID:    nodeID,
			Role:      "datacoord",
			StateCode: code,
		},
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
	}
	return resp, nil
}

// GetRecoveryInfo gets the recovery info (vchannel positions and segment binlogs) for the given collection/partition.
// Called by: QueryCoord.
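// Summary of the flow below: describe the collection via RootCoord to learn its vchannels,
// compute per-channel query positions and flushed segment IDs, then, for each flushed segment
// that is not importing or fake, collect its insert/stats/delta binlogs and a row count that
// is re-derived from the binlogs whenever the meta row count disagrees.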
func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
	collectionID := req.GetCollectionID()
	partitionID := req.GetPartitionID()
	log := log.With(
		zap.Int64("collectionID", collectionID),
		zap.Int64("partitionID", partitionID),
	)
	log.Info("get recovery info request received")
	resp := &datapb.GetRecoveryInfoResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}
	if s.isClosed() {
		resp.Status.Reason = serverNotServingErrMsg
		return resp, nil
	}

	dresp, err := s.rootCoordClient.DescribeCollectionInternal(s.ctx, &milvuspb.DescribeCollectionRequest{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
		),
		CollectionID: collectionID,
	})
	if err = VerifyResponse(dresp, err); err != nil {
		log.Error("get collection info from rootcoord failed",
			zap.Error(err))
		resp.Status.Reason = err.Error()
		return resp, nil
	}
	channels := dresp.GetVirtualChannelNames()
	channelInfos := make([]*datapb.VchannelInfo, 0, len(channels))
	flushedIDs := make(typeutil.UniqueSet)
	for _, c := range channels {
		channelInfo := s.handler.GetQueryVChanPositions(&channel{Name: c, CollectionID: collectionID}, partitionID)
		channelInfos = append(channelInfos, channelInfo)
		log.Info("datacoord append channelInfo in GetRecoveryInfo",
			zap.Any("channelInfo", channelInfo),
		)
		flushedIDs.Insert(channelInfo.GetFlushedSegmentIds()...)
	}

	segment2Binlogs := make(map[UniqueID][]*datapb.FieldBinlog)
	segment2StatsBinlogs := make(map[UniqueID][]*datapb.FieldBinlog)
	segment2DeltaBinlogs := make(map[UniqueID][]*datapb.FieldBinlog)
	segment2InsertChannel := make(map[UniqueID]string)
	segmentsNumOfRows := make(map[UniqueID]int64)
	for id := range flushedIDs {
		segment := s.meta.GetSegmentUnsafe(id)
		if segment == nil {
			errMsg := fmt.Sprintf("failed to get segment %d", id)
			log.Error(errMsg)
			resp.Status.Reason = errMsg
			return resp, nil
		}
		// Skip non-flushing, non-flushed and dropped segments.
		if segment.State != commonpb.SegmentState_Flushed && segment.State != commonpb.SegmentState_Flushing && segment.State != commonpb.SegmentState_Dropped {
			continue
		}
		// Also skip bulk insert & fake segments.
		if segment.GetIsImporting() || segment.GetIsFake() {
			continue
		}
		segment2InsertChannel[segment.ID] = segment.InsertChannel
		binlogs := segment.GetBinlogs()

		if len(binlogs) == 0 {
			flushedIDs.Remove(id)
			continue
		}

		field2Binlog := make(map[UniqueID][]*datapb.Binlog)
		for _, field := range binlogs {
			field2Binlog[field.GetFieldID()] = append(field2Binlog[field.GetFieldID()], field.GetBinlogs()...)
		}

		for f, paths := range field2Binlog {
			fieldBinlogs := &datapb.FieldBinlog{
				FieldID: f,
				Binlogs: paths,
			}
			segment2Binlogs[id] = append(segment2Binlogs[id], fieldBinlogs)
		}

		if newCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo); newCount != segment.NumOfRows {
			log.Warn("segment row number meta inconsistent with bin log row count and will be corrected",
				zap.Int64("segment ID", segment.GetID()),
				zap.Int64("segment meta row count (wrong)", segment.GetNumOfRows()),
				zap.Int64("segment bin log row count (correct)", newCount))
			segmentsNumOfRows[id] = newCount
		} else {
			segmentsNumOfRows[id] = segment.NumOfRows
		}

		statsBinlogs := segment.GetStatslogs()
		field2StatsBinlog := make(map[UniqueID][]*datapb.Binlog)
		for _, field := range statsBinlogs {
			field2StatsBinlog[field.GetFieldID()] = append(field2StatsBinlog[field.GetFieldID()], field.GetBinlogs()...)
		}

		for f, paths := range field2StatsBinlog {
			fieldBinlogs := &datapb.FieldBinlog{
				FieldID: f,
				Binlogs: paths,
			}
			segment2StatsBinlogs[id] = append(segment2StatsBinlogs[id], fieldBinlogs)
		}

		if len(segment.GetDeltalogs()) > 0 {
			segment2DeltaBinlogs[id] = append(segment2DeltaBinlogs[id], segment.GetDeltalogs()...)
		}
	}

	binlogs := make([]*datapb.SegmentBinlogs, 0, len(segment2Binlogs))
	for segmentID := range flushedIDs {
		sbl := &datapb.SegmentBinlogs{
			SegmentID:     segmentID,
			NumOfRows:     segmentsNumOfRows[segmentID],
			FieldBinlogs:  segment2Binlogs[segmentID],
			Statslogs:     segment2StatsBinlogs[segmentID],
			Deltalogs:     segment2DeltaBinlogs[segmentID],
			InsertChannel: segment2InsertChannel[segmentID],
		}
		binlogs = append(binlogs, sbl)
	}
	resp.Channels = channelInfos
	resp.Binlogs = binlogs
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
	return resp, nil
}

// GetFlushedSegments returns all segments that match the provided criteria and are in state Flushed or Dropped (compacted but not GCed yet).
// If the requested partition ID is < 0, the partition ID filter is ignored.
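// Illustrative example (IDs are hypothetical): with PartitionID=-1 and IncludeUnhealthy=false,
// flushed segments of every partition are returned while Dropped (already compacted) segments
// are filtered out; setting IncludeUnhealthy=true keeps the Dropped ones as well.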
func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
	resp := &datapb.GetFlushedSegmentsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}
	collectionID := req.GetCollectionID()
	partitionID := req.GetPartitionID()
	log.Info("received get flushed segments request",
		zap.Int64("collectionID", collectionID),
		zap.Int64("partitionID", partitionID),
	)
	if s.isClosed() {
		resp.Status.Reason = serverNotServingErrMsg
		return resp, nil
	}
	var segmentIDs []UniqueID
	if partitionID < 0 {
		segmentIDs = s.meta.GetSegmentsIDOfCollectionWithDropped(collectionID)
	} else {
		segmentIDs = s.meta.GetSegmentsIDOfPartitionWithDropped(collectionID, partitionID)
	}
	ret := make([]UniqueID, 0, len(segmentIDs))
	for _, id := range segmentIDs {
		segment := s.meta.GetSegmentUnsafe(id)
		// if this segment == nil, we assume this segment has been gc
		if segment == nil ||
			(segment.GetState() != commonpb.SegmentState_Dropped &&
				segment.GetState() != commonpb.SegmentState_Flushed &&
				segment.GetState() != commonpb.SegmentState_Flushing) {
			continue
		}
		if !req.GetIncludeUnhealthy() && segment.GetState() == commonpb.SegmentState_Dropped {
			continue
		}
		ret = append(ret, id)
	}

	resp.Segments = ret
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
	return resp, nil
}

// GetSegmentsByStates returns all segments that match the provided criteria and states.
// If the requested partition ID is < 0, the partition ID filter is ignored.
func (s *Server) GetSegmentsByStates(ctx context.Context, req *datapb.GetSegmentsByStatesRequest) (*datapb.GetSegmentsByStatesResponse, error) {
	resp := &datapb.GetSegmentsByStatesResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}
	collectionID := req.GetCollectionID()
	partitionID := req.GetPartitionID()
	states := req.GetStates()
	log.Info("received get segments by states request",
		zap.Int64("collectionID", collectionID),
		zap.Int64("partitionID", partitionID),
		zap.Any("states", states))
	if s.isClosed() {
		resp.Status.Reason = serverNotServingErrMsg
		return resp, nil
	}
	var segmentIDs []UniqueID
	if partitionID < 0 {
		segmentIDs = s.meta.GetSegmentsIDOfCollection(collectionID)
	} else {
		segmentIDs = s.meta.GetSegmentsIDOfPartition(collectionID, partitionID)
	}
	ret := make([]UniqueID, 0, len(segmentIDs))

	statesDict := make(map[commonpb.SegmentState]bool)
	for _, state := range states {
		statesDict[state] = true
	}
	for _, id := range segmentIDs {
		segment := s.meta.GetSegment(id)
		if segment != nil && statesDict[segment.GetState()] {
			ret = append(ret, id)
		}
	}

	resp.Segments = ret
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
	return resp, nil
}

// ShowConfigurations returns the configurations of DataCoord matching req.Pattern
func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
	log.Debug("DataCoord.ShowConfigurations", zap.String("pattern", req.Pattern))
	if s.isClosed() {
		log.Warn("DataCoord.ShowConfigurations failed",
			zap.Int64("nodeId", paramtable.GetNodeID()),
			zap.String("req", req.Pattern),
			zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID())))

		return &internalpb.ShowConfigurationsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgDataCoordIsUnhealthy(paramtable.GetNodeID()),
			},
			Configuations: nil,
		}, nil
	}
	configList := make([]*commonpb.KeyValuePair, 0)
	for key, value := range Params.GetComponentConfigurations(ctx, "datacoord", req.Pattern) {
		configList = append(configList,
			&commonpb.KeyValuePair{
				Key:   key,
				Value: value,
			})
	}

	return &internalpb.ShowConfigurationsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Configuations: configList,
	}, nil
}

// GetMetrics returns DataCoord metrics info
// it may include SystemMetrics, Topology metrics, etc.
func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	if s.isClosed() {
		log.Warn("DataCoord.GetMetrics failed",
			zap.Int64("nodeID", paramtable.GetNodeID()),
			zap.String("req", req.Request),
			zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID())))

		return &milvuspb.GetMetricsResponse{
			ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()),
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgDataCoordIsUnhealthy(paramtable.GetNodeID()),
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("DataCoord.GetMetrics failed to parse metric type",
			zap.Int64("nodeID", paramtable.GetNodeID()),
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()),
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

	if metricType == metricsinfo.SystemInfoMetrics {
		metrics, err := s.getSystemInfoMetrics(ctx, req)
		if err != nil {
			log.Warn("DataCoord GetMetrics failed", zap.Int64("nodeID", paramtable.GetNodeID()), zap.Error(err))
			return &milvuspb.GetMetricsResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

		log.RatedDebug(60, "DataCoord.GetMetrics",
			zap.Int64("nodeID", paramtable.GetNodeID()),
			zap.String("req", req.Request),
			zap.String("metricType", metricType),
			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

		return metrics, nil
	}

	log.RatedWarn(60.0, "DataCoord.GetMetrics failed, request metric type is not implemented yet",
		zap.Int64("nodeID", paramtable.GetNodeID()),
		zap.String("req", req.Request),
		zap.String("metricType", metricType))

	return &milvuspb.GetMetricsResponse{
		ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()),
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}

// ManualCompaction triggers a compaction for a collection
func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
	log.Info("received manual compaction", zap.Int64("collectionID", req.GetCollectionID()))

	resp := &milvuspb.ManualCompactionResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}

	if s.isClosed() {
		log.Warn("failed to execute manual compaction", zap.Int64("collectionID", req.GetCollectionID()),
			zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID())))
		resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
		return resp, nil
	}

	if !Params.DataCoordCfg.EnableCompaction.GetAsBool() {
		resp.Status.Reason = "compaction disabled"
		return resp, nil
	}

	id, err := s.compactionTrigger.forceTriggerCompaction(req.CollectionID)
	if err != nil {
		log.Error("failed to trigger manual compaction", zap.Int64("collectionID", req.GetCollectionID()), zap.Error(err))
		resp.Status.Reason = err.Error()
		return resp, nil
	}

	log.Info("success to trigger manual compaction", zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("compactionID", id))
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
	resp.CompactionID = id
	return resp, nil
}

// GetCompactionState gets the state of a compaction
func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
	log.Info("received get compaction state request", zap.Int64("compactionID", req.GetCompactionID()))
	resp := &milvuspb.GetCompactionStateResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}
	if s.isClosed() {
		log.Warn("failed to get compaction state", zap.Int64("compactionID", req.GetCompactionID()),
			zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID())))
		resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
		return resp, nil
	}

	if !Params.DataCoordCfg.EnableCompaction.GetAsBool() {
		resp.Status.Reason = "compaction disabled"
		return resp, nil
	}

	tasks := s.compactionHandler.getCompactionTasksBySignalID(req.GetCompactionID())
	state, executingCnt, completedCnt, failedCnt, timeoutCnt := getCompactionState(tasks)

	resp.State = state
	resp.ExecutingPlanNo = int64(executingCnt)
	resp.CompletedPlanNo = int64(completedCnt)
	resp.TimeoutPlanNo = int64(timeoutCnt)
	resp.FailedPlanNo = int64(failedCnt)
	resp.Status.ErrorCode = commonpb.ErrorCode_Success

	log.Info("success to get compaction state", zap.Any("state", state), zap.Int("executing", executingCnt),
		zap.Int("completed", completedCnt), zap.Int("failed", failedCnt), zap.Int("timeout", timeoutCnt),
		zap.Int64s("plans", lo.Map(tasks, func(t *compactionTask, _ int) int64 {
			if t.plan == nil {
				return -1
			}
			return t.plan.PlanID
		})))
	return resp, nil
}

// GetCompactionStateWithPlans returns the compaction state and plan merge infos of the given compaction.
func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
	log.Info("received the request to get compaction state with plans", zap.Int64("compactionID", req.GetCompactionID()))

	resp := &milvuspb.GetCompactionPlansResponse{
		Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError},
	}
	if s.isClosed() {
		log.Warn("failed to get compaction state with plans", zap.Int64("compactionID", req.GetCompactionID()), zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID())))
		resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
		return resp, nil
	}

	if !Params.DataCoordCfg.EnableCompaction.GetAsBool() {
		resp.Status.Reason = "compaction disabled"
		return resp, nil
	}

	tasks := s.compactionHandler.getCompactionTasksBySignalID(req.GetCompactionID())
	for _, task := range tasks {
		resp.MergeInfos = append(resp.MergeInfos, getCompactionMergeInfo(task))
	}

	state, _, _, _, _ := getCompactionState(tasks)

	resp.Status.ErrorCode = commonpb.ErrorCode_Success
	resp.State = state
	log.Info("success to get state with plans", zap.Any("state", state), zap.Any("merge infos", resp.MergeInfos),
		zap.Int64s("plans", lo.Map(tasks, func(t *compactionTask, _ int) int64 {
			if t.plan == nil {
				return -1
			}
			return t.plan.PlanID
		})))
	return resp, nil
}

// getCompactionMergeInfo summarizes a compaction task as its source segment IDs and target segment ID.
func getCompactionMergeInfo(task *compactionTask) *milvuspb.CompactionMergeInfo {
	segments := task.plan.GetSegmentBinlogs()
	var sources []int64
	for _, s := range segments {
		sources = append(sources, s.GetSegmentID())
	}

	var target int64 = -1
	if task.result != nil {
		target = task.result.GetSegmentID()
	}
	return &milvuspb.CompactionMergeInfo{
		Sources: sources,
		Target:  target,
	}
}
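
// Illustrative note on getCompactionState below (derived from its implementation): it tallies
// tasks by state, and the aggregate is Executing if any task is still executing, otherwise
// Completed. For example, tasks in states {executing, completed, failed} yield
// state=Executing with executingCnt=1, completedCnt=1, failedCnt=1, timeoutCnt=0.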
func getCompactionState(tasks []*compactionTask) (state commonpb.CompactionState, executingCnt, completedCnt, failedCnt, timeoutCnt int) {
	for _, t := range tasks {
		switch t.state {
		case executing:
			executingCnt++
		case completed:
			completedCnt++
		case failed:
			failedCnt++
		case timeout:
			timeoutCnt++
		}
	}
	if executingCnt != 0 {
		state = commonpb.CompactionState_Executing
	} else {
		state = commonpb.CompactionState_Completed
	}
	return
}

// WatchChannels notifies DataCoord to watch vchannels of a collection.
func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
	log.Info("receive watch channels request", zap.Any("channels", req.GetChannelNames()))
	resp := &datapb.WatchChannelsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}
	if s.isClosed() {
		log.Warn("failed to watch channels request", zap.Any("channels", req.GetChannelNames()),
			zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID())))
		resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
		return resp, nil
	}
	for _, channelName := range req.GetChannelNames() {
		ch := &channel{
			Name:           channelName,
			CollectionID:   req.GetCollectionID(),
			StartPositions: req.GetStartPositions(),
			Schema:         req.GetSchema(),
		}
		err := s.channelManager.Watch(ch)
		if err != nil {
			log.Warn("fail to watch channelName", zap.String("channelName", channelName), zap.Error(err))
			resp.Status.Reason = err.Error()
			return resp, nil
		}
	}
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
	return resp, nil
}
// GetFlushState gets the flush state of multiple segments
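// Note (derived from the implementation below): Flushed is reported as true only when every
// requested segment is already Flushed/Flushing or no longer exists in meta (e.g. compacted
// away or dropped while empty); otherwise the remaining segment IDs are logged as unflushed.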
func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
	resp := &milvuspb.GetFlushStateResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}}
	if s.isClosed() {
		log.Warn("DataCoord receive GetFlushState request, server closed",
			zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.Int("len", len(req.GetSegmentIDs())))
		resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
		return resp, nil
	}

	var unflushed []UniqueID
	for _, sid := range req.GetSegmentIDs() {
		segment := s.meta.GetSegment(sid)
		// segment is nil if it was compacted, or it's an empty segment that was set to dropped
		if segment == nil || segment.GetState() == commonpb.SegmentState_Flushing ||
			segment.GetState() == commonpb.SegmentState_Flushed {
			continue
		}
		unflushed = append(unflushed, sid)
	}

	if len(unflushed) != 0 {
		log.RatedInfo(10, "DataCoord receive GetFlushState request, Flushed is false", zap.Int64s("segmentIDs", unflushed), zap.Int("len", len(unflushed)))
		resp.Flushed = false
	} else {
		log.Info("DataCoord receive GetFlushState request, Flushed is true", zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.Int("len", len(req.GetSegmentIDs())))
		resp.Flushed = true
	}

	resp.Status.ErrorCode = commonpb.ErrorCode_Success
	return resp, nil
}
// Import distributes the import tasks to dataNodes.
// It returns a failed status if no dataNode is available or if any error occurs.
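// Illustrative example (node IDs are hypothetical): with live DataNodes [1, 2, 3] and
// itr.WorkingNodes=[2], the candidate set is getDiff([1, 2, 3], [2]) = [1, 3] and one of
// them is picked at random; if every live node is already working, the request is rejected.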
func (s *Server) Import(ctx context.Context, itr *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
	log.Info("DataCoord receives import request", zap.Any("import task request", itr))
	resp := &datapb.ImportTaskResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}

	if s.isClosed() {
		log.Error("failed to import for closed DataCoord service")
		resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
		return resp, nil
	}

	nodes := s.sessionManager.getLiveNodeIDs()
	if len(nodes) == 0 {
		log.Error("import failed as all DataNodes are offline")
		return resp, nil
	}
	log.Info("available DataNodes are", zap.Int64s("node ID", nodes))

	avaNodes := getDiff(nodes, itr.GetWorkingNodes())
	if len(avaNodes) > 0 {
		// If there exists available DataNodes, pick one at random.
		resp.DatanodeId = avaNodes[rand.Intn(len(avaNodes))]
		log.Info("picking a free dataNode",
			zap.Any("all dataNodes", nodes),
			zap.Int64("picking free dataNode with ID", resp.GetDatanodeId()))
		s.cluster.Import(s.ctx, resp.GetDatanodeId(), itr)
	} else {
		// No dataNode is available, reject the import request.
		msg := "all DataNodes are busy working on data import, the task has been rejected and wait for idle datanode"
		log.Info(msg, zap.Int64("task ID", itr.GetImportTask().GetTaskId()))
		resp.Status.Reason = msg
		return resp, nil
	}

	resp.Status.ErrorCode = commonpb.ErrorCode_Success
	return resp, nil
}
// UpdateSegmentStatistics updates a segment's stats.
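//
// A request sketch (segStats stands for stats reported by a DataNode, one entry per segment
// with its ID and current row count; the element type is not spelled out here and is an assumption):
//
//	_, _ = dataCoord.UpdateSegmentStatistics(ctx, &datapb.UpdateSegmentStatisticsRequest{
//		Stats: segStats,
//	})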
func ( s * Server ) UpdateSegmentStatistics ( ctx context . Context , req * datapb . UpdateSegmentStatisticsRequest ) ( * commonpb . Status , error ) {
resp := & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
Reason : "" ,
}
if s . isClosed ( ) {
log . Warn ( "failed to update segment stat for closed server" )
resp . Reason = msgDataCoordIsUnhealthy ( paramtable . GetNodeID ( ) )
return resp , nil
}
s . updateSegmentStatistics ( req . GetStats ( ) )
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
} , nil
}
// UpdateChannelCheckpoint updates channel checkpoint in dataCoord.
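//
// A request sketch (the channel name is a made-up value; pos stands for the latest consumed
// position reported by the DataNode watching that channel, assumed here to be an *internalpb.MsgPosition):
//
//	_, _ = dataCoord.UpdateChannelCheckpoint(ctx, &datapb.UpdateChannelCheckpointRequest{
//		VChannel: "by-dev-rootcoord-dml_0_100v0",
//		Position: pos,
//	})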
func ( s * Server ) UpdateChannelCheckpoint ( ctx context . Context , req * datapb . UpdateChannelCheckpointRequest ) ( * commonpb . Status , error ) {
resp := & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
}
if s . isClosed ( ) {
log . Warn ( "failed to update channel position for closed server" )
resp . Reason = msgDataCoordIsUnhealthy ( paramtable . GetNodeID ( ) )
return resp , nil
}
err := s . meta . UpdateChannelCheckpoint ( req . GetVChannel ( ) , req . GetPosition ( ) )
if err != nil {
log . Warn ( "failed to UpdateChannelCheckpoint" , zap . String ( "vChannel" , req . GetVChannel ( ) ) , zap . Error ( err ) )
resp . Reason = err . Error ( )
return resp , nil
}
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
} , nil
}
// getDiff returns the difference of base and remove, i.e. all items that are in `base` but not in `remove`.
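//
// For example (illustrative values only):
//
//	getDiff([]int64{1, 2, 3, 4}, []int64{2, 4}) // -> []int64{1, 3}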
func getDiff ( base , remove [ ] int64 ) [ ] int64 {
mb := make ( map [ int64 ] struct { } , len ( remove ) )
for _ , x := range remove {
mb [ x ] = struct { } { }
}
var diff [ ] int64
for _ , x := range base {
if _ , found := mb [ x ] ; ! found {
diff = append ( diff , x )
}
}
return diff
}
// SaveImportSegment saves the segment binlog paths and hands the segment, as a flushed segment, to the DataNode that watches its channel.
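// The flow, as implemented below, is: (1) find the DataNode watching req.GetChannelName(),
// (2) ask that DataNode to add the segment to its flow graph via AddImportSegment, and
// (3) persist the binlog paths through SaveBinlogPaths using the returned channel position.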
func ( s * Server ) SaveImportSegment ( ctx context . Context , req * datapb . SaveImportSegmentRequest ) ( * commonpb . Status , error ) {
log . Info ( "DataCoord putting segment to the right DataNode and saving binlog path" ,
zap . Int64 ( "segment ID" , req . GetSegmentId ( ) ) ,
zap . Int64 ( "collection ID" , req . GetCollectionId ( ) ) ,
zap . Int64 ( "partition ID" , req . GetPartitionId ( ) ) ,
zap . String ( "channel name" , req . GetChannelName ( ) ) ,
zap . Int64 ( "# of rows" , req . GetRowNum ( ) ) )
errResp := & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
Reason : "" ,
}
if s . isClosed ( ) {
log . Warn ( "failed to add segment for closed server" )
errResp . ErrorCode = commonpb . ErrorCode_DataCoordNA
errResp . Reason = msgDataCoordIsUnhealthy ( paramtable . GetNodeID ( ) )
return errResp , nil
}
// Look for the DataNode that watches the channel.
ok , nodeID := s . channelManager . getNodeIDByChannelName ( req . GetChannelName ( ) )
if ! ok {
log . Error ( "no DataNode found for channel" , zap . String ( "channel name" , req . GetChannelName ( ) ) )
errResp . Reason = fmt . Sprint ( "no DataNode found for channel " , req . GetChannelName ( ) )
return errResp , nil
}
// Call DataNode to add the new segment to its own flow graph.
cli , err := s . sessionManager . getClient ( ctx , nodeID )
if err != nil {
log . Error ( "failed to get DataNode client for SaveImportSegment" ,
zap . Int64 ( "DataNode ID" , nodeID ) ,
zap . Error ( err ) )
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} , nil
}
resp , err := cli . AddImportSegment ( ctx ,
& datapb . AddImportSegmentRequest {
Base : commonpbutil . NewMsgBase (
commonpbutil . WithTimeStamp ( req . GetBase ( ) . GetTimestamp ( ) ) ,
commonpbutil . WithSourceID ( paramtable . GetNodeID ( ) ) ,
) ,
SegmentId : req . GetSegmentId ( ) ,
ChannelName : req . GetChannelName ( ) ,
CollectionId : req . GetCollectionId ( ) ,
PartitionId : req . GetPartitionId ( ) ,
RowNum : req . GetRowNum ( ) ,
StatsLog : req . GetSaveBinlogPathReq ( ) . GetField2StatslogPaths ( ) ,
} )
if err := VerifyResponse ( resp . GetStatus ( ) , err ) ; err != nil {
log . Error ( "failed to add segment" , zap . Int64 ( "DataNode ID" , nodeID ) , zap . Error ( err ) )
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} , nil
}
log . Info ( "successfully added segment" , zap . Int64 ( "DataNode ID" , nodeID ) , zap . Any ( "add segment req" , req ) )
// Fill in start position message ID.
req . SaveBinlogPathReq . StartPositions [ 0 ] . StartPosition . MsgID = resp . GetChannelPos ( )
// Start saving bin log paths.
rsp , err := s . SaveBinlogPaths ( context . Background ( ) , req . GetSaveBinlogPathReq ( ) )
if err := VerifyResponse ( rsp , err ) ; err != nil {
log . Error ( "failed to SaveBinlogPaths" , zap . Error ( err ) )
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} , nil
}
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
} , nil
}

// UnsetIsImportingState unsets the isImporting state of the given segments.
// An error status is returned, and the error logged, if any of the segments fails to be updated.
func ( s * Server ) UnsetIsImportingState ( ctx context . Context , req * datapb . UnsetIsImportingStateRequest ) ( * commonpb . Status , error ) {
log . Info ( "unsetting isImport state of segments" ,
zap . Int64s ( "segments" , req . GetSegmentIds ( ) ) )
failure := false
for _ , segID := range req . GetSegmentIds ( ) {
if err := s . meta . UnsetIsImporting ( segID ) ; err != nil {
// Fail-open.
log . Error ( "failed to unset segment is importing state" , zap . Int64 ( "segment ID" , segID ) )
failure = true
}
}
if failure {
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} , nil
}
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
} , nil
}

// MarkSegmentsDropped marks the given segments as `Dropped`.
// An error status is returned, and the error logged, if any of the segments fails to be marked.
func ( s * Server ) MarkSegmentsDropped ( ctx context . Context , req * datapb . MarkSegmentsDroppedRequest ) ( * commonpb . Status , error ) {
log . Info ( "marking segments dropped" ,
zap . Int64s ( "segments" , req . GetSegmentIds ( ) ) )
failure := false
for _ , segID := range req . GetSegmentIds ( ) {
if err := s . meta . SetState ( segID , commonpb . SegmentState_Dropped ) ; err != nil {
// Fail-open.
log . Error ( "failed to set segment state as dropped" , zap . Int64 ( "segment ID" , segID ) )
failure = true
}
}
if failure {
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} , nil
}
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
} , nil
}
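// BroadcastAlteredCollection updates the collection info cached by dataCoord; if the collection
// is not cached yet, it is added with the schema, partitions, start positions and properties
// carried in the request.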
func ( s * Server ) BroadcastAlteredCollection ( ctx context . Context , req * datapb . AlterCollectionRequest ) ( * commonpb . Status , error ) {
errResp := & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
Reason : "" ,
}
if s . isClosed ( ) {
log . Warn ( "failed to broadcast collection information for closed server" )
errResp . Reason = msgDataCoordIsUnhealthy ( paramtable . GetNodeID ( ) )
return errResp , nil
}
// get collection info from cache
clonedColl := s . meta . GetClonedCollectionInfo ( req . CollectionID )
properties := make ( map [ string ] string )
for _ , pair := range req . Properties {
properties [ pair . GetKey ( ) ] = pair . GetValue ( )
}
// Cache miss: add the collection info to the cache and return.
if clonedColl == nil {
collInfo := & collectionInfo {
ID : req . GetCollectionID ( ) ,
Schema : req . GetSchema ( ) ,
Partitions : req . GetPartitionIDs ( ) ,
StartPositions : req . GetStartPositions ( ) ,
Properties : properties ,
}
s . meta . AddCollection ( collInfo )
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
} , nil
}
clonedColl . Properties = properties
s . meta . AddCollection ( clonedColl )
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
} , nil
}
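// CheckHealth reports whether dataCoord and all live DataNodes are healthy, collecting an
// unhealthy reason for every component that fails the check.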
func ( s * Server ) CheckHealth ( ctx context . Context , req * milvuspb . CheckHealthRequest ) ( * milvuspb . CheckHealthResponse , error ) {
if s . isClosed ( ) {
reason := errorutil . UnHealthReason ( "datacoord" , s . session . ServerID , "datacoord is closed" )
return & milvuspb . CheckHealthResponse { IsHealthy : false , Reasons : [ ] string { reason } } , nil
}
mu := & sync . Mutex { }
group , ctx := errgroup . WithContext ( ctx )
nodes := s . sessionManager . getLiveNodeIDs ( )
errReasons := make ( [ ] string , 0 , len ( nodes ) )
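// Fan out one GetComponentStates probe per live DataNode; the mutex guards errReasons because
// the goroutines below append to it concurrently.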
for _ , nodeID := range nodes {
nodeID := nodeID // capture the loop variable for the goroutine below
group . Go ( func ( ) error {
cli , err := s . sessionManager . getClient ( ctx , nodeID )
if err != nil {
mu . Lock ( )
defer mu . Unlock ( )
errReasons = append ( errReasons , errorutil . UnHealthReason ( "datanode" , nodeID , err . Error ( ) ) )
return err
}
sta , err := cli . GetComponentStates ( ctx )
isHealthy , reason := errorutil . UnHealthReasonWithComponentStatesOrErr ( "datanode" , nodeID , sta , err )
if ! isHealthy {
mu . Lock ( )
defer mu . Unlock ( )
errReasons = append ( errReasons , reason )
}
return err
} )
}
err := group . Wait ( )
if err != nil || len ( errReasons ) != 0 {
return & milvuspb . CheckHealthResponse { IsHealthy : false , Reasons : errReasons } , nil
}
return & milvuspb . CheckHealthResponse { IsHealthy : true , Reasons : errReasons } , nil
}