// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

package datacoord

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
	"github.com/milvus-io/milvus/internal/logutil"

	"go.etcd.io/etcd/clientv3"
	"go.uber.org/zap"

	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"

	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
)

const (
	rootCoordClientTimout = 20 * time.Second
	connEtcdMaxRetryTime  = 100000
	connEtcdRetryInterval = 200 * time.Millisecond
)

type (
	UniqueID  = typeutil.UniqueID
	Timestamp = typeutil.Timestamp
)

// ServerState type alias
type ServerState = int64

const (
	// ServerStateStopped state stands for just created or stopped `Server` instance
	ServerStateStopped ServerState = 0
	// ServerStateInitializing state stands for initializing `Server` instance
	ServerStateInitializing ServerState = 1
	// ServerStateHealthy state stands for healthy `Server` instance
	ServerStateHealthy ServerState = 2
)

type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)

// Server implements `types.DataCoord`
// handles Data Coordinator related jobs
type Server struct {
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
	isServing        ServerState

	kvClient        *etcdkv.EtcdKV
	meta            *meta
	segmentManager  Manager
	allocator       allocator
	cluster         *Cluster
	rootCoordClient types.RootCoord
	ddChannelName   string

	flushCh   chan UniqueID
	msFactory msgstream.Factory

	session  *sessionutil.Session
	activeCh <-chan bool
	eventCh  <-chan *sessionutil.SessionEvent

	dataClientCreator      dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
}

// CreateServer creates a `Server` instance
func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
	rand.Seed(time.Now().UnixNano())
	s := &Server{
		ctx:                    ctx,
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
		dataClientCreator:      defaultDataNodeCreatorFunc,
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
	}
	return s, nil
}
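
// Typical lifecycle, as a rough sketch only (the actual call sites live in the grpc wrapper outside
// this file, and the exact ordering there is an assumption here):
//
//	s, _ := CreateServer(ctx, factory)
//	_ = s.Register()
//	_ = s.Init()
//	_ = s.Start()
//	defer s.Stop()

// defaultDataNodeCreatorFunc is the default factory for DataNode clients, connecting to the given address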
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
}
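
// defaultRootCoordCreatorFunc is the default factory for RootCoord clients, built from the etcd endpoints and meta root path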
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
}

// Register registers the data coordinator with etcd
func (s *Server) Register() error {
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
	s.activeCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true)
	Params.NodeID = s.session.ServerID
	return nil
}

// Init changes the server state to Initializing
func (s *Server) Init() error {
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
	return nil
}

// Start initializes `Server` members and starts the loops; the following steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
// allocator, segment manager
// 3. start service discovery and server loops, which include the message stream handlers (segment statistics, datanode tt),
// datanode etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
func (s *Server) Start() error {
	var err error
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}

	if err = s.initRootCoordClient(); err != nil {
		return err
	}

	if err = s.initMeta(); err != nil {
		return err
	}

	if err = s.initCluster(); err != nil {
		return err
	}

	s.allocator = newRootCoordAllocator(s.ctx, s.rootCoordClient)

	s.startSegmentManager()
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}

	s.startServerLoop()

	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
	log.Debug("DataCoordinator startup success")
	return nil
}
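
// initCluster creates the DataNode cluster manager on top of the etcd kv client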
func (s *Server) initCluster() error {
	var err error
	s.cluster, err = NewCluster(s.ctx, s.kvClient, NewNodesInfo(), s)
	return err
}
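
// initServiceDiscovery loads the currently registered DataNode sessions, starts the cluster with them
// and begins watching DataNode membership changes from the next etcd revision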
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
		log.Debug("DataCoord initServiceDiscovery failed", zap.Error(err))
		return err
	}
	log.Debug("registered sessions", zap.Any("sessions", sessions))

	datanodes := make([]*NodeInfo, 0, len(sessions))
	for _, session := range sessions {
		info := &datapb.DataNodeInfo{
			Address:  session.Address,
			Version:  session.ServerID,
			Channels: []*datapb.ChannelStatus{},
		}
		nodeInfo := NewNodeInfo(s.ctx, info)
		datanodes = append(datanodes, nodeInfo)
	}

	s.cluster.Startup(datanodes)

	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1)
	return nil
}
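
// loadDataNodes lists the DataNode sessions currently registered in etcd and converts them into
// DataNodeInfo entries with empty channel lists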
func (s *Server) loadDataNodes() []*datapb.DataNodeInfo {
	if s.session == nil {
		log.Warn("load data nodes but session is nil")
		return []*datapb.DataNodeInfo{}
	}
	sessions, _, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
		log.Warn("load data nodes failed", zap.Error(err))
		return []*datapb.DataNodeInfo{}
	}
	datanodes := make([]*datapb.DataNodeInfo, 0, len(sessions))
	for _, session := range sessions {
		datanodes = append(datanodes, &datapb.DataNodeInfo{
			Address:  session.Address,
			Version:  session.ServerID,
			Channels: []*datapb.ChannelStatus{},
		})
	}
	return datanodes
}
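
// startSegmentManager creates the segment manager backed by meta and the allocator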
func (s *Server) startSegmentManager() {
	s.segmentManager = newSegmentManager(s.meta, s.allocator)
}
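
// initMeta connects to etcd (retrying up to connEtcdMaxRetryTime times) and builds the meta kv store on top of it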
func (s *Server) initMeta() error {
	connectEtcdFn := func() error {
		etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
		if err != nil {
			return err
		}
		s.kvClient = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
		s.meta, err = newMeta(s.kvClient)
		if err != nil {
			return err
		}
		return nil
	}
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
}
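
// startServerLoop starts the five background loops: segment statistics, DataNode timetick,
// service discovery watch, etcd liveness check and segment flush completion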
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
	s.serverLoopWg.Add(5)
	go s.startStatsChannel(s.serverLoopCtx)
	go s.startDataNodeTtLoop(s.serverLoopCtx)
	go s.startWatchService(s.serverLoopCtx)
	go s.startActiveCheck(s.serverLoopCtx)
	go s.startFlushLoop(s.serverLoopCtx)
}
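
// startStatsChannel consumes the segment statistics stream and updates the in-memory row counts in meta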
func (s *Server) startStatsChannel(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	statsStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new stats msg stream failed", zap.Error(err))
		return
	}
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
	log.Debug("DataCoord stats stream",
		zap.String("channelName", Params.StatisticsChannelName),
		zap.String("subscriptionName", Params.DataCoordSubscriptionName))
	statsStream.Start()
	defer statsStream.Close()
	for {
		select {
		case <-ctx.Done():
			log.Debug("stats channel shutdown")
			return
		default:
		}
		msgPack := statsStream.Consume()
		if msgPack == nil {
			return
		}
		for _, msg := range msgPack.Msgs {
			if msg.Type() != commonpb.MsgType_SegmentStatistics {
				log.Warn("receive unknown msg from segment statistics channel",
					zap.Stringer("msgType", msg.Type()))
				continue
			}
			log.Debug("Receive DataNode segment statistics update")
			ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
			for _, stat := range ssMsg.SegStats {
				s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
			}
		}
	}
}
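
// startDataNodeTtLoop consumes DataNode timetick messages, asks the segment manager for segments
// that are flushable at the reported timestamp, triggers Flush on the cluster and expires stale allocations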
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		return
	}
	ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName},
		Params.DataCoordSubscriptionName)
	log.Debug(fmt.Sprintf("DataCoord AsConsumer:%s:%s",
		Params.TimeTickChannelName, Params.DataCoordSubscriptionName))
	ttMsgStream.Start()
	defer ttMsgStream.Close()
	for {
		select {
		case <-ctx.Done():
			log.Debug("data node tt loop shutdown")
			return
		default:
		}
		msgPack := ttMsgStream.Consume()
		if msgPack == nil {
			return
		}
		for _, msg := range msgPack.Msgs {
			if msg.Type() != commonpb.MsgType_DataNodeTt {
				log.Warn("Receive unexpected msg type from tt channel",
					zap.Stringer("msgType", msg.Type()))
				continue
			}
			ttMsg := msg.(*msgstream.DataNodeTtMsg)
			ch := ttMsg.ChannelName
			ts := ttMsg.Timestamp
			// log.Debug("Receive datanode timetick msg", zap.String("channel", ch),
			// 	zap.Any("ts", ts))
			segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
			if err != nil {
				log.Warn("get flushable segments failed", zap.Error(err))
				continue
			}

			if len(segments) == 0 {
				continue
			}
			log.Debug("Flush segments", zap.Int64s("segmentIDs", segments))
			segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
			for _, id := range segments {
				sInfo := s.meta.GetSegment(id)
				if sInfo == nil {
					log.Error("get segment from meta error", zap.Int64("id", id),
						zap.Error(err))
					continue
				}
				segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
			}
			if len(segmentInfos) > 0 {
				s.cluster.Flush(segmentInfos)
			}
			s.segmentManager.ExpireAllocations(ch, ts)
		}
	}
}
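
// startWatchService handles DataNode session events from etcd, registering new nodes into the cluster
// and unregistering the ones that went offline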
func (s *Server) startWatchService(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
		case event := <-s.eventCh:
			info := &datapb.DataNodeInfo{
				Address:  event.Session.Address,
				Version:  event.Session.ServerID,
				Channels: []*datapb.ChannelStatus{},
			}
			node := NewNodeInfo(ctx, info)
			switch event.EventType {
			case sessionutil.SessionAddEvent:
				log.Info("Received datanode register",
					zap.String("address", info.Address),
					zap.Int64("serverID", info.Version))
				s.cluster.Register(node)
			case sessionutil.SessionDelEvent:
				log.Info("Received datanode unregister",
					zap.String("address", info.Address),
					zap.Int64("serverID", info.Version))
				s.cluster.UnRegister(node)
			default:
				log.Warn("receive unknown service event type",
					zap.Any("type", event.EventType))
			}
		}
	}
}
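
// startActiveCheck monitors the etcd session liveness channel and stops the server once the session is lost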
func (s *Server) startActiveCheck(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	for {
		select {
		case _, ok := <-s.activeCh:
			if ok {
				continue
			}
			s.Stop()
			log.Debug("disconnected from etcd, shutting down data coordinator")
			return
		case <-ctx.Done():
			log.Debug("connection check shutdown")
			return
		}
	}
}
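
// startFlushLoop waits for flushed segment ids, notifies RootCoord via SegmentFlushCompleted
// and marks the segments as Flushed in meta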
func (s *Server) startFlushLoop(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	ctx2, cancel := context.WithCancel(ctx)
	defer cancel()
	// send `Flushing` segments
	go s.handleFlushingSegments(ctx2)
	for {
		select {
		case <-ctx.Done():
			log.Debug("flush loop shutdown")
			return
		case segmentID := <-s.flushCh:
			segment := s.meta.GetSegment(segmentID)
			if segment == nil {
				log.Warn("failed to get flushed segment", zap.Int64("id", segmentID))
				continue
			}
			req := &datapb.SegmentFlushCompletedMsg{
				Base: &commonpb.MsgBase{
					MsgType: commonpb.MsgType_SegmentFlushDone,
				},
				Segment: segment.SegmentInfo,
			}
			resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
			if err = VerifyResponse(resp, err); err != nil {
				log.Warn("failed to call SegmentFlushCompleted", zap.Int64("segmentID", segmentID), zap.Error(err))
				continue
			}
			// set segment to SegmentState_Flushed
			if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
				log.Error("flush segment complete failed", zap.Error(err))
				continue
			}
			log.Debug("flush segment complete", zap.Int64("id", segmentID))
		}
	}
}
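
// handleFlushingSegments re-enqueues segments already in Flushing state so the flush loop can finish them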
func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		case s.flushCh <- segment.ID:
		}
	}
}
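
// initRootCoordClient creates the RootCoord client with rootCoordClientCreator, then initializes and starts it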
func (s *Server) initRootCoordClient() error {
	var err error
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints); err != nil {
		return err
	}
	if err = s.rootCoordClient.Init(); err != nil {
		return err
	}
	return s.rootCoordClient.Start()
}

// Stop does the Server finalize processes
// it checks the server status is healthy, if not, just quit
// if Server is healthy, set server state to stopped, release etcd session,
// stop message stream client and stop server loops
func (s *Server) Stop() error {
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
		return nil
	}
	log.Debug("DataCoord server shutdown")
	atomic.StoreInt64(&s.isServing, ServerStateStopped)
	s.cluster.Close()
	s.stopServerLoop()
	return nil
}

// CleanMeta only for test
func (s *Server) CleanMeta() error {
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
	return s.kvClient.RemoveWithPrefix("")
}
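
// stopServerLoop cancels the server loop context and waits for all background loops to exit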
func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
// if !s.meta.HasCollection(collID) {
// return fmt.Errorf("can not find collection %d", collID)
// }
// if !s.meta.HasPartition(collID, partID) {
// return fmt.Errorf("can not find partition %d", partID)
// }
// for _, name := range s.insertChannels {
// if name == channelName {
// return nil
// }
// }
// return fmt.Errorf("can not find channel %s", channelName)
//}
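
// loadCollectionFromRootCoord fetches the collection schema and partition ids from RootCoord
// and caches them into meta as a CollectionInfo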
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
		Base: &commonpb.MsgBase{
			MsgType:  commonpb.MsgType_DescribeCollection,
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
			MsgID:     0,
			Timestamp: 0,
			SourceID:  Params.NodeID,
		},
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
		return err
	}
	collInfo := &datapb.CollectionInfo{
		ID:         resp.CollectionID,
		Schema:     resp.Schema,
		Partitions: presp.PartitionIDs,
	}
	s.meta.AddCollection(collInfo)
	return nil
}
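
// prepareBinlog merges the field-to-binlog-path meta produced by prepareField2PathMeta for each field
// of the request into a single key/value map for the segment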
func (s *Server) prepareBinlog(req *datapb.SaveBinlogPathsRequest) (map[string]string, error) {
	meta := make(map[string]string)

	for _, fieldBlp := range req.Field2BinlogPaths {
		fieldMeta, err := s.prepareField2PathMeta(req.SegmentID, fieldBlp)
		if err != nil {
			return nil, err
		}
		for k, v := range fieldMeta {
			meta[k] = v
		}
	}

	return meta, nil
}