2021-11-17 19:47:36 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
2021-04-19 13:47:10 +08:00
// with the License. You may obtain a copy of the License at
//
2021-11-17 19:47:36 +08:00
// http://www.apache.org/licenses/LICENSE-2.0
2021-04-19 13:47:10 +08:00
//
2021-11-17 19:47:36 +08:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-04-19 13:47:10 +08:00
2021-06-22 16:44:09 +08:00
package querycoord
2021-01-15 15:28:54 +08:00
2021-01-16 15:31:10 +08:00
import (
2021-01-22 14:28:06 +08:00
"context"
2021-10-11 19:00:46 +08:00
"errors"
2021-10-24 22:39:09 +08:00
"fmt"
2021-12-15 11:47:10 +08:00
"math"
2021-03-08 15:25:55 +08:00
"math/rand"
2022-03-17 17:17:22 +08:00
"os"
2021-12-15 11:47:10 +08:00
"sort"
2021-06-19 11:45:09 +08:00
"sync"
2021-01-22 14:28:06 +08:00
"sync/atomic"
2021-12-15 11:47:10 +08:00
"syscall"
2021-02-20 09:20:51 +08:00
"time"
2021-01-22 14:28:06 +08:00
2021-10-24 22:39:09 +08:00
"github.com/golang/protobuf/proto"
2022-04-07 22:05:32 +08:00
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
2021-10-22 19:07:15 +08:00
"github.com/milvus-io/milvus/internal/allocator"
2021-06-15 12:41:40 +08:00
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/log"
2021-06-19 11:45:09 +08:00
"github.com/milvus-io/milvus/internal/proto/commonpb"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/proto/internalpb"
2022-04-20 16:15:41 +08:00
"github.com/milvus-io/milvus/internal/proto/milvuspb"
2021-06-19 11:45:09 +08:00
"github.com/milvus-io/milvus/internal/proto/querypb"
2022-03-17 18:03:23 +08:00
"github.com/milvus-io/milvus/internal/storage"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/types"
2022-04-07 22:05:32 +08:00
"github.com/milvus-io/milvus/internal/util/dependency"
2021-09-15 20:40:07 +08:00
"github.com/milvus-io/milvus/internal/util/metricsinfo"
2021-12-23 18:39:11 +08:00
"github.com/milvus-io/milvus/internal/util/paramtable"
2021-05-21 19:28:52 +08:00
"github.com/milvus-io/milvus/internal/util/sessionutil"
2021-10-22 19:07:15 +08:00
"github.com/milvus-io/milvus/internal/util/tsoutil"
2021-04-22 14:45:57 +08:00
"github.com/milvus-io/milvus/internal/util/typeutil"
2021-01-16 15:31:10 +08:00
)
2021-01-15 15:28:54 +08:00
2021-10-24 22:39:09 +08:00
// handoffSegmentPrefix is the kv key prefix under which segment handoff
// requests are stored; watchHandoffSegmentLoop watches this prefix and builds
// the per-segment keys it removes from it.
const handoffSegmentPrefix = "querycoord-handoff"
2021-12-23 18:39:11 +08:00
// UniqueID is an alias for the Int64 type
type UniqueID = typeutil . UniqueID
2021-10-04 17:14:07 +08:00
// Timestamp is an alias for the Int64 type
2021-04-15 15:15:46 +08:00
type Timestamp = typeutil . Timestamp
2021-02-18 16:26:02 +08:00
// queryChannelInfo pairs a request channel with a response channel, both
// stored as strings (presumably channel names; confirm against the users of
// this type, which are not in this part of the file).
type queryChannelInfo struct {
// requestChannel identifies the request side of the pair.
requestChannel string
// responseChannel identifies the response side of the pair.
responseChannel string
}
2022-01-07 13:31:22 +08:00
// Params is param table of query coordinator
2022-02-08 20:57:47 +08:00
var Params paramtable . ComponentParam
2021-12-23 18:39:11 +08:00
2021-10-05 15:34:26 +08:00
// QueryCoord is the coordinator of queryNodes
2021-06-22 16:44:09 +08:00
type QueryCoord struct {
2021-01-22 14:28:06 +08:00
loopCtx context . Context
loopCancel context . CancelFunc
2021-06-19 11:45:09 +08:00
loopWg sync . WaitGroup
kvClient * etcdkv . EtcdKV
2021-01-22 14:28:06 +08:00
2021-09-23 21:56:02 +08:00
initOnce sync . Once
2021-06-22 16:44:09 +08:00
queryCoordID uint64
2021-08-02 22:39:25 +08:00
meta Meta
2021-09-15 20:40:07 +08:00
cluster Cluster
2022-01-17 17:37:37 +08:00
handler * channelUnsubscribeHandler
2021-08-26 14:17:54 +08:00
newNodeFn newQueryNodeFn
2021-06-22 16:44:09 +08:00
scheduler * TaskScheduler
2021-10-22 19:07:15 +08:00
idAllocator func ( ) ( UniqueID , error )
2021-11-17 09:47:12 +08:00
indexChecker * IndexChecker
2021-01-22 14:28:06 +08:00
2021-09-03 17:15:26 +08:00
metricsCacheManager * metricsinfo . MetricsCacheManager
2021-12-29 14:35:21 +08:00
etcdCli * clientv3 . Client
2021-11-12 16:49:10 +08:00
dataCoordClient types . DataCoord
rootCoordClient types . RootCoord
indexCoordClient types . IndexCoord
2022-02-08 21:57:46 +08:00
broker * globalMetaBroker
2021-01-22 14:28:06 +08:00
2021-06-19 11:45:09 +08:00
session * sessionutil . Session
eventChan <- chan * sessionutil . SessionEvent
2021-05-21 19:28:52 +08:00
2022-02-08 21:57:46 +08:00
stateCode atomic . Value
2021-02-08 14:30:54 +08:00
2022-04-20 16:15:41 +08:00
factory dependency . Factory
chunkManager storage . ChunkManager
groupBalancer balancer
2021-01-22 14:28:06 +08:00
}
2021-05-25 15:06:05 +08:00
// Register register query service at etcd
2021-06-22 16:44:09 +08:00
func ( qc * QueryCoord ) Register ( ) error {
2021-12-15 11:47:10 +08:00
qc . session . Register ( )
go qc . session . LivenessCheck ( qc . loopCtx , func ( ) {
log . Error ( "Query Coord disconnected from etcd, process will exit" , zap . Int64 ( "Server Id" , qc . session . ServerID ) )
if err := qc . Stop ( ) ; err != nil {
log . Fatal ( "failed to stop server" , zap . Error ( err ) )
}
// manually send signal to starter goroutine
2021-12-29 14:35:21 +08:00
if qc . session . TriggerKill {
2022-03-17 17:17:22 +08:00
if p , err := os . FindProcess ( os . Getpid ( ) ) ; err == nil {
p . Signal ( syscall . SIGINT )
}
2021-12-29 14:35:21 +08:00
}
2021-12-15 11:47:10 +08:00
} )
return nil
}
func ( qc * QueryCoord ) initSession ( ) error {
2022-02-07 10:09:45 +08:00
qc . session = sessionutil . NewSession ( qc . loopCtx , Params . EtcdCfg . MetaRootPath , qc . etcdCli )
2021-12-15 11:47:10 +08:00
if qc . session == nil {
return fmt . Errorf ( "session is nil, the etcd client connection may have failed" )
}
2021-12-29 14:35:21 +08:00
qc . session . Init ( typeutil . QueryCoordRole , Params . QueryCoordCfg . Address , true , true )
2022-04-24 22:03:44 +08:00
Params . QueryCoordCfg . SetNodeID ( qc . session . ServerID )
2022-02-07 10:09:45 +08:00
Params . SetLogger ( qc . session . ServerID )
2021-05-25 15:06:05 +08:00
return nil
}
2021-10-05 15:36:17 +08:00
// Init function initializes the queryCoord's meta, cluster, etcdKV and task scheduler
2021-06-22 16:44:09 +08:00
func ( qc * QueryCoord ) Init ( ) error {
2022-04-26 11:29:54 +08:00
log . Info ( "query coordinator start init, session info" , zap . String ( "metaPath" , Params . EtcdCfg . MetaRootPath ) , zap . String ( "address" , Params . QueryCoordCfg . Address ) )
2021-12-14 15:31:07 +08:00
var initError error
2021-09-23 21:56:02 +08:00
qc . initOnce . Do ( func ( ) {
2021-12-15 11:47:10 +08:00
err := qc . initSession ( )
if err != nil {
2021-12-17 21:30:42 +08:00
log . Error ( "queryCoord init session failed" , zap . Error ( err ) )
2021-12-15 11:47:10 +08:00
initError = err
return
}
2022-02-07 10:09:45 +08:00
etcdKV := etcdkv . NewEtcdKV ( qc . etcdCli , Params . EtcdCfg . MetaRootPath )
2021-12-29 14:35:21 +08:00
qc . kvClient = etcdKV
2021-09-23 21:56:02 +08:00
log . Debug ( "query coordinator try to connect etcd success" )
2021-10-22 19:07:15 +08:00
// init id allocator
2022-02-07 10:09:45 +08:00
idAllocatorKV := tsoutil . NewTSOKVBase ( qc . etcdCli , Params . EtcdCfg . KvRootPath , "queryCoordTaskID" )
2021-10-22 19:07:15 +08:00
idAllocator := allocator . NewGlobalIDAllocator ( "idTimestamp" , idAllocatorKV )
initError = idAllocator . Initialize ( )
if initError != nil {
2022-04-26 11:29:54 +08:00
log . Error ( "query coordinator idAllocator initialize failed" , zap . Error ( initError ) )
2021-10-22 19:07:15 +08:00
return
}
qc . idAllocator = func ( ) ( UniqueID , error ) {
return idAllocator . AllocOne ( )
}
2022-04-12 19:47:33 +08:00
qc . factory . Init ( & Params )
2021-10-22 19:07:15 +08:00
// init meta
2022-04-07 22:05:32 +08:00
qc . meta , initError = newMeta ( qc . loopCtx , qc . kvClient , qc . factory , qc . idAllocator )
2021-09-23 21:56:02 +08:00
if initError != nil {
log . Error ( "query coordinator init meta failed" , zap . Error ( initError ) )
return
}
2022-04-20 16:15:41 +08:00
qc . groupBalancer = newReplicaBalancer ( qc . meta )
2021-08-03 22:03:25 +08:00
2022-01-17 17:37:37 +08:00
// init channelUnsubscribeHandler
2022-04-07 22:05:32 +08:00
qc . handler , initError = newChannelUnsubscribeHandler ( qc . loopCtx , qc . kvClient , qc . factory )
2022-01-17 17:37:37 +08:00
if initError != nil {
log . Error ( "query coordinator init channelUnsubscribeHandler failed" , zap . Error ( initError ) )
return
}
2021-10-22 19:07:15 +08:00
// init cluster
2022-01-17 17:37:37 +08:00
qc . cluster , initError = newQueryNodeCluster ( qc . loopCtx , qc . meta , qc . kvClient , qc . newNodeFn , qc . session , qc . handler )
2021-09-23 21:56:02 +08:00
if initError != nil {
log . Error ( "query coordinator init cluster failed" , zap . Error ( initError ) )
return
}
2021-08-03 22:03:25 +08:00
2022-04-20 16:15:41 +08:00
// NOTE: ignore the returned error
// we only try best to reload the leader addresses
reloadShardLeaderAddress ( qc . meta , qc . cluster )
2022-04-25 11:11:46 +08:00
qc . chunkManager , initError = qc . factory . NewVectorStorageChunkManager ( qc . loopCtx )
2022-03-17 18:03:23 +08:00
if initError != nil {
log . Error ( "query coordinator init cluster failed" , zap . Error ( initError ) )
return
}
2022-02-08 21:57:46 +08:00
//init globalMetaBroker
2022-03-17 18:03:23 +08:00
qc . broker , initError = newGlobalMetaBroker ( qc . loopCtx , qc . rootCoordClient , qc . dataCoordClient , qc . indexCoordClient , qc . chunkManager )
2022-02-08 21:57:46 +08:00
if initError != nil {
log . Error ( "query coordinator init globalMetaBroker failed" , zap . Error ( initError ) )
return
}
2021-10-22 19:07:15 +08:00
// init task scheduler
2022-02-08 21:57:46 +08:00
qc . scheduler , initError = newTaskScheduler ( qc . loopCtx , qc . meta , qc . cluster , qc . kvClient , qc . broker , qc . idAllocator )
2021-09-23 21:56:02 +08:00
if initError != nil {
log . Error ( "query coordinator init task scheduler failed" , zap . Error ( initError ) )
return
}
2021-08-03 22:03:25 +08:00
2021-11-17 09:47:12 +08:00
// init index checker
2022-02-08 21:57:46 +08:00
qc . indexChecker , initError = newIndexChecker ( qc . loopCtx , qc . kvClient , qc . meta , qc . cluster , qc . scheduler , qc . broker )
2021-11-17 09:47:12 +08:00
if initError != nil {
log . Error ( "query coordinator init index checker failed" , zap . Error ( initError ) )
return
}
2021-09-23 21:56:02 +08:00
qc . metricsCacheManager = metricsinfo . NewMetricsCacheManager ( )
} )
2022-04-26 11:29:54 +08:00
log . Info ( "QueryCoord init success" )
2021-09-23 21:56:02 +08:00
return initError
2021-01-22 14:28:06 +08:00
}
2021-10-05 15:38:03 +08:00
// Start function starts the goroutines to watch the meta and node updates
2021-06-22 16:44:09 +08:00
func ( qc * QueryCoord ) Start ( ) error {
qc . scheduler . Start ( )
2022-04-26 11:29:54 +08:00
log . Info ( "start scheduler ..." )
2021-10-12 19:39:24 +08:00
2021-11-17 09:47:12 +08:00
qc . indexChecker . start ( )
2022-04-26 11:29:54 +08:00
log . Info ( "start index checker ..." )
2021-11-17 09:47:12 +08:00
2022-01-17 17:37:37 +08:00
qc . handler . start ( )
2022-04-26 11:29:54 +08:00
log . Info ( "start channel unsubscribe loop ..." )
2022-01-17 17:37:37 +08:00
2021-12-23 18:39:11 +08:00
Params . QueryCoordCfg . CreatedTime = time . Now ( )
Params . QueryCoordCfg . UpdatedTime = time . Now ( )
2021-10-12 19:39:24 +08:00
2021-06-22 16:44:09 +08:00
qc . loopWg . Add ( 1 )
go qc . watchNodeLoop ( )
2021-06-19 11:45:09 +08:00
2021-10-24 22:39:09 +08:00
qc . loopWg . Add ( 1 )
go qc . watchHandoffSegmentLoop ( )
2021-12-23 18:39:11 +08:00
if Params . QueryCoordCfg . AutoBalance {
2021-12-06 10:19:41 +08:00
qc . loopWg . Add ( 1 )
go qc . loadBalanceSegmentLoop ( )
}
2021-11-12 18:49:10 +08:00
2021-12-15 11:47:10 +08:00
qc . UpdateStateCode ( internalpb . StateCode_Healthy )
2021-09-23 18:29:55 +08:00
2021-01-22 14:28:06 +08:00
return nil
2021-01-16 15:31:10 +08:00
}
2021-10-06 14:00:47 +08:00
// Stop function stops watching the meta and node updates
2021-06-22 16:44:09 +08:00
func ( qc * QueryCoord ) Stop ( ) error {
2021-12-07 21:27:29 +08:00
qc . UpdateStateCode ( internalpb . StateCode_Abnormal )
2021-12-25 17:52:17 +08:00
if qc . scheduler != nil {
qc . scheduler . Close ( )
2022-04-26 11:29:54 +08:00
log . Info ( "close scheduler ..." )
2021-12-25 17:52:17 +08:00
}
if qc . indexChecker != nil {
qc . indexChecker . close ( )
2022-04-26 11:29:54 +08:00
log . Info ( "close index checker ..." )
2021-12-25 17:52:17 +08:00
}
2022-01-17 17:37:37 +08:00
if qc . handler != nil {
qc . handler . close ( )
2022-04-26 11:29:54 +08:00
log . Info ( "close channel unsubscribe loop ..." )
2022-01-17 17:37:37 +08:00
}
2021-12-25 17:52:17 +08:00
if qc . loopCancel != nil {
qc . loopCancel ( )
log . Info ( "cancel the loop of QueryCoord" )
}
2021-06-19 11:45:09 +08:00
2022-04-26 11:29:54 +08:00
log . Warn ( "Query Coord stopped successfully..." )
2021-06-22 16:44:09 +08:00
qc . loopWg . Wait ( )
2021-11-16 22:31:14 +08:00
qc . session . Revoke ( time . Second )
2021-01-22 14:28:06 +08:00
return nil
2021-01-16 15:31:10 +08:00
}
2021-10-06 14:02:34 +08:00
// UpdateStateCode updates the status of the coord, including healthy, unhealthy
2021-06-22 16:44:09 +08:00
func ( qc * QueryCoord ) UpdateStateCode ( code internalpb . StateCode ) {
qc . stateCode . Store ( code )
2021-02-23 11:40:30 +08:00
}
2021-10-09 20:53:10 +08:00
// NewQueryCoord creates a QueryCoord object.
2022-04-07 22:05:32 +08:00
func NewQueryCoord ( ctx context . Context , factory dependency . Factory ) ( * QueryCoord , error ) {
2021-03-08 15:25:55 +08:00
rand . Seed ( time . Now ( ) . UnixNano ( ) )
2021-01-22 14:28:06 +08:00
ctx1 , cancel := context . WithCancel ( ctx )
2021-06-22 16:44:09 +08:00
service := & QueryCoord {
2021-06-15 12:41:40 +08:00
loopCtx : ctx1 ,
loopCancel : cancel ,
2022-04-07 22:05:32 +08:00
factory : factory ,
2021-08-26 14:17:54 +08:00
newNodeFn : newQueryNode ,
2021-01-22 14:28:06 +08:00
}
2021-02-24 09:48:17 +08:00
2021-03-12 14:22:09 +08:00
service . UpdateStateCode ( internalpb . StateCode_Abnormal )
2021-01-22 14:28:06 +08:00
return service , nil
2021-01-15 15:28:54 +08:00
}
2021-01-26 15:13:20 +08:00
2022-03-21 11:17:24 +08:00
// SetEtcdClient sets etcd's client
2021-12-29 14:35:21 +08:00
func ( qc * QueryCoord ) SetEtcdClient ( etcdClient * clientv3 . Client ) {
qc . etcdCli = etcdClient
}
2021-10-06 14:04:25 +08:00
// SetRootCoord sets root coordinator's client
2021-10-11 19:00:46 +08:00
func ( qc * QueryCoord ) SetRootCoord ( rootCoord types . RootCoord ) error {
if rootCoord == nil {
2021-12-09 22:23:22 +08:00
return errors . New ( "null RootCoord interface" )
2021-10-11 19:00:46 +08:00
}
2021-06-22 16:44:09 +08:00
qc . rootCoordClient = rootCoord
2021-10-11 19:00:46 +08:00
return nil
2021-01-26 15:13:20 +08:00
}
2021-10-07 19:52:46 +08:00
// SetDataCoord sets data coordinator's client
2021-10-11 19:00:46 +08:00
func ( qc * QueryCoord ) SetDataCoord ( dataCoord types . DataCoord ) error {
if dataCoord == nil {
2021-12-09 22:25:08 +08:00
return errors . New ( "null DataCoord interface" )
2021-10-11 19:00:46 +08:00
}
2021-06-22 16:44:09 +08:00
qc . dataCoordClient = dataCoord
2021-10-11 19:00:46 +08:00
return nil
2021-01-26 15:13:20 +08:00
}
2021-06-19 11:45:09 +08:00
2022-03-21 11:17:24 +08:00
// SetIndexCoord sets index coordinator's client
2021-11-12 16:49:10 +08:00
func ( qc * QueryCoord ) SetIndexCoord ( indexCoord types . IndexCoord ) error {
if indexCoord == nil {
2021-12-10 21:54:01 +08:00
return errors . New ( "null IndexCoord interface" )
2021-11-12 16:49:10 +08:00
}
qc . indexCoordClient = indexCoord
return nil
}
2021-06-22 16:44:09 +08:00
func ( qc * QueryCoord ) watchNodeLoop ( ) {
ctx , cancel := context . WithCancel ( qc . loopCtx )
2021-06-19 11:45:09 +08:00
defer cancel ( )
2021-06-22 16:44:09 +08:00
defer qc . loopWg . Done ( )
2022-04-26 11:29:54 +08:00
log . Info ( "QueryCoord start watch node loop" )
2021-06-19 11:45:09 +08:00
2022-04-20 16:15:41 +08:00
unallocatedNodes := qc . getUnallocatedNodes ( )
for _ , n := range unallocatedNodes {
if err := qc . allocateNode ( n ) ; err != nil {
log . Warn ( "unable to allcoate node" , zap . Int64 ( "nodeID" , n ) , zap . Error ( err ) )
}
}
2021-12-21 11:57:39 +08:00
offlineNodeIDs := qc . cluster . offlineNodeIDs ( )
if len ( offlineNodeIDs ) != 0 {
2021-09-15 20:40:07 +08:00
loadBalanceSegment := & querypb . LoadBalanceRequest {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_LoadBalanceSegments ,
SourceID : qc . session . ServerID ,
} ,
2022-01-25 17:26:13 +08:00
BalanceReason : querypb . TriggerCondition_NodeDown ,
2021-09-15 20:40:07 +08:00
SourceNodeIDs : offlineNodeIDs ,
2021-06-19 11:45:09 +08:00
}
2021-12-15 16:53:12 +08:00
baseTask := newBaseTask ( qc . loopCtx , querypb . TriggerCondition_NodeDown )
2021-10-18 21:34:47 +08:00
loadBalanceTask := & loadBalanceTask {
baseTask : baseTask ,
2021-09-15 20:40:07 +08:00
LoadBalanceRequest : loadBalanceSegment ,
2022-02-08 21:57:46 +08:00
broker : qc . broker ,
2021-09-15 20:40:07 +08:00
cluster : qc . cluster ,
meta : qc . meta ,
2021-06-19 11:45:09 +08:00
}
2021-10-11 09:54:37 +08:00
//TODO::deal enqueue error
qc . scheduler . Enqueue ( loadBalanceTask )
2022-04-26 11:29:54 +08:00
log . Info ( "start a loadBalance task" , zap . Any ( "task" , loadBalanceTask ) )
2021-06-19 11:45:09 +08:00
}
2022-02-15 15:07:48 +08:00
// TODO silverxia add Rewatch logic
2021-12-08 10:11:04 +08:00
qc . eventChan = qc . session . WatchServices ( typeutil . QueryNodeRole , qc . cluster . getSessionVersion ( ) + 1 , nil )
2022-02-15 15:07:48 +08:00
qc . handleNodeEvent ( ctx )
}
2022-04-20 16:15:41 +08:00
// allocateNode asks the replica balancer for the plans needed to add nodeID
// and applies each of them. A failure to compute the plans is returned; a
// failure to apply an individual plan is only logged and the remaining plans
// are still attempted.
func (qc *QueryCoord) allocateNode(nodeID int64) error {
	plans, err := qc.groupBalancer.addNode(nodeID)
	if err != nil {
		return err
	}

	for idx := range plans {
		plan := plans[idx]
		applyErr := qc.applyBalancePlan(plan)
		if applyErr == nil {
			continue
		}
		log.Warn("failed to apply balance plan", zap.Error(applyErr), zap.Any("plan", plan))
	}
	return nil
}
// getUnallocatedNodes returns the IDs of the online nodes for which the meta
// reports no replica. Nodes whose replica lookup fails are logged and skipped.
func (qc *QueryCoord) getUnallocatedNodes() []int64 {
	var unallocated []int64
	for _, nodeID := range qc.cluster.onlineNodeIDs() {
		replica, err := qc.meta.getReplicasByNodeID(nodeID)
		switch {
		case err != nil:
			log.Warn("failed to get replica", zap.Int64("nodeID", nodeID), zap.Error(err))
		case replica == nil:
			unallocated = append(unallocated, nodeID)
		}
	}
	return unallocated
}
// applyBalancePlan moves p.nodeID between replicas as described by p: the node
// is first removed from the source replica and then appended to the target
// replica. A replica ID of -1 means that side of the plan is skipped. The
// first meta error aborts the plan and is returned; a removal that already
// succeeded is not rolled back.
func (qc *QueryCoord) applyBalancePlan(p *balancePlan) error {
	if srcID := p.sourceReplica; srcID != -1 {
		src, err := qc.meta.getReplicaByID(srcID)
		if err != nil {
			return err
		}
		src = removeNodeFromReplica(src, p.nodeID)
		if err = qc.meta.setReplicaInfo(src); err != nil {
			return err
		}
	}

	if dstID := p.targetReplica; dstID != -1 {
		dst, err := qc.meta.getReplicaByID(dstID)
		if err != nil {
			return err
		}
		dst.NodeIds = append(dst.NodeIds, p.nodeID)
		if err = qc.meta.setReplicaInfo(dst); err != nil {
			return err
		}
	}

	return nil
}
// removeNodeFromReplica deletes the first occurrence of nodeID from
// replica.NodeIds, modifying the replica in place, and returns that same
// replica. The replica is returned unchanged when nodeID is not present.
func removeNodeFromReplica(replica *milvuspb.ReplicaInfo, nodeID int64) *milvuspb.ReplicaInfo {
	for idx, id := range replica.NodeIds {
		if id == nodeID {
			replica.NodeIds = append(replica.NodeIds[:idx], replica.NodeIds[idx+1:]...)
			break
		}
	}
	return replica
}
2022-02-15 15:07:48 +08:00
func ( qc * QueryCoord ) handleNodeEvent ( ctx context . Context ) {
2021-06-19 11:45:09 +08:00
for {
select {
case <- ctx . Done ( ) :
return
2021-10-14 19:20:35 +08:00
case event , ok := <- qc . eventChan :
if ! ok {
2022-02-15 15:07:48 +08:00
// ErrCompacted is handled inside SessionWatcher
log . Error ( "Session Watcher channel closed" , zap . Int64 ( "server id" , qc . session . ServerID ) )
go qc . Stop ( )
if qc . session . TriggerKill {
2022-03-17 17:17:22 +08:00
if p , err := os . FindProcess ( os . Getpid ( ) ) ; err == nil {
p . Signal ( syscall . SIGINT )
}
2022-02-15 15:07:48 +08:00
}
2021-10-14 19:20:35 +08:00
return
}
2021-06-19 11:45:09 +08:00
switch event . EventType {
case sessionutil . SessionAddEvent :
serverID := event . Session . ServerID
2022-04-26 11:29:54 +08:00
log . Info ( "start add a QueryNode to cluster" , zap . Any ( "nodeID" , serverID ) )
2021-09-15 20:40:07 +08:00
err := qc . cluster . registerNode ( ctx , event . Session , serverID , disConnect )
2021-07-13 14:16:00 +08:00
if err != nil {
2021-12-10 21:55:48 +08:00
log . Error ( "QueryCoord failed to register a QueryNode" , zap . Int64 ( "nodeID" , serverID ) , zap . String ( "error info" , err . Error ( ) ) )
2022-04-20 16:15:41 +08:00
continue
}
if err := qc . allocateNode ( serverID ) ; err != nil {
log . Error ( "unable to allcoate node" , zap . Int64 ( "nodeID" , serverID ) , zap . Error ( err ) )
2021-07-13 14:16:00 +08:00
}
2021-09-03 17:15:26 +08:00
qc . metricsCacheManager . InvalidateSystemInfoMetrics ( )
2022-05-05 16:25:50 +08:00
2021-06-19 11:45:09 +08:00
case sessionutil . SessionDelEvent :
serverID := event . Session . ServerID
2022-04-26 11:29:54 +08:00
log . Info ( "get a del event after QueryNode down" , zap . Int64 ( "nodeID" , serverID ) )
2021-11-05 16:00:55 +08:00
nodeExist := qc . cluster . hasNode ( serverID )
if ! nodeExist {
2021-12-10 21:55:48 +08:00
log . Error ( "QueryNode not exist" , zap . Int64 ( "nodeID" , serverID ) )
2021-07-13 14:16:00 +08:00
continue
}
qc . cluster . stopNode ( serverID )
loadBalanceSegment := & querypb . LoadBalanceRequest {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_LoadBalanceSegments ,
SourceID : qc . session . ServerID ,
} ,
SourceNodeIDs : [ ] int64 { serverID } ,
2021-12-15 16:53:12 +08:00
BalanceReason : querypb . TriggerCondition_NodeDown ,
2021-07-13 14:16:00 +08:00
}
2021-12-15 16:53:12 +08:00
baseTask := newBaseTask ( qc . loopCtx , querypb . TriggerCondition_NodeDown )
2021-10-18 21:34:47 +08:00
loadBalanceTask := & loadBalanceTask {
baseTask : baseTask ,
2021-07-13 14:16:00 +08:00
LoadBalanceRequest : loadBalanceSegment ,
2022-02-08 21:57:46 +08:00
broker : qc . broker ,
2021-07-13 14:16:00 +08:00
cluster : qc . cluster ,
meta : qc . meta ,
2021-06-27 12:16:09 +08:00
}
2021-09-03 17:15:26 +08:00
qc . metricsCacheManager . InvalidateSystemInfoMetrics ( )
2021-10-11 09:54:37 +08:00
//TODO:: deal enqueue error
qc . scheduler . Enqueue ( loadBalanceTask )
2022-04-26 11:29:54 +08:00
log . Info ( "start a loadBalance task" , zap . Any ( "task" , loadBalanceTask ) )
2021-06-19 11:45:09 +08:00
}
}
}
}
2021-10-24 22:39:09 +08:00
func ( qc * QueryCoord ) watchHandoffSegmentLoop ( ) {
ctx , cancel := context . WithCancel ( qc . loopCtx )
defer cancel ( )
defer qc . loopWg . Done ( )
2022-04-26 11:29:54 +08:00
log . Info ( "QueryCoord start watch segment loop" )
2021-10-24 22:39:09 +08:00
2021-11-17 09:47:12 +08:00
watchChan := qc . kvClient . WatchWithRevision ( handoffSegmentPrefix , qc . indexChecker . revision + 1 )
2021-10-24 22:39:09 +08:00
for {
select {
case <- ctx . Done ( ) :
return
case resp := <- watchChan :
for _ , event := range resp . Events {
segmentInfo := & querypb . SegmentInfo { }
err := proto . Unmarshal ( event . Kv . Value , segmentInfo )
if err != nil {
log . Error ( "watchHandoffSegmentLoop: unmarshal failed" , zap . Any ( "error" , err . Error ( ) ) )
continue
}
switch event . Type {
case mvccpb . PUT :
2021-12-30 19:09:33 +08:00
validHandoffReq , _ := qc . indexChecker . verifyHandoffReqValid ( segmentInfo )
if Params . QueryCoordCfg . AutoHandoff && validHandoffReq {
2021-11-17 09:47:12 +08:00
qc . indexChecker . enqueueHandoffReq ( segmentInfo )
2022-04-26 11:29:54 +08:00
log . Info ( "watchHandoffSegmentLoop: enqueue a handoff request to index checker" , zap . Any ( "segment info" , segmentInfo ) )
2021-11-12 16:49:10 +08:00
} else {
2022-04-26 11:29:54 +08:00
log . Info ( "watchHandoffSegmentLoop: collection/partition has not been loaded or autoHandoff equal to false, remove req from etcd" , zap . Any ( "segmentInfo" , segmentInfo ) )
2021-11-12 16:49:10 +08:00
buildQuerySegmentPath := fmt . Sprintf ( "%s/%d/%d/%d" , handoffSegmentPrefix , segmentInfo . CollectionID , segmentInfo . PartitionID , segmentInfo . SegmentID )
err = qc . kvClient . Remove ( buildQuerySegmentPath )
if err != nil {
log . Error ( "watchHandoffSegmentLoop: remove handoff segment from etcd failed" , zap . Error ( err ) )
2021-11-17 09:47:12 +08:00
panic ( err )
2021-11-12 16:49:10 +08:00
}
2021-10-24 22:39:09 +08:00
}
default :
// do nothing
}
}
}
}
2021-11-12 16:49:10 +08:00
}
2021-11-12 18:49:10 +08:00
func ( qc * QueryCoord ) loadBalanceSegmentLoop ( ) {
ctx , cancel := context . WithCancel ( qc . loopCtx )
defer cancel ( )
defer qc . loopWg . Done ( )
2022-04-26 11:29:54 +08:00
log . Info ( "QueryCoord start load balance segment loop" )
2021-11-12 18:49:10 +08:00
2021-12-23 18:39:11 +08:00
timer := time . NewTicker ( time . Duration ( Params . QueryCoordCfg . BalanceIntervalSeconds ) * time . Second )
2021-11-12 18:49:10 +08:00
2022-04-20 16:15:41 +08:00
var collectionInfos [ ] * querypb . CollectionInfo
pos := 0
2021-11-12 18:49:10 +08:00
for {
select {
case <- ctx . Done ( ) :
return
case <- timer . C :
2022-04-20 16:15:41 +08:00
if pos == len ( collectionInfos ) {
pos = 0
collectionInfos = qc . meta . showCollections ( )
2021-11-12 18:49:10 +08:00
}
// get mem info of online nodes from cluster
nodeID2MemUsageRate := make ( map [ int64 ] float64 )
nodeID2MemUsage := make ( map [ int64 ] uint64 )
nodeID2TotalMem := make ( map [ int64 ] uint64 )
2022-04-20 16:15:41 +08:00
loadBalanceTasks := make ( [ ] * loadBalanceTask , 0 )
// balance at most 20 collections in a round
for i := 0 ; pos < len ( collectionInfos ) && i < 20 ; i , pos = i + 1 , pos + 1 {
info := collectionInfos [ pos ]
replicas , err := qc . meta . getReplicasByCollectionID ( info . GetCollectionID ( ) )
2021-11-12 18:49:10 +08:00
if err != nil {
2022-04-20 16:15:41 +08:00
log . Warn ( "unable to get replicas of collection" , zap . Int64 ( "collectionID" , info . GetCollectionID ( ) ) )
2021-11-12 18:49:10 +08:00
continue
}
2022-04-20 16:15:41 +08:00
for _ , replica := range replicas {
// auto balance is executed on replica level
onlineNodeIDs := replica . GetNodeIds ( )
if len ( onlineNodeIDs ) == 0 {
2022-05-07 10:27:51 +08:00
log . Error ( "loadBalanceSegmentLoop: there are no online QueryNode to balance" , zap . Int64 ( "collection" , replica . CollectionID ) , zap . Int64 ( "replica" , replica . ReplicaID ) )
2022-04-20 16:15:41 +08:00
continue
2021-11-12 18:49:10 +08:00
}
2022-04-20 16:15:41 +08:00
var availableNodeIDs [ ] int64
nodeID2SegmentInfos := make ( map [ int64 ] map [ UniqueID ] * querypb . SegmentInfo )
for _ , nodeID := range onlineNodeIDs {
if _ , ok := nodeID2MemUsage [ nodeID ] ; ! ok {
nodeInfo , err := qc . cluster . getNodeInfoByID ( nodeID )
if err != nil {
2022-05-07 10:27:51 +08:00
log . Warn ( "loadBalanceSegmentLoop: get node info from QueryNode failed" ,
zap . Int64 ( "nodeID" , nodeID ) , zap . Int64 ( "collection" , replica . CollectionID ) , zap . Int64 ( "replica" , replica . ReplicaID ) ,
zap . Error ( err ) )
2022-04-20 16:15:41 +08:00
continue
}
nodeID2MemUsageRate [ nodeID ] = nodeInfo . ( * queryNode ) . memUsageRate
nodeID2MemUsage [ nodeID ] = nodeInfo . ( * queryNode ) . memUsage
nodeID2TotalMem [ nodeID ] = nodeInfo . ( * queryNode ) . totalMem
}
2021-11-12 18:49:10 +08:00
2022-04-20 16:15:41 +08:00
updateSegmentInfoDone := true
leastSegmentInfos := make ( map [ UniqueID ] * querypb . SegmentInfo )
segmentInfos := qc . meta . getSegmentInfosByNodeAndCollection ( nodeID , replica . GetCollectionID ( ) )
for _ , segmentInfo := range segmentInfos {
leastInfo , err := qc . cluster . getSegmentInfoByID ( ctx , segmentInfo . SegmentID )
if err != nil {
2022-05-07 10:27:51 +08:00
log . Warn ( "loadBalanceSegmentLoop: get segment info from QueryNode failed" , zap . Int64 ( "nodeID" , nodeID ) ,
zap . Int64 ( "collection" , replica . CollectionID ) , zap . Int64 ( "replica" , replica . ReplicaID ) ,
zap . Error ( err ) )
2022-04-20 16:15:41 +08:00
updateSegmentInfoDone = false
break
}
leastSegmentInfos [ segmentInfo . SegmentID ] = leastInfo
}
if updateSegmentInfoDone {
availableNodeIDs = append ( availableNodeIDs , nodeID )
nodeID2SegmentInfos [ nodeID ] = leastSegmentInfos
}
2021-12-21 11:57:39 +08:00
}
2022-05-07 10:27:51 +08:00
log . Info ( "loadBalanceSegmentLoop: memory usage rate of all online QueryNode" , zap . Int64 ( "collection" , replica . CollectionID ) ,
zap . Int64 ( "replica" , replica . ReplicaID ) , zap . Any ( "mem rate" , nodeID2MemUsageRate ) )
2022-04-20 16:15:41 +08:00
if len ( availableNodeIDs ) <= 1 {
2022-05-07 10:27:51 +08:00
log . Info ( "loadBalanceSegmentLoop: there are too few available query nodes to balance" ,
zap . Int64 ( "collection" , replica . CollectionID ) , zap . Int64 ( "replica" , replica . ReplicaID ) ,
zap . Int64s ( "onlineNodeIDs" , onlineNodeIDs ) , zap . Int64s ( "availableNodeIDs" , availableNodeIDs ) )
2022-04-20 16:15:41 +08:00
continue
}
// check which nodes need balance and determine which segments on these nodes need to be migrated to other nodes
memoryInsufficient := false
for {
sort . Slice ( availableNodeIDs , func ( i , j int ) bool {
return nodeID2MemUsageRate [ availableNodeIDs [ i ] ] > nodeID2MemUsageRate [ availableNodeIDs [ j ] ]
} )
// the memoryUsageRate of the sourceNode is higher than other query node
sourceNodeID := availableNodeIDs [ 0 ]
dstNodeID := availableNodeIDs [ len ( availableNodeIDs ) - 1 ]
memUsageRateDiff := nodeID2MemUsageRate [ sourceNodeID ] - nodeID2MemUsageRate [ dstNodeID ]
if nodeID2MemUsageRate [ sourceNodeID ] <= Params . QueryCoordCfg . OverloadedMemoryThresholdPercentage &&
memUsageRateDiff <= Params . QueryCoordCfg . MemoryUsageMaxDifferencePercentage {
break
}
// if memoryUsageRate of source node is greater than 90%, and the max memUsageDiff is greater than 30%
// then migrate the segments on source node to other query nodes
segmentInfos := nodeID2SegmentInfos [ sourceNodeID ]
// select the segment that needs balance on the source node
selectedSegmentInfo , err := chooseSegmentToBalance ( sourceNodeID , dstNodeID , segmentInfos , nodeID2MemUsage , nodeID2TotalMem , nodeID2MemUsageRate )
if err != nil {
// no enough memory on query nodes to balance, then notify proxy to stop insert
memoryInsufficient = true
break
}
if selectedSegmentInfo == nil {
break
}
// select a segment to balance successfully, then recursive traversal whether there are other segments that can balance
2021-11-12 18:49:10 +08:00
req := & querypb . LoadBalanceRequest {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_LoadBalanceSegments ,
} ,
2021-12-15 16:53:12 +08:00
BalanceReason : querypb . TriggerCondition_LoadBalance ,
2021-11-12 18:49:10 +08:00
SourceNodeIDs : [ ] UniqueID { sourceNodeID } ,
DstNodeIDs : [ ] UniqueID { dstNodeID } ,
SealedSegmentIDs : [ ] UniqueID { selectedSegmentInfo . SegmentID } ,
}
2021-12-15 16:53:12 +08:00
baseTask := newBaseTask ( qc . loopCtx , querypb . TriggerCondition_LoadBalance )
2021-11-12 18:49:10 +08:00
balanceTask := & loadBalanceTask {
baseTask : baseTask ,
LoadBalanceRequest : req ,
2022-02-08 21:57:46 +08:00
broker : qc . broker ,
2021-11-12 18:49:10 +08:00
cluster : qc . cluster ,
meta : qc . meta ,
}
2022-05-07 10:27:51 +08:00
log . Info ( "loadBalanceSegmentLoop: generate a loadBalance task" ,
zap . Int64 ( "collection" , replica . CollectionID ) , zap . Int64 ( "replica" , replica . ReplicaID ) ,
zap . Any ( "task" , balanceTask ) )
2021-11-12 18:49:10 +08:00
loadBalanceTasks = append ( loadBalanceTasks , balanceTask )
nodeID2MemUsage [ sourceNodeID ] -= uint64 ( selectedSegmentInfo . MemSize )
nodeID2MemUsage [ dstNodeID ] += uint64 ( selectedSegmentInfo . MemSize )
nodeID2MemUsageRate [ sourceNodeID ] = float64 ( nodeID2MemUsage [ sourceNodeID ] ) / float64 ( nodeID2TotalMem [ sourceNodeID ] )
nodeID2MemUsageRate [ dstNodeID ] = float64 ( nodeID2MemUsage [ dstNodeID ] ) / float64 ( nodeID2TotalMem [ dstNodeID ] )
delete ( nodeID2SegmentInfos [ sourceNodeID ] , selectedSegmentInfo . SegmentID )
nodeID2SegmentInfos [ dstNodeID ] [ selectedSegmentInfo . SegmentID ] = selectedSegmentInfo
continue
}
2022-04-20 16:15:41 +08:00
if memoryInsufficient {
// no enough memory on query nodes to balance, then notify proxy to stop insert
//TODO:: xige-16
2022-05-07 10:27:51 +08:00
log . Warn ( "loadBalanceSegmentLoop: QueryNode has insufficient memory, stop inserting data" , zap . Int64 ( "collection" , replica . CollectionID ) , zap . Int64 ( "replica" , replica . ReplicaID ) )
2022-04-20 16:15:41 +08:00
}
2021-11-12 18:49:10 +08:00
}
}
2022-04-20 16:15:41 +08:00
for _ , t := range loadBalanceTasks {
qc . scheduler . Enqueue ( t )
err := t . waitToFinish ( )
if err != nil {
// if failed, wait for next balance loop
// it may be that the collection/partition of the balanced segment has been released
// it also may be other abnormal errors
log . Error ( "loadBalanceSegmentLoop: balance task execute failed" , zap . Any ( "task" , t ) )
} else {
2022-04-26 11:29:54 +08:00
log . Info ( "loadBalanceSegmentLoop: balance task execute success" , zap . Any ( "task" , t ) )
2021-11-12 18:49:10 +08:00
}
}
}
}
}
func chooseSegmentToBalance ( sourceNodeID int64 , dstNodeID int64 ,
segmentInfos map [ UniqueID ] * querypb . SegmentInfo ,
nodeID2MemUsage map [ int64 ] uint64 ,
nodeID2TotalMem map [ int64 ] uint64 ,
nodeID2MemUsageRate map [ int64 ] float64 ) ( * querypb . SegmentInfo , error ) {
memoryInsufficient := true
minMemDiffPercentage := 1.0
2021-12-14 15:31:07 +08:00
var selectedSegmentInfo * querypb . SegmentInfo
2021-11-12 18:49:10 +08:00
for _ , info := range segmentInfos {
dstNodeMemUsageAfterBalance := nodeID2MemUsage [ dstNodeID ] + uint64 ( info . MemSize )
dstNodeMemUsageRateAfterBalance := float64 ( dstNodeMemUsageAfterBalance ) / float64 ( nodeID2TotalMem [ dstNodeID ] )
// if memUsageRate of dstNode is greater than OverloadedMemoryThresholdPercentage after balance, than can't balance
2021-12-23 18:39:11 +08:00
if dstNodeMemUsageRateAfterBalance < Params . QueryCoordCfg . OverloadedMemoryThresholdPercentage {
2021-11-12 18:49:10 +08:00
memoryInsufficient = false
sourceNodeMemUsageAfterBalance := nodeID2MemUsage [ sourceNodeID ] - uint64 ( info . MemSize )
sourceNodeMemUsageRateAfterBalance := float64 ( sourceNodeMemUsageAfterBalance ) / float64 ( nodeID2TotalMem [ sourceNodeID ] )
// assume all query node has same memory capacity
// if the memUsageRateDiff between the two nodes does not become smaller after balance, there is no need for balance
diffBeforBalance := nodeID2MemUsageRate [ sourceNodeID ] - nodeID2MemUsageRate [ dstNodeID ]
diffAfterBalance := dstNodeMemUsageRateAfterBalance - sourceNodeMemUsageRateAfterBalance
if diffAfterBalance < diffBeforBalance {
if math . Abs ( diffAfterBalance ) < minMemDiffPercentage {
selectedSegmentInfo = info
}
}
}
}
if memoryInsufficient {
2021-12-15 22:19:10 +08:00
return nil , errors . New ( "all QueryNode has insufficient memory" )
2021-11-12 18:49:10 +08:00
}
return selectedSegmentInfo , nil
}