2021-04-19 11:12:56 +08:00
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
2021-06-18 21:30:08 +08:00
package rootcoord
2021-01-19 14:44:03 +08:00
import (
"context"
2021-05-14 10:05:18 +08:00
"encoding/json"
2021-01-25 18:33:10 +08:00
"fmt"
2021-01-19 14:44:03 +08:00
"math/rand"
2021-05-26 20:14:30 +08:00
"os"
2021-01-19 14:44:03 +08:00
"sync"
"sync/atomic"
"time"
2021-05-14 21:26:06 +08:00
"github.com/golang/protobuf/proto"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/allocator"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
2021-06-01 11:04:31 +08:00
"github.com/milvus-io/milvus/internal/metrics"
2021-04-22 14:45:57 +08:00
ms "github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
2021-05-15 18:08:08 +08:00
"github.com/milvus-io/milvus/internal/proto/etcdpb"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/querypb"
2021-06-22 16:14:09 +08:00
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
2021-05-15 18:08:08 +08:00
"github.com/milvus-io/milvus/internal/proto/schemapb"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
2021-05-21 19:28:52 +08:00
"github.com/milvus-io/milvus/internal/util/sessionutil"
2021-06-30 16:18:13 +08:00
"github.com/milvus-io/milvus/internal/util/trace"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
2021-06-08 19:25:37 +08:00
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
2021-01-19 14:44:03 +08:00
)
// ------------------ struct -----------------------
2021-05-14 21:26:06 +08:00
// DdOperation is the JSON-serialisable record of a data-definition message.
// Body holds the message payload and Type identifies which kind of
// data-definition operation the payload belongs to; both are stored as strings
// under the JSON keys "body" and "type".
type DdOperation struct {
	Body string `json:"body"`
	Type string `json:"type"`
}
2021-06-01 11:04:31 +08:00
// Names under which request counts are recorded.
const (
	// MetricRequestsTotal is the name used to count the total number of requests.
	MetricRequestsTotal = "total"
	// MetricRequestsSuccess is the name used to count the number of successful requests.
	MetricRequestsSuccess = "success"
)
2021-06-22 19:08:03 +08:00
// metricProxy returns the metric name for the proxy identified by v,
// formatted as "client_<v>".
func metricProxy(v int64) string {
	return fmt.Sprintf("client_%d", v)
}
2021-06-17 16:47:57 +08:00
// Core root coordinator core
2021-01-19 14:44:03 +08:00
type Core struct {
MetaTable * metaTable
//id allocator
2021-05-20 14:14:14 +08:00
IDAllocator func ( count uint32 ) ( typeutil . UniqueID , typeutil . UniqueID , error )
IDAllocatorUpdate func ( ) error
2021-04-08 17:31:39 +08:00
2021-01-19 14:44:03 +08:00
//tso allocator
2021-05-20 14:14:14 +08:00
TSOAllocator func ( count uint32 ) ( typeutil . Timestamp , error )
TSOAllocatorUpdate func ( ) error
2021-01-19 14:44:03 +08:00
//inner members
ctx context . Context
cancel context . CancelFunc
etcdCli * clientv3 . Client
kvBase * etcdkv . EtcdKV
2021-01-20 09:36:50 +08:00
//setMsgStreams, send time tick into dd channel and time tick channel
2021-01-19 14:44:03 +08:00
SendTimeTick func ( t typeutil . Timestamp ) error
2021-01-20 09:36:50 +08:00
//setMsgStreams, send create collection into dd channel
2021-06-11 16:39:29 +08:00
SendDdCreateCollectionReq func ( ctx context . Context , req * internalpb . CreateCollectionRequest , channelNames [ ] string ) error
2021-01-19 14:44:03 +08:00
2021-01-20 09:36:50 +08:00
//setMsgStreams, send drop collection into dd channel, and notify the proxy to delete this collection
2021-06-11 16:39:29 +08:00
SendDdDropCollectionReq func ( ctx context . Context , req * internalpb . DropCollectionRequest , channelNames [ ] string ) error
2021-01-19 14:44:03 +08:00
2021-01-20 09:36:50 +08:00
//setMsgStreams, send create partition into dd channel
2021-06-11 16:39:29 +08:00
SendDdCreatePartitionReq func ( ctx context . Context , req * internalpb . CreatePartitionRequest , channelNames [ ] string ) error
2021-01-19 14:44:03 +08:00
2021-01-20 09:36:50 +08:00
//setMsgStreams, send drop partition into dd channel
2021-06-11 16:39:29 +08:00
SendDdDropPartitionReq func ( ctx context . Context , req * internalpb . DropPartitionRequest , channelNames [ ] string ) error
2021-01-19 14:44:03 +08:00
2021-02-05 14:09:55 +08:00
//get binlog file path from data service,
2021-06-30 16:18:13 +08:00
CallGetBinlogFilePathsService func ( ctx context . Context , segID typeutil . UniqueID , fieldID typeutil . UniqueID ) ( [ ] string , error )
CallGetNumRowsService func ( ctx context . Context , segID typeutil . UniqueID , isFromFlushedChan bool ) ( int64 , error )
2021-07-03 14:36:18 +08:00
CallGetFlushedSegmentsService func ( ctx context . Context , collID , partID typeutil . UniqueID ) ( [ ] typeutil . UniqueID , error )
2021-01-21 10:01:29 +08:00
2021-02-05 14:09:55 +08:00
//call index builder's client to build index, return build id
2021-05-25 14:03:06 +08:00
CallBuildIndexService func ( ctx context . Context , binlog [ ] string , field * schemapb . FieldSchema , idxInfo * etcdpb . IndexInfo ) ( typeutil . UniqueID , error )
CallDropIndexService func ( ctx context . Context , indexID typeutil . UniqueID ) error
2021-01-21 10:01:29 +08:00
2021-06-22 19:08:03 +08:00
NewProxyClient func ( sess * sessionutil . Session ) ( types . Proxy , error )
2021-01-23 10:12:41 +08:00
2021-02-05 14:09:55 +08:00
//query service interface, notify query service to release collection
2021-06-22 16:08:08 +08:00
CallReleaseCollectionService func ( ctx context . Context , ts typeutil . Timestamp , dbID , collectionID typeutil . UniqueID ) error
CallReleasePartitionService func ( ctx context . Context , ts typeutil . Timestamp , dbID , collectionID typeutil . UniqueID , partitionIDs [ ] typeutil . UniqueID ) error
2021-02-05 14:09:55 +08:00
2021-06-04 15:00:34 +08:00
//dml channels
dmlChannels * dmlChannels
2021-06-22 19:08:03 +08:00
//Proxy manager
proxyManager * proxyManager
2021-05-26 20:14:30 +08:00
// proxy clients
proxyClientManager * proxyClientManager
2021-05-21 16:08:12 +08:00
// channel timetick
chanTimeTick * timetickSync
2021-01-19 14:44:03 +08:00
//time tick loop
lastTimeTick typeutil . Timestamp
//states code
stateCode atomic . Value
//call once
initOnce sync . Once
startOnce sync . Once
2021-02-23 11:40:30 +08:00
//isInit atomic.Value
2021-02-08 14:30:54 +08:00
2021-05-26 20:14:30 +08:00
session * sessionutil . Session
sessCloseCh <- chan bool
2021-05-21 19:28:52 +08:00
2021-02-08 14:30:54 +08:00
msFactory ms . Factory
2021-01-19 14:44:03 +08:00
}
// --------------------- function --------------------------
2021-02-08 14:30:54 +08:00
func NewCore ( c context . Context , factory ms . Factory ) ( * Core , error ) {
2021-01-19 14:44:03 +08:00
ctx , cancel := context . WithCancel ( c )
rand . Seed ( time . Now ( ) . UnixNano ( ) )
core := & Core {
2021-02-08 14:30:54 +08:00
ctx : ctx ,
cancel : cancel ,
msFactory : factory ,
2021-01-19 14:44:03 +08:00
}
2021-03-12 14:22:09 +08:00
core . UpdateStateCode ( internalpb . StateCode_Abnormal )
2021-01-19 14:44:03 +08:00
return core , nil
}
2021-03-12 14:22:09 +08:00
func ( c * Core ) UpdateStateCode ( code internalpb . StateCode ) {
2021-02-23 11:40:30 +08:00
c . stateCode . Store ( code )
}
2021-01-19 14:44:03 +08:00
func ( c * Core ) checkInit ( ) error {
if c . MetaTable == nil {
2021-04-08 15:26:18 +08:00
return fmt . Errorf ( "MetaTable is nil" )
2021-01-19 14:44:03 +08:00
}
2021-05-20 14:14:14 +08:00
if c . IDAllocator == nil {
2021-04-08 15:26:18 +08:00
return fmt . Errorf ( "idAllocator is nil" )
2021-01-19 14:44:03 +08:00
}
2021-05-20 14:14:14 +08:00
if c . IDAllocatorUpdate == nil {
2021-04-08 17:31:39 +08:00
return fmt . Errorf ( "idAllocatorUpdate is nil" )
}
2021-05-20 14:14:14 +08:00
if c . TSOAllocator == nil {
2021-04-08 15:26:18 +08:00
return fmt . Errorf ( "tsoAllocator is nil" )
2021-01-19 14:44:03 +08:00
}
2021-05-20 14:14:14 +08:00
if c . TSOAllocatorUpdate == nil {
2021-04-08 17:31:39 +08:00
return fmt . Errorf ( "tsoAllocatorUpdate is nil" )
}
2021-01-19 14:44:03 +08:00
if c . etcdCli == nil {
2021-04-08 15:26:18 +08:00
return fmt . Errorf ( "etcdCli is nil" )
2021-01-19 14:44:03 +08:00
}
if c . kvBase == nil {
2021-04-08 15:26:18 +08:00
return fmt . Errorf ( "kvBase is nil" )
2021-01-19 14:44:03 +08:00
}
2021-05-14 21:26:06 +08:00
if c . SendDdCreateCollectionReq == nil {
return fmt . Errorf ( "SendDdCreateCollectionReq is nil" )
2021-01-19 14:44:03 +08:00
}
2021-05-14 21:26:06 +08:00
if c . SendDdDropCollectionReq == nil {
return fmt . Errorf ( "SendDdDropCollectionReq is nil" )
2021-01-19 14:44:03 +08:00
}
2021-05-14 21:26:06 +08:00
if c . SendDdCreatePartitionReq == nil {
return fmt . Errorf ( "SendDdCreatePartitionReq is nil" )
2021-01-19 14:44:03 +08:00
}
2021-05-14 21:26:06 +08:00
if c . SendDdDropPartitionReq == nil {
return fmt . Errorf ( "SendDdDropPartitionReq is nil" )
2021-01-19 14:44:03 +08:00
}
2021-05-25 14:03:06 +08:00
if c . CallGetBinlogFilePathsService == nil {
return fmt . Errorf ( "CallGetBinlogFilePathsService is nil" )
2021-01-21 10:01:29 +08:00
}
2021-05-25 14:03:06 +08:00
if c . CallGetNumRowsService == nil {
return fmt . Errorf ( "CallGetNumRowsService is nil" )
2021-03-08 15:46:51 +08:00
}
2021-05-25 14:03:06 +08:00
if c . CallBuildIndexService == nil {
return fmt . Errorf ( "CallBuildIndexService is nil" )
2021-01-21 10:01:29 +08:00
}
2021-05-25 14:03:06 +08:00
if c . CallDropIndexService == nil {
return fmt . Errorf ( "CallDropIndexService is nil" )
2021-02-20 15:38:44 +08:00
}
2021-07-03 14:36:18 +08:00
if c . CallGetFlushedSegmentsService == nil {
return fmt . Errorf ( "CallGetFlushedSegments is nil" )
}
2021-05-26 20:14:30 +08:00
if c . NewProxyClient == nil {
2021-06-22 19:08:03 +08:00
return fmt . Errorf ( "NewProxyClient is nil" )
2021-05-25 14:03:06 +08:00
}
if c . CallReleaseCollectionService == nil {
return fmt . Errorf ( "CallReleaseCollectionService is nil" )
2021-01-23 10:12:41 +08:00
}
2021-06-22 16:08:08 +08:00
if c . CallReleasePartitionService == nil {
return fmt . Errorf ( "CallReleasePartitionService is nil" )
}
2021-05-26 20:14:30 +08:00
2021-01-19 14:44:03 +08:00
return nil
}
func ( c * Core ) startTimeTickLoop ( ) {
2021-05-31 16:48:31 +08:00
ticker := time . NewTicker ( time . Duration ( Params . TimeTickInterval ) * time . Millisecond )
for {
select {
case <- c . ctx . Done ( ) :
2021-06-17 16:47:57 +08:00
log . Debug ( "rootcoord context closed" , zap . Error ( c . ctx . Err ( ) ) )
2021-05-31 16:48:31 +08:00
return
case <- ticker . C :
2021-06-23 15:28:09 +08:00
if ts , err := c . TSOAllocator ( 1 ) ; err == nil {
c . SendTimeTick ( ts )
2021-01-19 14:44:03 +08:00
}
}
}
}
2021-01-27 16:38:18 +08:00
func ( c * Core ) tsLoop ( ) {
2021-02-24 17:12:06 +08:00
tsoTicker := time . NewTicker ( tso . UpdateTimestampStep )
2021-01-27 16:38:18 +08:00
defer tsoTicker . Stop ( )
ctx , cancel := context . WithCancel ( c . ctx )
defer cancel ( )
for {
select {
case <- tsoTicker . C :
2021-05-20 14:14:14 +08:00
if err := c . TSOAllocatorUpdate ( ) ; err != nil {
2021-03-15 15:45:17 +08:00
log . Warn ( "failed to update timestamp: " , zap . Error ( err ) )
continue
2021-01-27 16:38:18 +08:00
}
2021-05-20 14:14:14 +08:00
if err := c . IDAllocatorUpdate ( ) ; err != nil {
2021-03-15 15:45:17 +08:00
log . Warn ( "failed to update id: " , zap . Error ( err ) )
continue
2021-01-27 16:38:18 +08:00
}
case <- ctx . Done ( ) :
// Server is closed and it should return nil.
2021-02-27 10:11:52 +08:00
log . Debug ( "tsLoop is closed" )
2021-01-27 16:38:18 +08:00
return
}
}
}
2021-05-14 21:26:06 +08:00
2021-05-26 20:14:30 +08:00
func ( c * Core ) sessionLoop ( ) {
for {
select {
case <- c . ctx . Done ( ) :
return
case _ , ok := <- c . sessCloseCh :
if ! ok {
2021-06-17 16:47:57 +08:00
log . Error ( "rootcoord disconnect with etcd, process will exit in 1 second" )
2021-05-26 20:14:30 +08:00
go func ( ) {
time . Sleep ( time . Second )
os . Exit ( - 1 )
} ( )
2021-07-10 10:21:52 +08:00
return
2021-05-26 20:14:30 +08:00
}
}
}
}
2021-07-03 17:54:25 +08:00
func ( c * Core ) checkFlushedSegmentsLoop ( ) {
ticker := time . NewTicker ( 10 * time . Minute )
for {
select {
case <- c . ctx . Done ( ) :
log . Debug ( "RootCoord context done,exit checkFlushedSegmentsLoop" )
return
case <- ticker . C :
log . Debug ( "check flushed segments" )
collID2Meta , segID2IndexMeta , indexID2Meta := c . MetaTable . dupMeta ( )
for _ , collMeta := range collID2Meta {
if len ( collMeta . FieldIndexes ) == 0 {
continue
}
for _ , partID := range collMeta . PartitionIDs {
ctx2 , cancel2 := context . WithTimeout ( c . ctx , 3 * time . Minute )
segIDs , err := c . CallGetFlushedSegmentsService ( ctx2 , collMeta . ID , partID )
if err != nil {
log . Debug ( "get flushed segments from data coord failed" ,
zap . Int64 ( "collection id" , collMeta . ID ) ,
zap . Int64 ( "partition id" , partID ) ,
zap . Error ( err ) )
} else {
for _ , segID := range segIDs {
indexInfos := [ ] * etcdpb . FieldIndexInfo { }
indexMeta , ok := segID2IndexMeta [ segID ]
if ! ok {
indexInfos = append ( indexInfos , collMeta . FieldIndexes ... )
} else {
for _ , idx := range collMeta . FieldIndexes {
if _ , ok := indexMeta [ idx . IndexID ] ; ! ok {
indexInfos = append ( indexInfos , idx )
}
}
}
for _ , idxInfo := range indexInfos {
field , err := GetFieldSchemaByID ( & collMeta , idxInfo . FiledID )
if err != nil {
log . Debug ( "GetFieldSchemaByID" ,
zap . Any ( "collection_meta" , collMeta ) ,
zap . Int64 ( "field id" , idxInfo . FiledID ) )
continue
}
indexMeta , ok := indexID2Meta [ idxInfo . IndexID ]
if ! ok {
log . Debug ( "index meta not exist" , zap . Int64 ( "index_id" , idxInfo . IndexID ) )
continue
}
info := etcdpb . SegmentIndexInfo {
2021-07-05 10:08:02 +08:00
CollectionID : collMeta . ID ,
PartitionID : partID ,
SegmentID : segID ,
FieldID : idxInfo . FiledID ,
IndexID : idxInfo . IndexID ,
EnableIndex : false ,
2021-07-03 17:54:25 +08:00
}
log . Debug ( "build index by background checker" ,
zap . Int64 ( "segment_id" , segID ) ,
zap . Int64 ( "index_id" , indexMeta . IndexID ) ,
zap . Int64 ( "collection_id" , collMeta . ID ) )
info . BuildID , err = c . BuildIndex ( ctx2 , segID , field , & indexMeta , false )
if err != nil {
log . Debug ( "build index failed" ,
zap . Int64 ( "segment_id" , segID ) ,
zap . Int64 ( "field_id" , field . FieldID ) ,
zap . Int64 ( "index_id" , indexMeta . IndexID ) )
continue
}
if info . BuildID != 0 {
info . EnableIndex = true
}
2021-07-05 10:08:02 +08:00
if _ , err := c . MetaTable . AddIndex ( & info ) ; err != nil {
2021-07-03 17:54:25 +08:00
log . Debug ( "Add index into meta table failed" , zap . Int64 ( "collection_id" , collMeta . ID ) , zap . Int64 ( "index_id" , info . IndexID ) , zap . Int64 ( "build_id" , info . BuildID ) , zap . Error ( err ) )
}
}
}
}
cancel2 ( )
}
}
}
}
2021-05-26 20:14:30 +08:00
}
2021-07-03 14:36:18 +08:00
// getSegments returns a map from flushed segment ID to the ID of the
// partition that contains it, covering every partition of collection collID.
// It fails if the collection cannot be looked up or if the flushed-segment
// query fails for any partition.
func (c *Core) getSegments(ctx context.Context, collID typeutil.UniqueID) (map[typeutil.UniqueID]typeutil.UniqueID, error) {
	collMeta, err := c.MetaTable.GetCollectionByID(collID, 0)
	if err != nil {
		return nil, err
	}

	segID2PartID := make(map[typeutil.UniqueID]typeutil.UniqueID)
	for _, partID := range collMeta.PartitionIDs {
		flushed, queryErr := c.CallGetFlushedSegmentsService(ctx, collID, partID)
		if queryErr != nil {
			log.Debug("get flushed segments from data coord failed", zap.Int64("collection_id", collID), zap.Int64("partition_id", partID), zap.Error(queryErr))
			return nil, queryErr
		}
		for _, segID := range flushed {
			segID2PartID[segID] = partID
		}
	}
	return segID2PartID, nil
}
2021-05-14 21:26:06 +08:00
func ( c * Core ) setDdMsgSendFlag ( b bool ) error {
2021-05-18 14:18:02 +08:00
flag , err := c . MetaTable . client . Load ( DDMsgSendPrefix , 0 )
2021-05-14 21:26:06 +08:00
if err != nil {
return err
}
if ( b && flag == "true" ) || ( ! b && flag == "false" ) {
log . Debug ( "DdMsg send flag need not change" , zap . String ( "flag" , flag ) )
return nil
}
if b {
2021-05-18 14:18:02 +08:00
_ , err = c . MetaTable . client . Save ( DDMsgSendPrefix , "true" )
return err
2021-05-14 21:26:06 +08:00
}
2021-05-18 14:18:02 +08:00
_ , err = c . MetaTable . client . Save ( DDMsgSendPrefix , "false" )
return err
2021-05-14 21:26:06 +08:00
}
2021-01-20 09:36:50 +08:00
func ( c * Core ) setMsgStreams ( ) error {
2021-01-24 20:26:35 +08:00
if Params . PulsarAddress == "" {
2021-04-08 15:26:18 +08:00
return fmt . Errorf ( "PulsarAddress is empty" )
2021-01-24 20:26:35 +08:00
}
if Params . MsgChannelSubName == "" {
2021-04-08 15:26:18 +08:00
return fmt . Errorf ( "MsgChannelSubName is emptyr" )
2021-01-24 20:26:35 +08:00
}
2021-06-17 16:47:57 +08:00
// rootcoord time tick channel
2021-01-24 20:26:35 +08:00
if Params . TimeTickChannel == "" {
2021-04-08 15:26:18 +08:00
return fmt . Errorf ( "TimeTickChannel is empty" )
2021-01-24 20:26:35 +08:00
}
2021-02-08 14:30:54 +08:00
timeTickStream , _ := c . msFactory . NewMsgStream ( c . ctx )
2021-02-04 14:37:12 +08:00
timeTickStream . AsProducer ( [ ] string { Params . TimeTickChannel } )
2021-06-17 16:47:57 +08:00
log . Debug ( "rootcoord AsProducer: " + Params . TimeTickChannel )
2021-01-20 09:36:50 +08:00
c . SendTimeTick = func ( t typeutil . Timestamp ) error {
msgPack := ms . MsgPack { }
baseMsg := ms . BaseMsg {
BeginTimestamp : t ,
EndTimestamp : t ,
HashValues : [ ] uint32 { 0 } ,
}
2021-03-12 14:22:09 +08:00
timeTickResult := internalpb . TimeTickMsg {
2021-01-20 09:36:50 +08:00
Base : & commonpb . MsgBase {
2021-03-10 14:45:35 +08:00
MsgType : commonpb . MsgType_TimeTick ,
2021-01-20 09:36:50 +08:00
MsgID : 0 ,
Timestamp : t ,
2021-05-25 15:06:05 +08:00
SourceID : c . session . ServerID ,
2021-01-20 09:36:50 +08:00
} ,
}
timeTickMsg := & ms . TimeTickMsg {
BaseMsg : baseMsg ,
TimeTickMsg : timeTickResult ,
}
msgPack . Msgs = append ( msgPack . Msgs , timeTickMsg )
2021-03-25 14:41:46 +08:00
if err := timeTickStream . Broadcast ( & msgPack ) ; err != nil {
2021-01-20 09:36:50 +08:00
return err
}
2021-06-17 16:47:57 +08:00
metrics . RootCoordDDChannelTimeTick . Set ( float64 ( tsoutil . Mod24H ( t ) ) )
2021-06-04 15:00:34 +08:00
2021-06-11 16:39:29 +08:00
//c.dmlChannels.BroadcastAll(&msgPack)
2021-06-04 15:00:34 +08:00
pc := c . MetaTable . ListCollectionPhysicalChannels ( )
pt := make ( [ ] uint64 , len ( pc ) )
for i := 0 ; i < len ( pt ) ; i ++ {
pt [ i ] = t
}
ttMsg := internalpb . ChannelTimeTickMsg {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_TimeTick ,
MsgID : 0 , //TODO
Timestamp : t ,
SourceID : c . session . ServerID ,
} ,
2021-06-17 15:54:07 +08:00
ChannelNames : pc ,
Timestamps : pt ,
DefaultTimestamp : t ,
2021-06-04 15:00:34 +08:00
}
return c . chanTimeTick . UpdateTimeTick ( & ttMsg )
2021-01-20 09:36:50 +08:00
}
2021-06-11 16:39:29 +08:00
c . SendDdCreateCollectionReq = func ( ctx context . Context , req * internalpb . CreateCollectionRequest , channelNames [ ] string ) error {
2021-01-20 09:36:50 +08:00
msgPack := ms . MsgPack { }
baseMsg := ms . BaseMsg {
2021-03-25 14:41:46 +08:00
Ctx : ctx ,
2021-01-20 09:36:50 +08:00
BeginTimestamp : req . Base . Timestamp ,
EndTimestamp : req . Base . Timestamp ,
HashValues : [ ] uint32 { 0 } ,
}
2021-06-04 15:00:34 +08:00
msg := & ms . CreateCollectionMsg {
2021-01-20 09:36:50 +08:00
BaseMsg : baseMsg ,
CreateCollectionRequest : * req ,
}
2021-06-04 15:00:34 +08:00
msgPack . Msgs = append ( msgPack . Msgs , msg )
2021-06-15 21:11:58 +08:00
return c . dmlChannels . BroadcastAll ( channelNames , & msgPack )
2021-01-20 09:36:50 +08:00
}
2021-06-11 16:39:29 +08:00
c . SendDdDropCollectionReq = func ( ctx context . Context , req * internalpb . DropCollectionRequest , channelNames [ ] string ) error {
2021-01-20 09:36:50 +08:00
msgPack := ms . MsgPack { }
baseMsg := ms . BaseMsg {
2021-03-25 14:41:46 +08:00
Ctx : ctx ,
2021-01-20 09:36:50 +08:00
BeginTimestamp : req . Base . Timestamp ,
EndTimestamp : req . Base . Timestamp ,
HashValues : [ ] uint32 { 0 } ,
}
2021-06-04 15:00:34 +08:00
msg := & ms . DropCollectionMsg {
2021-01-20 09:36:50 +08:00
BaseMsg : baseMsg ,
DropCollectionRequest : * req ,
}
2021-06-04 15:00:34 +08:00
msgPack . Msgs = append ( msgPack . Msgs , msg )
2021-06-15 21:11:58 +08:00
return c . dmlChannels . BroadcastAll ( channelNames , & msgPack )
2021-01-20 09:36:50 +08:00
}
2021-06-11 16:39:29 +08:00
c . SendDdCreatePartitionReq = func ( ctx context . Context , req * internalpb . CreatePartitionRequest , channelNames [ ] string ) error {
2021-01-20 09:36:50 +08:00
msgPack := ms . MsgPack { }
baseMsg := ms . BaseMsg {
2021-03-25 14:41:46 +08:00
Ctx : ctx ,
2021-01-20 09:36:50 +08:00
BeginTimestamp : req . Base . Timestamp ,
EndTimestamp : req . Base . Timestamp ,
HashValues : [ ] uint32 { 0 } ,
}
2021-06-04 15:00:34 +08:00
msg := & ms . CreatePartitionMsg {
2021-01-20 09:36:50 +08:00
BaseMsg : baseMsg ,
CreatePartitionRequest : * req ,
}
2021-06-04 15:00:34 +08:00
msgPack . Msgs = append ( msgPack . Msgs , msg )
2021-06-15 21:11:58 +08:00
return c . dmlChannels . BroadcastAll ( channelNames , & msgPack )
2021-01-20 09:36:50 +08:00
}
2021-06-11 16:39:29 +08:00
c . SendDdDropPartitionReq = func ( ctx context . Context , req * internalpb . DropPartitionRequest , channelNames [ ] string ) error {
2021-01-20 09:36:50 +08:00
msgPack := ms . MsgPack { }
baseMsg := ms . BaseMsg {
2021-03-25 14:41:46 +08:00
Ctx : ctx ,
2021-01-20 09:36:50 +08:00
BeginTimestamp : req . Base . Timestamp ,
EndTimestamp : req . Base . Timestamp ,
HashValues : [ ] uint32 { 0 } ,
}
2021-06-04 15:00:34 +08:00
msg := & ms . DropPartitionMsg {
2021-01-20 09:36:50 +08:00
BaseMsg : baseMsg ,
DropPartitionRequest : * req ,
}
2021-06-04 15:00:34 +08:00
msgPack . Msgs = append ( msgPack . Msgs , msg )
2021-06-15 21:11:58 +08:00
return c . dmlChannels . BroadcastAll ( channelNames , & msgPack )
2021-01-20 09:36:50 +08:00
}
return nil
}
2021-05-26 20:14:30 +08:00
//SetNewProxyClient create proxy node by this func
2021-06-22 19:08:03 +08:00
func ( c * Core ) SetNewProxyClient ( f func ( sess * sessionutil . Session ) ( types . Proxy , error ) ) {
2021-05-26 20:14:30 +08:00
if c . NewProxyClient == nil {
c . NewProxyClient = f
} else {
2021-06-01 11:04:31 +08:00
log . Debug ( "NewProxyClient has already set" )
2021-05-26 20:14:30 +08:00
}
}
2021-06-21 18:22:13 +08:00
func ( c * Core ) SetDataCoord ( ctx context . Context , s types . DataCoord ) error {
2021-06-25 16:48:10 +08:00
initCh := make ( chan struct { } )
go func ( ) {
for {
if err := s . Init ( ) ; err == nil {
if err := s . Start ( ) ; err == nil {
close ( initCh )
log . Debug ( "RootCoord connect to DataCoord" )
return
}
}
log . Debug ( "RootCoord connect to DataCoord, retry" )
}
} ( )
2021-06-30 16:18:13 +08:00
c . CallGetBinlogFilePathsService = func ( ctx context . Context , segID typeutil . UniqueID , fieldID typeutil . UniqueID ) ( retFiles [ ] string , retErr error ) {
2021-05-26 20:14:30 +08:00
defer func ( ) {
if err := recover ( ) ; err != nil {
retFiles = nil
retErr = fmt . Errorf ( "get bin log file paths panic, msg = %v" , err )
}
} ( )
2021-06-25 16:48:10 +08:00
<- initCh //wait connect to data coord
2021-05-20 14:14:14 +08:00
ts , err := c . TSOAllocator ( 1 )
2021-01-24 20:26:35 +08:00
if err != nil {
2021-05-26 20:14:30 +08:00
retFiles = nil
retErr = err
return
2021-01-24 20:26:35 +08:00
}
2021-03-12 14:22:09 +08:00
binlog , err := s . GetInsertBinlogPaths ( ctx , & datapb . GetInsertBinlogPathsRequest {
2021-01-24 20:26:35 +08:00
Base : & commonpb . MsgBase {
2021-03-08 15:46:51 +08:00
MsgType : 0 , //TODO, msg type
2021-01-24 20:26:35 +08:00
MsgID : 0 ,
Timestamp : ts ,
2021-05-25 15:06:05 +08:00
SourceID : c . session . ServerID ,
2021-01-24 20:26:35 +08:00
} ,
SegmentID : segID ,
} )
if err != nil {
2021-05-26 20:14:30 +08:00
retFiles = nil
retErr = err
return
2021-01-24 20:26:35 +08:00
}
2021-03-10 22:06:22 +08:00
if binlog . Status . ErrorCode != commonpb . ErrorCode_Success {
2021-05-26 20:14:30 +08:00
retFiles = nil
retErr = fmt . Errorf ( "GetInsertBinlogPaths from data service failed, error = %s" , binlog . Status . Reason )
return
2021-01-24 20:26:35 +08:00
}
for i := range binlog . FieldIDs {
if binlog . FieldIDs [ i ] == fieldID {
2021-05-26 20:14:30 +08:00
retFiles = binlog . Paths [ i ] . Values
retErr = nil
return
2021-01-24 20:26:35 +08:00
}
}
2021-05-26 20:14:30 +08:00
retFiles = nil
retErr = fmt . Errorf ( "binlog file not exist, segment id = %d, field id = %d" , segID , fieldID )
return
2021-01-24 20:26:35 +08:00
}
2021-03-08 15:46:51 +08:00
2021-06-30 16:18:13 +08:00
c . CallGetNumRowsService = func ( ctx context . Context , segID typeutil . UniqueID , isFromFlushedChan bool ) ( retRows int64 , retErr error ) {
2021-05-26 20:14:30 +08:00
defer func ( ) {
if err := recover ( ) ; err != nil {
retRows = 0
retErr = fmt . Errorf ( "get num rows panic, msg = %v" , err )
return
}
} ( )
2021-06-25 16:48:10 +08:00
<- initCh
2021-05-20 14:14:14 +08:00
ts , err := c . TSOAllocator ( 1 )
2021-03-08 15:46:51 +08:00
if err != nil {
2021-05-26 20:14:30 +08:00
retRows = 0
retErr = err
return
2021-03-08 15:46:51 +08:00
}
2021-03-12 14:22:09 +08:00
segInfo , err := s . GetSegmentInfo ( ctx , & datapb . GetSegmentInfoRequest {
2021-03-08 15:46:51 +08:00
Base : & commonpb . MsgBase {
MsgType : 0 , //TODO, msg type
MsgID : 0 ,
Timestamp : ts ,
2021-05-25 15:06:05 +08:00
SourceID : c . session . ServerID ,
2021-03-08 15:46:51 +08:00
} ,
SegmentIDs : [ ] typeutil . UniqueID { segID } ,
} )
if err != nil {
2021-05-26 20:14:30 +08:00
retRows = 0
retErr = err
return
2021-03-08 15:46:51 +08:00
}
2021-03-10 22:06:22 +08:00
if segInfo . Status . ErrorCode != commonpb . ErrorCode_Success {
2021-03-08 15:46:51 +08:00
return 0 , fmt . Errorf ( "GetSegmentInfo from data service failed, error = %s" , segInfo . Status . Reason )
}
if len ( segInfo . Infos ) != 1 {
log . Debug ( "get segment info empty" )
2021-05-26 20:14:30 +08:00
retRows = 0
retErr = nil
return
2021-03-08 15:46:51 +08:00
}
2021-03-13 17:05:36 +08:00
if ! isFromFlushedChan && segInfo . Infos [ 0 ] . State != commonpb . SegmentState_Flushed {
2021-03-08 15:46:51 +08:00
log . Debug ( "segment id not flushed" , zap . Int64 ( "segment id" , segID ) )
2021-05-26 20:14:30 +08:00
retRows = 0
retErr = nil
return
2021-03-08 15:46:51 +08:00
}
2021-06-04 11:45:45 +08:00
retRows = segInfo . Infos [ 0 ] . NumOfRows
2021-05-26 20:14:30 +08:00
retErr = nil
return
2021-03-08 15:46:51 +08:00
}
2021-07-03 14:36:18 +08:00
c . CallGetFlushedSegmentsService = func ( ctx context . Context , collID , partID typeutil . UniqueID ) ( retSegIDs [ ] typeutil . UniqueID , retErr error ) {
defer func ( ) {
if err := recover ( ) ; err != nil {
retSegIDs = [ ] typeutil . UniqueID { }
retErr = fmt . Errorf ( "get flushed segments from data coord panic, msg = %v" , err )
return
}
} ( )
<- initCh
req := & datapb . GetFlushedSegmentsRequest {
Base : & commonpb . MsgBase {
MsgType : 0 , //TODO,msg type
MsgID : 0 ,
Timestamp : 0 ,
SourceID : c . session . ServerID ,
} ,
CollectionID : collID ,
PartitionID : partID ,
}
rsp , err := s . GetFlushedSegments ( ctx , req )
if err != nil {
retSegIDs = [ ] typeutil . UniqueID { }
retErr = err
return
}
if rsp . Status . ErrorCode != commonpb . ErrorCode_Success {
retSegIDs = [ ] typeutil . UniqueID { }
retErr = fmt . Errorf ( "get flushed segments from data coord failed, reason = %s" , rsp . Status . Reason )
return
}
retSegIDs = rsp . Segments
retErr = nil
return
}
2021-01-24 20:26:35 +08:00
return nil
}
2021-06-21 17:28:03 +08:00
func ( c * Core ) SetIndexCoord ( s types . IndexCoord ) error {
2021-06-25 16:48:10 +08:00
initCh := make ( chan struct { } )
go func ( ) {
for {
if err := s . Init ( ) ; err == nil {
if err := s . Start ( ) ; err == nil {
close ( initCh )
log . Debug ( "RootCoord connect to IndexCoord" )
return
}
}
log . Debug ( "RootCoord connect to IndexCoord, retry" )
}
} ( )
2021-05-26 20:14:30 +08:00
c . CallBuildIndexService = func ( ctx context . Context , binlog [ ] string , field * schemapb . FieldSchema , idxInfo * etcdpb . IndexInfo ) ( retID typeutil . UniqueID , retErr error ) {
defer func ( ) {
if err := recover ( ) ; err != nil {
retID = 0
retErr = fmt . Errorf ( "build index panic, msg = %v" , err )
return
}
} ( )
2021-06-25 16:48:10 +08:00
<- initCh
2021-02-26 17:44:24 +08:00
rsp , err := s . BuildIndex ( ctx , & indexpb . BuildIndexRequest {
2021-01-24 20:26:35 +08:00
DataPaths : binlog ,
2021-05-15 18:08:08 +08:00
TypeParams : field . TypeParams ,
IndexParams : idxInfo . IndexParams ,
IndexID : idxInfo . IndexID ,
IndexName : idxInfo . IndexName ,
2021-01-24 20:26:35 +08:00
} )
if err != nil {
2021-05-26 20:14:30 +08:00
retID = 0
retErr = err
return
2021-01-24 20:26:35 +08:00
}
2021-03-10 22:06:22 +08:00
if rsp . Status . ErrorCode != commonpb . ErrorCode_Success {
2021-05-26 20:14:30 +08:00
retID = 0
retErr = fmt . Errorf ( "BuildIndex from index service failed, error = %s" , rsp . Status . Reason )
return
2021-01-24 20:26:35 +08:00
}
2021-05-26 20:14:30 +08:00
retID = rsp . IndexBuildID
retErr = nil
return
2021-01-24 20:26:35 +08:00
}
2021-02-20 15:38:44 +08:00
2021-05-26 20:14:30 +08:00
c . CallDropIndexService = func ( ctx context . Context , indexID typeutil . UniqueID ) ( retErr error ) {
defer func ( ) {
if err := recover ( ) ; err != nil {
retErr = fmt . Errorf ( "drop index from index service panic, msg = %v" , err )
return
}
} ( )
2021-06-25 16:48:10 +08:00
<- initCh
2021-02-26 17:44:24 +08:00
rsp , err := s . DropIndex ( ctx , & indexpb . DropIndexRequest {
2021-02-20 15:38:44 +08:00
IndexID : indexID ,
} )
if err != nil {
2021-05-26 20:14:30 +08:00
retErr = err
return
2021-02-20 15:38:44 +08:00
}
2021-03-10 22:06:22 +08:00
if rsp . ErrorCode != commonpb . ErrorCode_Success {
2021-05-26 20:14:30 +08:00
retErr = fmt . Errorf ( rsp . Reason )
return
2021-02-20 15:38:44 +08:00
}
2021-05-26 20:14:30 +08:00
retErr = nil
return
2021-02-20 15:38:44 +08:00
}
2021-01-24 20:26:35 +08:00
return nil
}
2021-06-22 16:44:09 +08:00
func ( c * Core ) SetQueryCoord ( s types . QueryCoord ) error {
2021-06-25 16:48:10 +08:00
initCh := make ( chan struct { } )
go func ( ) {
for {
if err := s . Init ( ) ; err == nil {
if err := s . Start ( ) ; err == nil {
close ( initCh )
log . Debug ( "RootCoord connect to QueryCoord" )
return
}
}
log . Debug ( "RootCoord connect to QueryCoord, retry" )
}
} ( )
2021-05-26 20:14:30 +08:00
c . CallReleaseCollectionService = func ( ctx context . Context , ts typeutil . Timestamp , dbID typeutil . UniqueID , collectionID typeutil . UniqueID ) ( retErr error ) {
defer func ( ) {
if err := recover ( ) ; err != nil {
retErr = fmt . Errorf ( "release collection from query service panic, msg = %v" , err )
return
}
} ( )
2021-06-25 16:48:10 +08:00
<- initCh
2021-02-05 14:09:55 +08:00
req := & querypb . ReleaseCollectionRequest {
Base : & commonpb . MsgBase {
2021-03-10 14:45:35 +08:00
MsgType : commonpb . MsgType_ReleaseCollection ,
2021-02-05 14:09:55 +08:00
MsgID : 0 , //TODO, msg ID
Timestamp : ts ,
2021-05-25 15:06:05 +08:00
SourceID : c . session . ServerID ,
2021-02-05 14:09:55 +08:00
} ,
DbID : dbID ,
CollectionID : collectionID ,
}
2021-02-26 17:44:24 +08:00
rsp , err := s . ReleaseCollection ( ctx , req )
2021-02-05 14:09:55 +08:00
if err != nil {
2021-05-26 20:14:30 +08:00
retErr = err
return
2021-02-05 14:09:55 +08:00
}
2021-03-10 22:06:22 +08:00
if rsp . ErrorCode != commonpb . ErrorCode_Success {
2021-05-26 20:14:30 +08:00
retErr = fmt . Errorf ( "ReleaseCollection from query service failed, error = %s" , rsp . Reason )
return
2021-02-05 14:09:55 +08:00
}
2021-05-26 20:14:30 +08:00
retErr = nil
return
2021-02-05 14:09:55 +08:00
}
2021-06-22 16:08:08 +08:00
c . CallReleasePartitionService = func ( ctx context . Context , ts typeutil . Timestamp , dbID , collectionID typeutil . UniqueID , partitionIDs [ ] typeutil . UniqueID ) ( retErr error ) {
defer func ( ) {
if err := recover ( ) ; err != nil {
retErr = fmt . Errorf ( "release partition from query service panic, msg = %v" , err )
}
} ( )
2021-06-25 16:48:10 +08:00
<- initCh
2021-06-22 16:08:08 +08:00
req := & querypb . ReleasePartitionsRequest {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_ReleasePartitions ,
MsgID : 0 , //TODO, msg ID
Timestamp : ts ,
SourceID : c . session . ServerID ,
} ,
DbID : dbID ,
CollectionID : collectionID ,
PartitionIDs : partitionIDs ,
}
rsp , err := s . ReleasePartitions ( ctx , req )
if err != nil {
retErr = err
return
}
if rsp . ErrorCode != commonpb . ErrorCode_Success {
retErr = fmt . Errorf ( "ReleasePartitions from query service failed, error = %s" , rsp . Reason )
return
}
retErr = nil
return
}
2021-02-05 14:09:55 +08:00
return nil
}
2021-05-15 18:08:08 +08:00
// BuildIndex will check row num and call build index service
2021-06-30 16:18:13 +08:00
func ( c * Core ) BuildIndex ( ctx context . Context , segID typeutil . UniqueID , field * schemapb . FieldSchema , idxInfo * etcdpb . IndexInfo , isFlush bool ) ( typeutil . UniqueID , error ) {
sp , ctx := trace . StartSpanFromContext ( ctx )
defer sp . Finish ( )
2021-05-15 18:08:08 +08:00
if c . MetaTable . IsSegmentIndexed ( segID , field , idxInfo . IndexParams ) {
2021-05-24 14:19:52 +08:00
return 0 , nil
2021-05-15 18:08:08 +08:00
}
2021-06-30 16:18:13 +08:00
rows , err := c . CallGetNumRowsService ( ctx , segID , isFlush )
2021-05-15 18:08:08 +08:00
if err != nil {
2021-05-24 14:19:52 +08:00
return 0 , err
2021-05-15 18:08:08 +08:00
}
var bldID typeutil . UniqueID
if rows < Params . MinSegmentSizeToEnableIndex {
log . Debug ( "num of rows is less than MinSegmentSizeToEnableIndex" , zap . Int64 ( "num rows" , rows ) )
} else {
2021-06-30 16:18:13 +08:00
binlogs , err := c . CallGetBinlogFilePathsService ( ctx , segID , field . FieldID )
2021-05-15 18:08:08 +08:00
if err != nil {
2021-05-24 14:19:52 +08:00
return 0 , err
2021-05-15 18:08:08 +08:00
}
2021-06-30 16:18:13 +08:00
bldID , err = c . CallBuildIndexService ( ctx , binlogs , field , idxInfo )
2021-05-15 18:08:08 +08:00
if err != nil {
2021-05-24 14:19:52 +08:00
return 0 , err
2021-05-15 18:08:08 +08:00
}
}
2021-05-24 14:19:52 +08:00
log . Debug ( "build index" , zap . String ( "index name" , idxInfo . IndexName ) ,
zap . String ( "field name" , field . Name ) ,
zap . Int64 ( "segment id" , segID ) )
return bldID , nil
2021-05-15 18:08:08 +08:00
}
2021-06-17 16:47:57 +08:00
// Register register rootcoord at etcd
2021-05-25 15:06:05 +08:00
func ( c * Core ) Register ( ) error {
2021-06-11 22:04:41 +08:00
c . session = sessionutil . NewSession ( c . ctx , Params . MetaRootPath , Params . EtcdEndpoints )
2021-06-03 19:01:33 +08:00
if c . session == nil {
return fmt . Errorf ( "session is nil, maybe the etcd client connection fails" )
}
2021-06-17 16:47:57 +08:00
c . sessCloseCh = c . session . Init ( typeutil . RootCoordRole , Params . Address , true )
2021-05-25 15:06:05 +08:00
return nil
}
2021-01-24 20:26:35 +08:00
func ( c * Core ) Init ( ) error {
2021-01-19 14:44:03 +08:00
var initError error = nil
c . initOnce . Do ( func ( ) {
2021-02-26 15:17:47 +08:00
connectEtcdFn := func ( ) error {
2021-06-11 22:04:41 +08:00
if c . etcdCli , initError = clientv3 . New ( clientv3 . Config { Endpoints : Params . EtcdEndpoints , DialTimeout : 5 * time . Second } ) ; initError != nil {
2021-02-26 15:17:47 +08:00
return initError
}
2021-05-18 14:18:02 +08:00
tsAlloc := func ( ) typeutil . Timestamp {
for {
var ts typeutil . Timestamp
var err error
2021-05-20 14:14:14 +08:00
if ts , err = c . TSOAllocator ( 1 ) ; err == nil {
2021-05-18 14:18:02 +08:00
return ts
}
time . Sleep ( 100 * time . Millisecond )
log . Debug ( "alloc time stamp error" , zap . Error ( err ) )
}
}
var ms * metaSnapshot
ms , initError = newMetaSnapshot ( c . etcdCli , Params . MetaRootPath , TimestampPrefix , 1024 , tsAlloc )
if initError != nil {
return initError
}
if c . MetaTable , initError = NewMetaTable ( ms ) ; initError != nil {
2021-02-26 15:17:47 +08:00
return initError
}
c . kvBase = etcdkv . NewEtcdKV ( c . etcdCli , Params . KvRootPath )
return nil
2021-01-19 14:44:03 +08:00
}
2021-06-22 16:08:08 +08:00
log . Debug ( "RootCoord, Connect to Etcd" )
2021-06-23 09:24:10 +08:00
err := retry . Do ( c . ctx , connectEtcdFn , retry . Attempts ( 300 ) )
2021-02-26 15:17:47 +08:00
if err != nil {
2021-01-19 14:44:03 +08:00
return
}
2021-06-22 16:08:08 +08:00
log . Debug ( "RootCoord, Set TSO and ID Allocator" )
2021-06-11 22:04:41 +08:00
idAllocator := allocator . NewGlobalIDAllocator ( "idTimestamp" , tsoutil . NewTSOKVBase ( Params . EtcdEndpoints , Params . KvRootPath , "gid" ) )
2021-04-08 17:31:39 +08:00
if initError = idAllocator . Initialize ( ) ; initError != nil {
2021-01-19 14:44:03 +08:00
return
}
2021-05-20 14:14:14 +08:00
c . IDAllocator = func ( count uint32 ) ( typeutil . UniqueID , typeutil . UniqueID , error ) {
2021-04-08 17:31:39 +08:00
return idAllocator . Alloc ( count )
}
2021-05-20 14:14:14 +08:00
c . IDAllocatorUpdate = func ( ) error {
2021-04-08 17:31:39 +08:00
return idAllocator . UpdateID ( )
}
2021-06-11 22:04:41 +08:00
tsoAllocator := tso . NewGlobalTSOAllocator ( "timestamp" , tsoutil . NewTSOKVBase ( Params . EtcdEndpoints , Params . KvRootPath , "tso" ) )
2021-04-08 17:31:39 +08:00
if initError = tsoAllocator . Initialize ( ) ; initError != nil {
2021-01-19 14:44:03 +08:00
return
}
2021-05-20 14:14:14 +08:00
c . TSOAllocator = func ( count uint32 ) ( typeutil . Timestamp , error ) {
2021-04-08 17:31:39 +08:00
return tsoAllocator . Alloc ( count )
}
2021-05-20 14:14:14 +08:00
c . TSOAllocatorUpdate = func ( ) error {
2021-04-08 17:31:39 +08:00
return tsoAllocator . UpdateTSO ( )
}
2021-05-21 16:08:12 +08:00
m := map [ string ] interface { } {
"PulsarAddress" : Params . PulsarAddress ,
"ReceiveBufSize" : 1024 ,
"PulsarBufSize" : 1024 }
if initError = c . msFactory . SetParams ( m ) ; initError != nil {
return
}
2021-06-04 15:00:34 +08:00
c . dmlChannels = newDMLChannels ( c )
pc := c . MetaTable . ListCollectionPhysicalChannels ( )
2021-06-08 19:25:37 +08:00
c . dmlChannels . AddProducerChannels ( pc ... )
2021-06-04 15:00:34 +08:00
2021-05-26 20:14:30 +08:00
c . chanTimeTick = newTimeTickSync ( c )
2021-06-22 19:08:03 +08:00
c . chanTimeTick . AddProxy ( c . session )
2021-05-26 20:14:30 +08:00
c . proxyClientManager = newProxyClientManager ( c )
2021-06-22 16:08:08 +08:00
log . Debug ( "RootCoord, set proxy manager" )
2021-06-22 19:08:03 +08:00
c . proxyManager , initError = newProxyManager (
2021-05-26 20:14:30 +08:00
c . ctx ,
2021-06-11 22:04:41 +08:00
Params . EtcdEndpoints ,
2021-06-22 19:08:03 +08:00
c . chanTimeTick . GetProxy ,
2021-05-26 20:14:30 +08:00
c . proxyClientManager . GetProxyClients ,
)
2021-06-22 19:08:03 +08:00
c . proxyManager . AddSession ( c . chanTimeTick . AddProxy , c . proxyClientManager . AddProxyClient )
c . proxyManager . DelSession ( c . chanTimeTick . DelProxy , c . proxyClientManager . DelProxyClient )
2021-05-21 16:08:12 +08:00
2021-01-20 09:36:50 +08:00
initError = c . setMsgStreams ( )
2021-01-19 14:44:03 +08:00
} )
2021-01-26 19:24:09 +08:00
if initError == nil {
2021-06-17 16:47:57 +08:00
log . Debug ( typeutil . RootCoordRole , zap . String ( "State Code" , internalpb . StateCode_name [ int32 ( internalpb . StateCode_Initializing ) ] ) )
2021-06-22 16:08:08 +08:00
} else {
log . Debug ( "RootCoord init error" , zap . Error ( initError ) )
2021-01-26 19:24:09 +08:00
}
2021-01-19 14:44:03 +08:00
return initError
}
2021-05-14 21:26:06 +08:00
func ( c * Core ) reSendDdMsg ( ctx context . Context ) error {
2021-05-18 14:18:02 +08:00
flag , err := c . MetaTable . client . Load ( DDMsgSendPrefix , 0 )
2021-05-14 21:26:06 +08:00
if err != nil || flag == "true" {
log . Debug ( "No un-successful DdMsg" )
return nil
}
2021-05-18 14:18:02 +08:00
ddOpStr , err := c . MetaTable . client . Load ( DDOperationPrefix , 0 )
2021-05-14 21:26:06 +08:00
if err != nil {
log . Debug ( "DdOperation key does not exist" )
return nil
}
var ddOp DdOperation
if err = json . Unmarshal ( [ ] byte ( ddOpStr ) , & ddOp ) ; err != nil {
return err
}
switch ddOp . Type {
case CreateCollectionDDType :
2021-07-06 09:16:03 +08:00
var ddReq = internalpb . CreateCollectionRequest { }
if err = proto . UnmarshalText ( ddOp . Body , & ddReq ) ; err != nil {
2021-05-14 21:26:06 +08:00
return err
}
2021-07-06 09:16:03 +08:00
collInfo , err := c . MetaTable . GetCollectionByName ( ddReq . CollectionName , 0 )
2021-06-11 16:39:29 +08:00
if err != nil {
return err
}
2021-07-06 09:16:03 +08:00
if err = c . SendDdCreateCollectionReq ( ctx , & ddReq , collInfo . PhysicalChannelNames ) ; err != nil {
2021-05-14 21:26:06 +08:00
return err
}
case DropCollectionDDType :
var ddReq = internalpb . DropCollectionRequest { }
if err = proto . UnmarshalText ( ddOp . Body , & ddReq ) ; err != nil {
return err
}
2021-06-11 16:39:29 +08:00
collInfo , err := c . MetaTable . GetCollectionByName ( ddReq . CollectionName , 0 )
if err != nil {
return err
}
if err = c . SendDdDropCollectionReq ( ctx , & ddReq , collInfo . PhysicalChannelNames ) ; err != nil {
2021-05-14 21:26:06 +08:00
return err
}
2021-05-26 20:14:30 +08:00
req := proxypb . InvalidateCollMetaCacheRequest {
Base : & commonpb . MsgBase {
MsgType : 0 , //TODO, msg type
MsgID : 0 , //TODO, msg id
Timestamp : ddReq . Base . Timestamp ,
SourceID : c . session . ServerID ,
} ,
DbName : ddReq . DbName ,
CollectionName : ddReq . CollectionName ,
}
c . proxyClientManager . InvalidateCollectionMetaCache ( c . ctx , & req )
2021-05-14 21:26:06 +08:00
case CreatePartitionDDType :
var ddReq = internalpb . CreatePartitionRequest { }
if err = proto . UnmarshalText ( ddOp . Body , & ddReq ) ; err != nil {
return err
}
2021-06-11 16:39:29 +08:00
collInfo , err := c . MetaTable . GetCollectionByName ( ddReq . CollectionName , 0 )
if err != nil {
return err
}
if err = c . SendDdCreatePartitionReq ( ctx , & ddReq , collInfo . PhysicalChannelNames ) ; err != nil {
2021-05-14 21:26:06 +08:00
return err
}
2021-05-26 20:14:30 +08:00
req := proxypb . InvalidateCollMetaCacheRequest {
Base : & commonpb . MsgBase {
MsgType : 0 , //TODO, msg type
MsgID : 0 , //TODO, msg id
Timestamp : ddReq . Base . Timestamp ,
SourceID : c . session . ServerID ,
} ,
DbName : ddReq . DbName ,
CollectionName : ddReq . CollectionName ,
}
c . proxyClientManager . InvalidateCollectionMetaCache ( c . ctx , & req )
2021-05-14 21:26:06 +08:00
case DropPartitionDDType :
var ddReq = internalpb . DropPartitionRequest { }
if err = proto . UnmarshalText ( ddOp . Body , & ddReq ) ; err != nil {
return err
}
2021-06-11 16:39:29 +08:00
collInfo , err := c . MetaTable . GetCollectionByName ( ddReq . CollectionName , 0 )
if err != nil {
return err
}
if err = c . SendDdDropPartitionReq ( ctx , & ddReq , collInfo . PhysicalChannelNames ) ; err != nil {
2021-05-14 21:26:06 +08:00
return err
}
2021-05-26 20:14:30 +08:00
req := proxypb . InvalidateCollMetaCacheRequest {
Base : & commonpb . MsgBase {
MsgType : 0 , //TODO, msg type
MsgID : 0 , //TODO, msg id
Timestamp : ddReq . Base . Timestamp ,
SourceID : c . session . ServerID ,
} ,
DbName : ddReq . DbName ,
CollectionName : ddReq . CollectionName ,
}
c . proxyClientManager . InvalidateCollectionMetaCache ( c . ctx , & req )
2021-05-14 21:26:06 +08:00
default :
return fmt . Errorf ( "Invalid DdOperation %s" , ddOp . Type )
}
// Update DDOperation in etcd
return c . setDdMsgSendFlag ( true )
}
2021-01-19 14:44:03 +08:00
func ( c * Core ) Start ( ) error {
if err := c . checkInit ( ) ; err != nil {
2021-06-17 16:47:57 +08:00
log . Debug ( "RootCoord Start checkInit failed" , zap . Error ( err ) )
2021-01-19 14:44:03 +08:00
return err
}
2021-04-08 17:31:39 +08:00
2021-06-17 16:47:57 +08:00
log . Debug ( typeutil . RootCoordRole , zap . Int64 ( "node id" , c . session . ServerID ) )
log . Debug ( typeutil . RootCoordRole , zap . String ( "time tick channel name" , Params . TimeTickChannel ) )
2021-04-08 17:31:39 +08:00
2021-01-19 14:44:03 +08:00
c . startOnce . Do ( func ( ) {
2021-06-22 19:08:03 +08:00
if err := c . proxyManager . WatchProxy ( ) ; err != nil {
log . Debug ( "RootCoord Start WatchProxy failed" , zap . Error ( err ) )
2021-05-26 20:14:30 +08:00
return
}
2021-05-14 21:26:06 +08:00
if err := c . reSendDdMsg ( c . ctx ) ; err != nil {
2021-06-17 16:47:57 +08:00
log . Debug ( "RootCoord Start reSendDdMsg failed" , zap . Error ( err ) )
2021-05-14 21:26:06 +08:00
return
}
2021-01-19 14:44:03 +08:00
go c . startTimeTickLoop ( )
2021-01-27 16:38:18 +08:00
go c . tsLoop ( )
2021-05-26 20:14:30 +08:00
go c . sessionLoop ( )
2021-05-21 16:08:12 +08:00
go c . chanTimeTick . StartWatch ( )
2021-07-03 17:54:25 +08:00
go c . checkFlushedSegmentsLoop ( )
2021-03-12 14:22:09 +08:00
c . stateCode . Store ( internalpb . StateCode_Healthy )
2021-01-19 14:44:03 +08:00
} )
2021-06-17 16:47:57 +08:00
log . Debug ( typeutil . RootCoordRole , zap . String ( "State Code" , internalpb . StateCode_name [ int32 ( internalpb . StateCode_Healthy ) ] ) )
2021-01-19 14:44:03 +08:00
return nil
}
func ( c * Core ) Stop ( ) error {
c . cancel ( )
2021-03-12 14:22:09 +08:00
c . stateCode . Store ( internalpb . StateCode_Abnormal )
2021-01-19 14:44:03 +08:00
return nil
}
2021-03-12 14:22:09 +08:00
func ( c * Core ) GetComponentStates ( ctx context . Context ) ( * internalpb . ComponentStates , error ) {
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
log . Debug ( "GetComponentStates" , zap . String ( "State Code" , internalpb . StateCode_name [ int32 ( code ) ] ) )
2021-01-26 19:24:09 +08:00
2021-03-12 14:22:09 +08:00
return & internalpb . ComponentStates {
State : & internalpb . ComponentInfo {
2021-05-25 15:06:05 +08:00
NodeID : c . session . ServerID ,
2021-06-17 16:47:57 +08:00
Role : typeutil . RootCoordRole ,
2021-01-20 11:02:29 +08:00
StateCode : code ,
ExtraInfo : nil ,
2021-01-19 14:44:03 +08:00
} ,
2021-01-26 17:47:38 +08:00
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-01-26 17:47:38 +08:00
Reason : "" ,
} ,
2021-03-12 14:22:09 +08:00
SubcomponentStates : [ ] * internalpb . ComponentInfo {
2021-01-26 17:47:38 +08:00
{
2021-05-25 15:06:05 +08:00
NodeID : c . session . ServerID ,
2021-06-17 16:47:57 +08:00
Role : typeutil . RootCoordRole ,
2021-01-26 17:47:38 +08:00
StateCode : code ,
ExtraInfo : nil ,
} ,
} ,
2021-01-19 14:44:03 +08:00
} , nil
}
2021-02-26 17:44:24 +08:00
func ( c * Core ) GetTimeTickChannel ( ctx context . Context ) ( * milvuspb . StringResponse , error ) {
return & milvuspb . StringResponse {
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-02-26 17:44:24 +08:00
Reason : "" ,
} ,
Value : Params . TimeTickChannel ,
} , nil
2021-01-19 14:44:03 +08:00
}
2021-02-26 17:44:24 +08:00
func ( c * Core ) GetStatisticsChannel ( ctx context . Context ) ( * milvuspb . StringResponse , error ) {
return & milvuspb . StringResponse {
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-02-26 17:44:24 +08:00
Reason : "" ,
} ,
Value : Params . StatisticsChannel ,
} , nil
2021-01-19 14:44:03 +08:00
}
2021-02-26 17:44:24 +08:00
func ( c * Core ) CreateCollection ( ctx context . Context , in * milvuspb . CreateCollectionRequest ) ( * commonpb . Status , error ) {
2021-06-22 19:08:03 +08:00
metrics . RootCoordCreateCollectionCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsTotal ) . Inc ( )
2021-03-12 14:22:09 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
2021-01-25 18:33:10 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-03-12 14:22:09 +08:00
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
2021-01-25 18:33:10 +08:00
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "CreateCollection " , zap . String ( "name" , in . CollectionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-19 14:44:03 +08:00
t := & CreateCollectionReqTask {
baseReqTask : baseReqTask {
2021-03-13 14:42:53 +08:00
ctx : ctx ,
2021-01-19 14:44:03 +08:00
core : c ,
} ,
Req : in ,
}
2021-06-26 09:22:11 +08:00
err := executeTask ( t )
2021-01-19 14:44:03 +08:00
if err != nil {
2021-03-26 15:13:33 +08:00
log . Debug ( "CreateCollection failed" , zap . String ( "name" , in . CollectionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-19 14:44:03 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-01-19 14:44:03 +08:00
Reason : "Create collection failed: " + err . Error ( ) ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "CreateCollection Success" , zap . String ( "name" , in . CollectionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-06-22 19:08:03 +08:00
metrics . RootCoordCreateCollectionCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsSuccess ) . Inc ( )
2021-01-19 14:44:03 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-01-19 14:44:03 +08:00
Reason : "" ,
} , nil
}
2021-02-26 17:44:24 +08:00
func ( c * Core ) DropCollection ( ctx context . Context , in * milvuspb . DropCollectionRequest ) ( * commonpb . Status , error ) {
2021-06-22 19:08:03 +08:00
metrics . RootCoordDropCollectionCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsTotal ) . Inc ( )
2021-03-12 14:22:09 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
2021-01-25 18:33:10 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-03-12 14:22:09 +08:00
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
2021-01-25 18:33:10 +08:00
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "DropCollection" , zap . String ( "name" , in . CollectionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-19 14:44:03 +08:00
t := & DropCollectionReqTask {
baseReqTask : baseReqTask {
2021-03-13 14:42:53 +08:00
ctx : ctx ,
2021-01-19 14:44:03 +08:00
core : c ,
} ,
Req : in ,
}
2021-06-26 09:22:11 +08:00
err := executeTask ( t )
2021-01-19 14:44:03 +08:00
if err != nil {
2021-03-26 15:13:33 +08:00
log . Debug ( "DropCollection Failed" , zap . String ( "name" , in . CollectionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-19 14:44:03 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-02-05 11:49:13 +08:00
Reason : "Drop collection failed: " + err . Error ( ) ,
2021-01-19 14:44:03 +08:00
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "DropCollection Success" , zap . String ( "name" , in . CollectionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-06-22 19:08:03 +08:00
metrics . RootCoordDropCollectionCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsSuccess ) . Inc ( )
2021-01-19 14:44:03 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-01-19 14:44:03 +08:00
Reason : "" ,
} , nil
}
2021-02-26 17:44:24 +08:00
func ( c * Core ) HasCollection ( ctx context . Context , in * milvuspb . HasCollectionRequest ) ( * milvuspb . BoolResponse , error ) {
2021-06-22 19:08:03 +08:00
metrics . RootCoordHasCollectionCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsTotal ) . Inc ( )
2021-03-12 14:22:09 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
2021-01-25 18:33:10 +08:00
return & milvuspb . BoolResponse {
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-03-12 14:22:09 +08:00
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
2021-01-25 18:33:10 +08:00
} ,
Value : false ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "HasCollection" , zap . String ( "name" , in . CollectionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-19 14:44:03 +08:00
t := & HasCollectionReqTask {
baseReqTask : baseReqTask {
2021-03-13 14:42:53 +08:00
ctx : ctx ,
2021-01-19 14:44:03 +08:00
core : c ,
} ,
Req : in ,
HasCollection : false ,
}
2021-06-26 09:22:11 +08:00
err := executeTask ( t )
2021-01-19 14:44:03 +08:00
if err != nil {
2021-03-26 15:13:33 +08:00
log . Debug ( "HasCollection Failed" , zap . String ( "name" , in . CollectionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-19 14:44:03 +08:00
return & milvuspb . BoolResponse {
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-01-19 14:44:03 +08:00
Reason : "Has collection failed: " + err . Error ( ) ,
} ,
Value : false ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "HasCollection Success" , zap . String ( "name" , in . CollectionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-06-22 19:08:03 +08:00
metrics . RootCoordHasCollectionCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsSuccess ) . Inc ( )
2021-01-19 14:44:03 +08:00
return & milvuspb . BoolResponse {
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-01-19 14:44:03 +08:00
Reason : "" ,
} ,
Value : t . HasCollection ,
} , nil
}
2021-02-26 17:44:24 +08:00
func ( c * Core ) DescribeCollection ( ctx context . Context , in * milvuspb . DescribeCollectionRequest ) ( * milvuspb . DescribeCollectionResponse , error ) {
2021-06-22 19:08:03 +08:00
metrics . RootCoordDescribeCollectionCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsTotal ) . Inc ( )
2021-03-12 14:22:09 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
2021-01-25 18:33:10 +08:00
return & milvuspb . DescribeCollectionResponse {
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-03-12 14:22:09 +08:00
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
2021-01-25 18:33:10 +08:00
} ,
Schema : nil ,
CollectionID : 0 ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "DescribeCollection" , zap . String ( "name" , in . CollectionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-19 14:44:03 +08:00
t := & DescribeCollectionReqTask {
baseReqTask : baseReqTask {
2021-03-13 14:42:53 +08:00
ctx : ctx ,
2021-01-19 14:44:03 +08:00
core : c ,
} ,
Req : in ,
Rsp : & milvuspb . DescribeCollectionResponse { } ,
}
2021-06-26 09:22:11 +08:00
err := executeTask ( t )
2021-01-19 14:44:03 +08:00
if err != nil {
2021-03-26 15:13:33 +08:00
log . Debug ( "DescribeCollection Failed" , zap . String ( "name" , in . CollectionName ) , zap . Error ( err ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-19 14:44:03 +08:00
return & milvuspb . DescribeCollectionResponse {
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-01-19 14:44:03 +08:00
Reason : "describe collection failed: " + err . Error ( ) ,
} ,
Schema : nil ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "DescribeCollection Success" , zap . String ( "name" , in . CollectionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-06-22 19:08:03 +08:00
metrics . RootCoordDescribeCollectionCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsSuccess ) . Inc ( )
2021-01-19 14:44:03 +08:00
t . Rsp . Status = & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-01-19 14:44:03 +08:00
Reason : "" ,
}
2021-06-08 19:25:37 +08:00
// log.Debug("describe collection", zap.Any("schema", t.Rsp.Schema))
2021-01-19 14:44:03 +08:00
return t . Rsp , nil
}
2021-03-12 14:22:09 +08:00
func ( c * Core ) ShowCollections ( ctx context . Context , in * milvuspb . ShowCollectionsRequest ) ( * milvuspb . ShowCollectionsResponse , error ) {
2021-06-22 19:08:03 +08:00
metrics . RootCoordShowCollectionsCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsTotal ) . Inc ( )
2021-03-12 14:22:09 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
return & milvuspb . ShowCollectionsResponse {
2021-01-25 18:33:10 +08:00
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-03-12 14:22:09 +08:00
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
2021-01-25 18:33:10 +08:00
} ,
CollectionNames : nil ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "ShowCollections" , zap . String ( "dbname" , in . DbName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-19 14:44:03 +08:00
t := & ShowCollectionReqTask {
baseReqTask : baseReqTask {
2021-03-13 14:42:53 +08:00
ctx : ctx ,
2021-01-19 14:44:03 +08:00
core : c ,
} ,
Req : in ,
2021-03-12 14:22:09 +08:00
Rsp : & milvuspb . ShowCollectionsResponse {
2021-01-19 14:44:03 +08:00
CollectionNames : nil ,
2021-06-03 19:09:33 +08:00
CollectionIds : nil ,
2021-01-19 14:44:03 +08:00
} ,
}
2021-06-26 09:22:11 +08:00
err := executeTask ( t )
2021-01-19 14:44:03 +08:00
if err != nil {
2021-03-26 15:13:33 +08:00
log . Debug ( "ShowCollections failed" , zap . String ( "dbname" , in . DbName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-03-12 14:22:09 +08:00
return & milvuspb . ShowCollectionsResponse {
2021-01-19 14:44:03 +08:00
CollectionNames : nil ,
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-01-19 14:44:03 +08:00
Reason : "ShowCollections failed: " + err . Error ( ) ,
} ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "ShowCollections Success" , zap . String ( "dbname" , in . DbName ) , zap . Strings ( "collection names" , t . Rsp . CollectionNames ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-06-22 19:08:03 +08:00
metrics . RootCoordShowCollectionsCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsSuccess ) . Inc ( )
2021-01-19 14:44:03 +08:00
t . Rsp . Status = & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-01-19 14:44:03 +08:00
Reason : "" ,
}
return t . Rsp , nil
}
2021-02-26 17:44:24 +08:00
func ( c * Core ) CreatePartition ( ctx context . Context , in * milvuspb . CreatePartitionRequest ) ( * commonpb . Status , error ) {
2021-06-22 19:08:03 +08:00
metrics . RootCoordCreatePartitionCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsTotal ) . Inc ( )
2021-03-12 14:22:09 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
2021-01-25 18:33:10 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-03-12 14:22:09 +08:00
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
2021-01-25 18:33:10 +08:00
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "CreatePartition" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "partition name" , in . PartitionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-19 14:44:03 +08:00
t := & CreatePartitionReqTask {
baseReqTask : baseReqTask {
2021-03-13 14:42:53 +08:00
ctx : ctx ,
2021-01-19 14:44:03 +08:00
core : c ,
} ,
Req : in ,
}
2021-06-26 09:22:11 +08:00
err := executeTask ( t )
2021-01-19 14:44:03 +08:00
if err != nil {
2021-03-26 15:13:33 +08:00
log . Debug ( "CreatePartition Failed" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "partition name" , in . PartitionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-19 14:44:03 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-01-19 14:44:03 +08:00
Reason : "create partition failed: " + err . Error ( ) ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "CreatePartition Success" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "partition name" , in . PartitionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-06-22 19:08:03 +08:00
metrics . RootCoordCreatePartitionCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsSuccess ) . Inc ( )
2021-01-19 14:44:03 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-01-19 14:44:03 +08:00
Reason : "" ,
} , nil
}
2021-02-26 17:44:24 +08:00
func ( c * Core ) DropPartition ( ctx context . Context , in * milvuspb . DropPartitionRequest ) ( * commonpb . Status , error ) {
2021-06-22 19:08:03 +08:00
metrics . RootCoordDropPartitionCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsTotal ) . Inc ( )
2021-03-12 14:22:09 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
2021-01-25 18:33:10 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-03-12 14:22:09 +08:00
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
2021-01-25 18:33:10 +08:00
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "DropPartition" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "partition name" , in . PartitionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-19 14:44:03 +08:00
t := & DropPartitionReqTask {
baseReqTask : baseReqTask {
2021-03-13 14:42:53 +08:00
ctx : ctx ,
2021-01-19 14:44:03 +08:00
core : c ,
} ,
Req : in ,
}
2021-06-26 09:22:11 +08:00
err := executeTask ( t )
2021-01-19 14:44:03 +08:00
if err != nil {
2021-03-26 15:13:33 +08:00
log . Debug ( "DropPartition Failed" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "partition name" , in . PartitionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-19 14:44:03 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-01-19 14:44:03 +08:00
Reason : "DropPartition failed: " + err . Error ( ) ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "DropPartition Success" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "partition name" , in . PartitionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-06-22 19:08:03 +08:00
metrics . RootCoordDropPartitionCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsSuccess ) . Inc ( )
2021-01-19 14:44:03 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-01-19 14:44:03 +08:00
Reason : "" ,
} , nil
}
2021-02-26 17:44:24 +08:00
func ( c * Core ) HasPartition ( ctx context . Context , in * milvuspb . HasPartitionRequest ) ( * milvuspb . BoolResponse , error ) {
2021-06-22 19:08:03 +08:00
metrics . RootCoordHasPartitionCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsTotal ) . Inc ( )
2021-03-12 14:22:09 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
2021-01-25 18:33:10 +08:00
return & milvuspb . BoolResponse {
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-03-12 14:22:09 +08:00
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
2021-01-25 18:33:10 +08:00
} ,
Value : false ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "HasPartition" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "partition name" , in . PartitionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-19 14:44:03 +08:00
t := & HasPartitionReqTask {
baseReqTask : baseReqTask {
2021-03-13 14:42:53 +08:00
ctx : ctx ,
2021-01-19 14:44:03 +08:00
core : c ,
} ,
Req : in ,
HasPartition : false ,
}
2021-06-26 09:22:11 +08:00
err := executeTask ( t )
2021-01-19 14:44:03 +08:00
if err != nil {
2021-03-26 15:13:33 +08:00
log . Debug ( "HasPartition Failed" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "partition name" , in . PartitionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-19 14:44:03 +08:00
return & milvuspb . BoolResponse {
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-01-19 14:44:03 +08:00
Reason : "HasPartition failed: " + err . Error ( ) ,
} ,
Value : false ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "HasPartition Success" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "partition name" , in . PartitionName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-06-22 19:08:03 +08:00
metrics . RootCoordHasPartitionCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsSuccess ) . Inc ( )
2021-01-19 14:44:03 +08:00
return & milvuspb . BoolResponse {
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-01-19 14:44:03 +08:00
Reason : "" ,
} ,
Value : t . HasPartition ,
} , nil
}
2021-03-12 14:22:09 +08:00
func ( c * Core ) ShowPartitions ( ctx context . Context , in * milvuspb . ShowPartitionsRequest ) ( * milvuspb . ShowPartitionsResponse , error ) {
2021-06-22 19:08:03 +08:00
metrics . RootCoordShowPartitionsCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsTotal ) . Inc ( )
2021-03-13 11:59:24 +08:00
log . Debug ( "ShowPartitionRequest received" , zap . String ( "role" , Params . RoleName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) ,
zap . String ( "collection" , in . CollectionName ) )
2021-03-12 14:22:09 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
2021-06-17 16:47:57 +08:00
log . Debug ( "ShowPartitionRequest failed: rootcoord is not healthy" , zap . String ( "role" , Params . RoleName ) ,
2021-03-13 11:59:24 +08:00
zap . Int64 ( "msgID" , in . Base . MsgID ) , zap . String ( "state" , internalpb . StateCode_name [ int32 ( code ) ] ) )
2021-03-12 14:22:09 +08:00
return & milvuspb . ShowPartitionsResponse {
2021-01-25 18:33:10 +08:00
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-06-17 16:47:57 +08:00
Reason : fmt . Sprintf ( "rootcoord is not healthy, state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
2021-01-25 18:33:10 +08:00
} ,
PartitionNames : nil ,
PartitionIDs : nil ,
} , nil
}
2021-01-19 14:44:03 +08:00
t := & ShowPartitionReqTask {
baseReqTask : baseReqTask {
2021-03-13 14:42:53 +08:00
ctx : ctx ,
2021-01-19 14:44:03 +08:00
core : c ,
} ,
Req : in ,
2021-03-12 14:22:09 +08:00
Rsp : & milvuspb . ShowPartitionsResponse {
2021-01-19 14:44:03 +08:00
PartitionNames : nil ,
Status : nil ,
} ,
}
2021-06-26 09:22:11 +08:00
err := executeTask ( t )
2021-01-19 14:44:03 +08:00
if err != nil {
2021-04-09 16:10:12 +08:00
log . Debug ( "ShowPartitionsRequest failed" , zap . String ( "role" , Params . RoleName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) , zap . Error ( err ) )
2021-03-12 14:22:09 +08:00
return & milvuspb . ShowPartitionsResponse {
2021-01-19 14:44:03 +08:00
PartitionNames : nil ,
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-03-13 11:59:24 +08:00
Reason : err . Error ( ) ,
2021-01-19 14:44:03 +08:00
} ,
} , nil
}
2021-03-13 11:59:24 +08:00
log . Debug ( "ShowPartitions succeed" , zap . String ( "role" , Params . RoleName ) , zap . Int64 ( "msgID" , t . Req . Base . MsgID ) ,
zap . String ( "collection name" , in . CollectionName ) , zap . Strings ( "partition names" , t . Rsp . PartitionNames ) ,
zap . Int64s ( "partition ids" , t . Rsp . PartitionIDs ) )
2021-06-22 19:08:03 +08:00
metrics . RootCoordShowPartitionsCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsSuccess ) . Inc ( )
2021-01-19 14:44:03 +08:00
t . Rsp . Status = & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-01-19 14:44:03 +08:00
Reason : "" ,
}
return t . Rsp , nil
}
2021-02-26 17:44:24 +08:00
func ( c * Core ) CreateIndex ( ctx context . Context , in * milvuspb . CreateIndexRequest ) ( * commonpb . Status , error ) {
2021-06-22 19:08:03 +08:00
metrics . RootCoordCreateIndexCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsTotal ) . Inc ( )
2021-03-12 14:22:09 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
2021-01-25 18:33:10 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-03-12 14:22:09 +08:00
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
2021-01-25 18:33:10 +08:00
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "CreateIndex" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "field name" , in . FieldName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-21 10:01:29 +08:00
t := & CreateIndexReqTask {
baseReqTask : baseReqTask {
2021-03-13 14:42:53 +08:00
ctx : ctx ,
2021-01-21 10:01:29 +08:00
core : c ,
} ,
Req : in ,
}
2021-06-26 09:22:11 +08:00
err := executeTask ( t )
2021-01-21 10:01:29 +08:00
if err != nil {
2021-06-26 14:02:12 +08:00
log . Debug ( "CreateIndex Failed" , zap . String ( "collection name" , in . CollectionName ) ,
zap . String ( "field name" , in . FieldName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) ,
zap . Error ( err ) )
2021-01-21 10:01:29 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-01-21 10:01:29 +08:00
Reason : "CreateIndex failed, error = " + err . Error ( ) ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "CreateIndex Success" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "field name" , in . FieldName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-06-22 19:08:03 +08:00
metrics . RootCoordCreateIndexCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsSuccess ) . Inc ( )
2021-01-21 10:01:29 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-01-21 10:01:29 +08:00
Reason : "" ,
} , nil
2021-01-19 14:44:03 +08:00
}
2021-02-26 17:44:24 +08:00
func ( c * Core ) DescribeIndex ( ctx context . Context , in * milvuspb . DescribeIndexRequest ) ( * milvuspb . DescribeIndexResponse , error ) {
2021-06-22 19:08:03 +08:00
metrics . RootCoordDescribeIndexCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsTotal ) . Inc ( )
2021-03-12 14:22:09 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
2021-01-25 18:33:10 +08:00
return & milvuspb . DescribeIndexResponse {
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-03-12 14:22:09 +08:00
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
2021-01-25 18:33:10 +08:00
} ,
IndexDescriptions : nil ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "DescribeIndex" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "field name" , in . FieldName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-21 10:01:29 +08:00
t := & DescribeIndexReqTask {
baseReqTask : baseReqTask {
2021-03-13 14:42:53 +08:00
ctx : ctx ,
2021-01-21 10:01:29 +08:00
core : c ,
} ,
Req : in ,
Rsp : & milvuspb . DescribeIndexResponse {
Status : nil ,
IndexDescriptions : nil ,
} ,
}
2021-06-26 09:22:11 +08:00
err := executeTask ( t )
2021-01-21 10:01:29 +08:00
if err != nil {
2021-03-26 15:13:33 +08:00
log . Debug ( "DescribeIndex Failed" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "field name" , in . FieldName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-21 10:01:29 +08:00
return & milvuspb . DescribeIndexResponse {
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-01-21 10:01:29 +08:00
Reason : "DescribeIndex failed, error = " + err . Error ( ) ,
} ,
IndexDescriptions : nil ,
} , nil
}
2021-02-24 16:25:40 +08:00
idxNames := make ( [ ] string , 0 , len ( t . Rsp . IndexDescriptions ) )
for _ , i := range t . Rsp . IndexDescriptions {
idxNames = append ( idxNames , i . IndexName )
}
2021-03-26 15:13:33 +08:00
log . Debug ( "DescribeIndex Success" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "field name" , in . FieldName ) , zap . Strings ( "index names" , idxNames ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-06-22 19:08:03 +08:00
metrics . RootCoordDescribeIndexCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsSuccess ) . Inc ( )
2021-03-05 20:41:34 +08:00
if len ( t . Rsp . IndexDescriptions ) == 0 {
t . Rsp . Status = & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_IndexNotExist ,
2021-03-05 20:41:34 +08:00
Reason : "index not exist" ,
}
} else {
t . Rsp . Status = & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-03-05 20:41:34 +08:00
Reason : "" ,
}
2021-01-21 10:01:29 +08:00
}
return t . Rsp , nil
2021-01-19 14:44:03 +08:00
}
2021-02-26 17:44:24 +08:00
func ( c * Core ) DropIndex ( ctx context . Context , in * milvuspb . DropIndexRequest ) ( * commonpb . Status , error ) {
2021-06-22 19:08:03 +08:00
metrics . RootCoordDropIndexCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsTotal ) . Inc ( )
2021-03-12 14:22:09 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
2021-02-20 15:38:44 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-03-12 14:22:09 +08:00
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
2021-02-20 15:38:44 +08:00
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "DropIndex" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "field name" , in . FieldName ) , zap . String ( "index name" , in . IndexName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-02-20 15:38:44 +08:00
t := & DropIndexReqTask {
baseReqTask : baseReqTask {
2021-03-13 14:42:53 +08:00
ctx : ctx ,
2021-02-20 15:38:44 +08:00
core : c ,
} ,
Req : in ,
}
2021-06-26 09:22:11 +08:00
err := executeTask ( t )
2021-02-20 15:38:44 +08:00
if err != nil {
2021-03-26 15:13:33 +08:00
log . Debug ( "DropIndex Failed" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "field name" , in . FieldName ) , zap . String ( "index name" , in . IndexName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-02-20 15:38:44 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-02-26 11:07:25 +08:00
Reason : "DropIndex failed, error = " + err . Error ( ) ,
2021-02-20 15:38:44 +08:00
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "DropIndex Success" , zap . String ( "collection name" , in . CollectionName ) , zap . String ( "field name" , in . FieldName ) , zap . String ( "index name" , in . IndexName ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-06-22 19:08:03 +08:00
metrics . RootCoordDropIndexCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsSuccess ) . Inc ( )
2021-02-20 15:38:44 +08:00
return & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-02-20 15:38:44 +08:00
Reason : "" ,
} , nil
}
2021-02-26 17:44:24 +08:00
func ( c * Core ) DescribeSegment ( ctx context . Context , in * milvuspb . DescribeSegmentRequest ) ( * milvuspb . DescribeSegmentResponse , error ) {
2021-06-22 19:08:03 +08:00
metrics . RootCoordDescribeSegmentCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsTotal ) . Inc ( )
2021-03-12 14:22:09 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
2021-01-25 18:33:10 +08:00
return & milvuspb . DescribeSegmentResponse {
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-03-12 14:22:09 +08:00
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
2021-01-25 18:33:10 +08:00
} ,
IndexID : 0 ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "DescribeSegment" , zap . Int64 ( "collection id" , in . CollectionID ) , zap . Int64 ( "segment id" , in . SegmentID ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-21 10:01:29 +08:00
t := & DescribeSegmentReqTask {
baseReqTask : baseReqTask {
2021-03-13 14:42:53 +08:00
ctx : ctx ,
2021-01-21 10:01:29 +08:00
core : c ,
} ,
Req : in ,
Rsp : & milvuspb . DescribeSegmentResponse {
Status : nil ,
IndexID : 0 ,
} ,
}
2021-06-26 09:22:11 +08:00
err := executeTask ( t )
2021-01-21 10:01:29 +08:00
if err != nil {
2021-03-26 15:13:33 +08:00
log . Debug ( "DescribeSegment Failed" , zap . Int64 ( "collection id" , in . CollectionID ) , zap . Int64 ( "segment id" , in . SegmentID ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-21 10:01:29 +08:00
return & milvuspb . DescribeSegmentResponse {
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-01-21 10:01:29 +08:00
Reason : "DescribeSegment failed, error = " + err . Error ( ) ,
} ,
IndexID : 0 ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "DescribeSegment Success" , zap . Int64 ( "collection id" , in . CollectionID ) , zap . Int64 ( "segment id" , in . SegmentID ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-06-22 19:08:03 +08:00
metrics . RootCoordDescribeSegmentCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsSuccess ) . Inc ( )
2021-01-21 10:01:29 +08:00
t . Rsp . Status = & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-01-21 10:01:29 +08:00
Reason : "" ,
}
return t . Rsp , nil
2021-01-19 14:44:03 +08:00
}
2021-03-12 14:22:09 +08:00
func ( c * Core ) ShowSegments ( ctx context . Context , in * milvuspb . ShowSegmentsRequest ) ( * milvuspb . ShowSegmentsResponse , error ) {
2021-06-22 19:08:03 +08:00
metrics . RootCoordShowSegmentsCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsTotal ) . Inc ( )
2021-03-12 14:22:09 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
return & milvuspb . ShowSegmentsResponse {
2021-01-25 18:33:10 +08:00
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-03-12 14:22:09 +08:00
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
2021-01-25 18:33:10 +08:00
} ,
SegmentIDs : nil ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "ShowSegments" , zap . Int64 ( "collection id" , in . CollectionID ) , zap . Int64 ( "partition id" , in . PartitionID ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-01-21 10:01:29 +08:00
t := & ShowSegmentReqTask {
baseReqTask : baseReqTask {
2021-03-13 14:42:53 +08:00
ctx : ctx ,
2021-01-21 10:01:29 +08:00
core : c ,
} ,
Req : in ,
2021-03-12 14:22:09 +08:00
Rsp : & milvuspb . ShowSegmentsResponse {
2021-01-21 10:01:29 +08:00
Status : nil ,
SegmentIDs : nil ,
} ,
}
2021-06-26 09:22:11 +08:00
err := executeTask ( t )
2021-01-21 10:01:29 +08:00
if err != nil {
2021-03-26 15:13:33 +08:00
log . Debug ( "ShowSegments Failed" , zap . Int64 ( "collection id" , in . CollectionID ) , zap . Int64 ( "partition id" , in . PartitionID ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-03-12 14:22:09 +08:00
return & milvuspb . ShowSegmentsResponse {
2021-01-21 10:01:29 +08:00
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-01-21 10:01:29 +08:00
Reason : "ShowSegments failed, error: " + err . Error ( ) ,
} ,
SegmentIDs : nil ,
} , nil
}
2021-03-26 15:13:33 +08:00
log . Debug ( "ShowSegments Success" , zap . Int64 ( "collection id" , in . CollectionID ) , zap . Int64 ( "partition id" , in . PartitionID ) , zap . Int64s ( "segments ids" , t . Rsp . SegmentIDs ) , zap . Int64 ( "msgID" , in . Base . MsgID ) )
2021-06-22 19:08:03 +08:00
metrics . RootCoordShowSegmentsCounter . WithLabelValues ( metricProxy ( in . Base . SourceID ) , MetricRequestsSuccess ) . Inc ( )
2021-01-21 10:01:29 +08:00
t . Rsp . Status = & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-01-21 10:01:29 +08:00
Reason : "" ,
}
return t . Rsp , nil
2021-01-19 14:44:03 +08:00
}
2021-06-22 16:14:09 +08:00
func ( c * Core ) AllocTimestamp ( ctx context . Context , in * rootcoordpb . AllocTimestampRequest ) ( * rootcoordpb . AllocTimestampResponse , error ) {
2021-05-26 20:14:30 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
2021-06-22 16:14:09 +08:00
return & rootcoordpb . AllocTimestampResponse {
2021-05-26 20:14:30 +08:00
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
} ,
Timestamp : 0 ,
Count : 0 ,
} , nil
}
2021-05-20 14:14:14 +08:00
ts , err := c . TSOAllocator ( in . Count )
2021-01-19 14:44:03 +08:00
if err != nil {
2021-04-01 18:05:43 +08:00
log . Debug ( "AllocTimestamp failed" , zap . Int64 ( "msgID" , in . Base . MsgID ) , zap . Error ( err ) )
2021-06-22 16:14:09 +08:00
return & rootcoordpb . AllocTimestampResponse {
2021-01-19 14:44:03 +08:00
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-01-19 14:44:03 +08:00
Reason : "AllocTimestamp failed: " + err . Error ( ) ,
} ,
Timestamp : 0 ,
Count : 0 ,
} , nil
}
2021-02-02 10:58:39 +08:00
// log.Printf("AllocTimestamp : %d", ts)
2021-06-22 16:14:09 +08:00
return & rootcoordpb . AllocTimestampResponse {
2021-01-19 14:44:03 +08:00
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-01-19 14:44:03 +08:00
Reason : "" ,
} ,
Timestamp : ts ,
Count : in . Count ,
} , nil
}
2021-06-22 16:14:09 +08:00
func ( c * Core ) AllocID ( ctx context . Context , in * rootcoordpb . AllocIDRequest ) ( * rootcoordpb . AllocIDResponse , error ) {
2021-05-26 20:14:30 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
2021-06-22 16:14:09 +08:00
return & rootcoordpb . AllocIDResponse {
2021-05-26 20:14:30 +08:00
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
} ,
ID : 0 ,
Count : 0 ,
} , nil
}
2021-05-20 14:14:14 +08:00
start , _ , err := c . IDAllocator ( in . Count )
2021-01-19 14:44:03 +08:00
if err != nil {
2021-04-01 18:05:43 +08:00
log . Debug ( "AllocID failed" , zap . Int64 ( "msgID" , in . Base . MsgID ) , zap . Error ( err ) )
2021-06-22 16:14:09 +08:00
return & rootcoordpb . AllocIDResponse {
2021-01-19 14:44:03 +08:00
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-01-19 14:44:03 +08:00
Reason : "AllocID failed: " + err . Error ( ) ,
} ,
ID : 0 ,
Count : in . Count ,
} , nil
}
2021-02-24 16:25:40 +08:00
log . Debug ( "AllocID" , zap . Int64 ( "id start" , start ) , zap . Uint32 ( "count" , in . Count ) )
2021-06-22 16:14:09 +08:00
return & rootcoordpb . AllocIDResponse {
2021-01-19 14:44:03 +08:00
Status : & commonpb . Status {
2021-03-10 22:06:22 +08:00
ErrorCode : commonpb . ErrorCode_Success ,
2021-01-19 14:44:03 +08:00
Reason : "" ,
} ,
ID : start ,
Count : in . Count ,
} , nil
}
2021-05-21 16:08:12 +08:00
// UpdateChannelTimeTick used to handle ChannelTimeTickMsg
func ( c * Core ) UpdateChannelTimeTick ( ctx context . Context , in * internalpb . ChannelTimeTickMsg ) ( * commonpb . Status , error ) {
2021-05-26 20:14:30 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
} , nil
}
2021-05-21 16:08:12 +08:00
status := & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
Reason : "" ,
}
if in . Base . MsgType != commonpb . MsgType_TimeTick {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
status . Reason = fmt . Sprintf ( "UpdateChannelTimeTick receive invalid message %d" , in . Base . GetMsgType ( ) )
return status , nil
}
err := c . chanTimeTick . UpdateTimeTick ( in )
if err != nil {
status . ErrorCode = commonpb . ErrorCode_UnexpectedError
status . Reason = err . Error ( )
return status , nil
}
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
Reason : "" ,
} , nil
}
2021-06-17 17:45:56 +08:00
// ReleaseDQLMessageStream forwards the request to the core's proxy client
// manager and returns its result unchanged.
//
// If the core's state code is not Healthy the request is not forwarded; an
// UnexpectedError status naming the current state is returned with a nil
// error instead.
func (c *Core) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
	state := c.stateCode.Load().(internalpb.StateCode)
	if state == internalpb.StateCode_Healthy {
		return c.proxyClientManager.ReleaseDQLMessageStream(ctx, in)
	}

	notHealthy := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
		Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(state)]),
	}
	return notHealthy, nil
}
2021-07-01 14:58:17 +08:00
2021-07-02 11:16:20 +08:00
func ( c * Core ) SegmentFlushCompleted ( ctx context . Context , in * datapb . SegmentFlushCompletedMsg ) ( * commonpb . Status , error ) {
2021-07-01 14:58:17 +08:00
code := c . stateCode . Load ( ) . ( internalpb . StateCode )
if code != internalpb . StateCode_Healthy {
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
Reason : fmt . Sprintf ( "state code = %s" , internalpb . StateCode_name [ int32 ( code ) ] ) ,
} , nil
}
if in . Base . MsgType != commonpb . MsgType_SegmentFlushDone {
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
Reason : fmt . Sprintf ( "SegmentFlushDone with incorrect msgtype = %s" , commonpb . MsgType_name [ int32 ( in . Base . MsgType ) ] ) ,
} , nil
}
2021-07-02 11:16:20 +08:00
segID := in . Segment . GetID ( )
2021-07-01 14:58:17 +08:00
log . Debug ( "flush segment" , zap . Int64 ( "id" , segID ) )
2021-07-03 14:36:18 +08:00
coll , err := c . MetaTable . GetCollectionByID ( in . Segment . CollectionID , 0 )
2021-07-01 14:58:17 +08:00
if err != nil {
2021-07-03 14:36:18 +08:00
log . Warn ( "GetCollectionByID error" , zap . Error ( err ) )
2021-07-01 14:58:17 +08:00
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
Reason : fmt . Sprintf ( "GetCollectionBySegmentID error = %v" , err ) ,
} , nil
}
if len ( coll . FieldIndexes ) == 0 {
log . Debug ( "no index params on collection" , zap . String ( "collection_name" , coll . Schema . Name ) )
}
for _ , f := range coll . FieldIndexes {
fieldSch , err := GetFieldSchemaByID ( coll , f . FiledID )
if err != nil {
log . Warn ( "field schema not found" , zap . Int64 ( "field id" , f . FiledID ) )
continue
}
idxInfo , err := c . MetaTable . GetIndexByID ( f . IndexID )
if err != nil {
log . Warn ( "index not found" , zap . Int64 ( "index id" , f . IndexID ) )
continue
}
info := etcdpb . SegmentIndexInfo {
2021-07-05 10:08:02 +08:00
CollectionID : in . Segment . CollectionID ,
PartitionID : in . Segment . PartitionID ,
SegmentID : segID ,
FieldID : fieldSch . FieldID ,
IndexID : idxInfo . IndexID ,
EnableIndex : false ,
2021-07-01 14:58:17 +08:00
}
info . BuildID , err = c . BuildIndex ( ctx , segID , fieldSch , idxInfo , true )
if err == nil && info . BuildID != 0 {
info . EnableIndex = true
} else {
log . Error ( "build index fail" , zap . Int64 ( "buildid" , info . BuildID ) , zap . Error ( err ) )
2021-07-03 14:36:18 +08:00
continue
2021-07-01 14:58:17 +08:00
}
2021-07-05 10:08:02 +08:00
_ , err = c . MetaTable . AddIndex ( & info )
2021-07-01 14:58:17 +08:00
if err != nil {
log . Error ( "AddIndex fail" , zap . String ( "err" , err . Error ( ) ) )
}
}
return & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
Reason : "" ,
} , nil
}