// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package querycoord

import (
	"context"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"os"
	"sort"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/golang/protobuf/proto"
	"go.etcd.io/etcd/api/v3/mvccpb"
	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/allocator"
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util"
	"github.com/milvus-io/milvus/internal/util/dependency"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/paramtable"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"
)

// UniqueID is an alias for the Int64 type
type UniqueID = typeutil.UniqueID

// Timestamp is an alias for the uint64 type
type Timestamp = typeutil.Timestamp

// Params is the param table of the query coordinator
var Params paramtable.ComponentParam

// QueryCoord is the coordinator of queryNodes
type QueryCoord struct {
	loopCtx    context.Context
	loopCancel context.CancelFunc
	loopWg     sync.WaitGroup
	kvClient   *etcdkv.EtcdKV

	initOnce sync.Once

	queryCoordID   uint64
	meta           Meta
	cluster        Cluster
	channelCleaner *ChannelCleaner
	newNodeFn      newQueryNodeFn
	scheduler      *TaskScheduler
	idAllocator    func() (UniqueID, error)
	handoffHandler *HandoffHandler

	metricsCacheManager *metricsinfo.MetricsCacheManager

	etcdCli          *clientv3.Client
	dataCoordClient  types.DataCoord
	rootCoordClient  types.RootCoord
	indexCoordClient types.IndexCoord
	broker           *globalMetaBroker

	session          *sessionutil.Session
	eventChan        <-chan *sessionutil.SessionEvent
	offlineNodesChan chan UniqueID
	offlineNodes     map[UniqueID]struct{}

	stateCode atomic.Value

	factory      dependency.Factory
	chunkManager storage.ChunkManager

	groupBalancer Balancer
}
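
// A typical wiring of a QueryCoord, shown as an illustrative sketch only (the
// real boot sequence lives in the server wiring outside this package, and the
// ctx/factory/client values here are placeholders):
//
//	qc, _ := NewQueryCoord(ctx, factory)
//	qc.SetEtcdClient(etcdCli)
//	_ = qc.SetRootCoord(rootCoord)
//	_ = qc.SetDataCoord(dataCoord)
//	_ = qc.SetIndexCoord(indexCoord)
//	_ = qc.Register()
//	_ = qc.Init()
//	_ = qc.Start()
//	defer qc.Stop()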

// Register registers QueryCoord's session with etcd and starts the liveness check
func (qc *QueryCoord) Register() error {
	qc.session.Register()
	go qc.session.LivenessCheck(qc.loopCtx, func() {
		log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", qc.session.ServerID))
		if err := qc.Stop(); err != nil {
			log.Fatal("failed to stop server", zap.Error(err))
		}
		// manually send signal to starter goroutine
		if qc.session.TriggerKill {
			if p, err := os.FindProcess(os.Getpid()); err == nil {
				p.Signal(syscall.SIGINT)
			}
		}
	})
	return nil
}

func (qc *QueryCoord) initSession() error {
	qc.session = sessionutil.NewSession(qc.loopCtx, Params.EtcdCfg.MetaRootPath, qc.etcdCli)
	if qc.session == nil {
		return fmt.Errorf("session is nil, the etcd client connection may have failed")
	}
	qc.session.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, true)
	Params.QueryCoordCfg.SetNodeID(qc.session.ServerID)
	Params.SetLogger(qc.session.ServerID)
	return nil
}

// Init function initializes the queryCoord's meta, cluster, etcdKV and task scheduler
func (qc *QueryCoord) Init() error {
	log.Info("query coordinator start init, session info", zap.String("metaPath", Params.EtcdCfg.MetaRootPath), zap.String("address", Params.QueryCoordCfg.Address))
	var initError error
	qc.initOnce.Do(func() {
		err := qc.initSession()
		if err != nil {
			log.Error("queryCoord init session failed", zap.Error(err))
			initError = err
			return
		}
		etcdKV := etcdkv.NewEtcdKV(qc.etcdCli, Params.EtcdCfg.MetaRootPath)
		qc.kvClient = etcdKV
		log.Debug("query coordinator try to connect etcd success")

		// init id allocator
		idAllocatorKV := tsoutil.NewTSOKVBase(qc.etcdCli, Params.EtcdCfg.KvRootPath, "queryCoordTaskID")
		idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", idAllocatorKV)
		initError = idAllocator.Initialize()
		if initError != nil {
			log.Error("query coordinator idAllocator initialize failed", zap.Error(initError))
			return
		}
		qc.idAllocator = func() (UniqueID, error) {
			return idAllocator.AllocOne()
		}

		qc.factory.Init(&Params)

		// init meta
		qc.meta, initError = newMeta(qc.loopCtx, qc.kvClient, qc.factory, qc.idAllocator)
		if initError != nil {
			log.Error("query coordinator init meta failed", zap.Error(initError))
			return
		}

		// init channel cleaner
		qc.channelCleaner, initError = NewChannelCleaner(qc.loopCtx, qc.kvClient, qc.factory)
		if initError != nil {
			log.Error("query coordinator init channelCleaner failed", zap.Error(initError))
			return
		}

		// init cluster
		qc.cluster, initError = newQueryNodeCluster(qc.loopCtx, qc.meta, qc.kvClient, qc.newNodeFn, qc.session, qc.channelCleaner)
		if initError != nil {
			log.Error("query coordinator init cluster failed", zap.Error(initError))
			return
		}

		qc.groupBalancer = newReplicaBalancer(qc.meta, qc.cluster)

		// NOTE: ignore the returned error,
		// we only try our best to reload the shard leader addresses
		reloadShardLeaderAddress(qc.meta, qc.cluster)

		qc.chunkManager, initError = qc.factory.NewVectorStorageChunkManager(qc.loopCtx)
		if initError != nil {
			log.Error("query coordinator init chunkManager failed", zap.Error(initError))
			return
		}

		// init globalMetaBroker
		qc.broker, initError = newGlobalMetaBroker(qc.loopCtx, qc.rootCoordClient, qc.dataCoordClient, qc.indexCoordClient, qc.chunkManager)
		if initError != nil {
			log.Error("query coordinator init globalMetaBroker failed", zap.Error(initError))
			return
		}

		// init task scheduler
		qc.scheduler, initError = newTaskScheduler(qc.loopCtx, qc.meta, qc.cluster, qc.kvClient, qc.broker, qc.idAllocator)
		if initError != nil {
			log.Error("query coordinator init task scheduler failed", zap.Error(initError))
			return
		}

		// init handoff handler
		qc.handoffHandler, initError = newHandoffHandler(qc.loopCtx, qc.kvClient, qc.meta, qc.cluster, qc.scheduler, qc.broker)
		if initError != nil {
			log.Error("query coordinator init handoff handler failed", zap.Error(initError))
			return
		}

		qc.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
	})
	log.Info("QueryCoord init success")
	return initError
}
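
// Note on the init order above: the cluster is constructed after meta and the
// channel cleaner, the scheduler after the broker, and the handoff handler
// after the scheduler, because each component takes its predecessors as
// constructor arguments.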

// Start function starts the goroutines to watch the meta and node updates
func (qc *QueryCoord) Start() error {
	qc.scheduler.Start()
	log.Info("start scheduler ...")

	qc.handoffHandler.Start()
	log.Info("start handoff handler ...")

	qc.channelCleaner.start()
	log.Info("start channel cleaner loop ...")

	Params.QueryCoordCfg.CreatedTime = time.Now()
	Params.QueryCoordCfg.UpdatedTime = time.Now()

	qc.loopWg.Add(1)
	go qc.offlineNodeLoop()

	qc.loopWg.Add(1)
	go qc.watchNodeLoop()

	qc.loopWg.Add(1)
	go qc.handoffNotificationLoop()

	if Params.QueryCoordCfg.AutoBalance {
		qc.loopWg.Add(1)
		go qc.loadBalanceSegmentLoop()
	}

	qc.UpdateStateCode(internalpb.StateCode_Healthy)

	return nil
}

// Stop function stops watching the meta and node updates
func (qc *QueryCoord) Stop() error {
	qc.UpdateStateCode(internalpb.StateCode_Abnormal)

	if qc.scheduler != nil {
		log.Info("close scheduler...")
		qc.scheduler.Close()
	}

	if qc.handoffHandler != nil {
		log.Info("close handoff handler...")
		qc.handoffHandler.Stop()
	}

	if qc.channelCleaner != nil {
		log.Info("close channel cleaner loop...")
		qc.channelCleaner.close()
	}

	if qc.loopCancel != nil {
		log.Info("cancel the loop of QueryCoord...")
		qc.loopCancel()
	}

	qc.loopWg.Wait()
	qc.session.Revoke(time.Second)
	log.Info("QueryCoord stopped successfully...")
	return nil
}
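
// Stop tears components down roughly in the reverse order of Start: background
// components are closed first, then the loop context is cancelled and every
// loop goroutine is awaited before the etcd session lease is revoked.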

// UpdateStateCode updates the state code of the coord, e.g. healthy or abnormal
func (qc *QueryCoord) UpdateStateCode(code internalpb.StateCode) {
	qc.stateCode.Store(code)
}

// NewQueryCoord creates a QueryCoord object.
func NewQueryCoord(ctx context.Context, factory dependency.Factory) (*QueryCoord, error) {
	rand.Seed(time.Now().UnixNano())
	ctx1, cancel := context.WithCancel(ctx)
	service := &QueryCoord{
		loopCtx:          ctx1,
		loopCancel:       cancel,
		factory:          factory,
		newNodeFn:        newQueryNode,
		offlineNodesChan: make(chan UniqueID, 256),
		offlineNodes:     make(map[UniqueID]struct{}, 256),
	}

	service.UpdateStateCode(internalpb.StateCode_Abnormal)
	return service, nil
}

// SetEtcdClient sets the etcd client
func (qc *QueryCoord) SetEtcdClient(etcdClient *clientv3.Client) {
	qc.etcdCli = etcdClient
}

// SetRootCoord sets the root coordinator's client
func (qc *QueryCoord) SetRootCoord(rootCoord types.RootCoord) error {
	if rootCoord == nil {
		return errors.New("null RootCoord interface")
	}
	qc.rootCoordClient = rootCoord
	return nil
}

// SetDataCoord sets the data coordinator's client
func (qc *QueryCoord) SetDataCoord(dataCoord types.DataCoord) error {
	if dataCoord == nil {
		return errors.New("null DataCoord interface")
	}
	qc.dataCoordClient = dataCoord
	return nil
}

// SetIndexCoord sets the index coordinator's client
func (qc *QueryCoord) SetIndexCoord(indexCoord types.IndexCoord) error {
	if indexCoord == nil {
		return errors.New("null IndexCoord interface")
	}
	qc.indexCoordClient = indexCoord
	return nil
}

func (qc *QueryCoord) watchNodeLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
	defer cancel()
	defer qc.loopWg.Done()
	log.Info("QueryCoord start watch node loop")

	// first, make sure every online node has been assigned to a replica
	onlineNodes := qc.cluster.OnlineNodeIDs()
	for _, node := range onlineNodes {
		if err := qc.allocateNode(node); err != nil {
			log.Error("unable to allocate node", zap.Int64("nodeID", node), zap.Error(err))
			panic(err)
		}
	}

	// a node is judged offline when 1) its etcd queryNodeInfoPrefix entry exists and 2) its QueryNode session does not
	offlineNodes := qc.cluster.OfflineNodeIDs()
	if len(offlineNodes) != 0 {
		log.Warn("find querynode down while coord not alive", zap.Any("nodeIDs", offlineNodes))
		for _, node := range offlineNodes {
			qc.offlineNodesChan <- node
		}
	}

	// TODO silverxia add Rewatch logic
	qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, qc.cluster.GetSessionVersion()+1, nil)
	qc.handleNodeEvent(ctx)
}

// allocateNode generates replica-assignment plans for a node via the group
// balancer and applies them to meta.
func (qc *QueryCoord) allocateNode(nodeID int64) error {
	plans, err := qc.groupBalancer.AddNode(nodeID)
	if err != nil {
		return err
	}
	for _, p := range plans {
		if err := qc.meta.applyReplicaBalancePlan(p); err != nil {
			return err
		}
	}
	return nil
}

func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return

		case event, ok := <-qc.eventChan:
			if !ok {
				// ErrCompacted is handled inside SessionWatcher
				log.Error("Session Watcher channel closed", zap.Int64("server id", qc.session.ServerID))
				go qc.Stop()
				if qc.session.TriggerKill {
					if p, err := os.FindProcess(os.Getpid()); err == nil {
						p.Signal(syscall.SIGINT)
					}
				}
				return
			}

			switch event.EventType {
			case sessionutil.SessionAddEvent:
				serverID := event.Session.ServerID
				log.Info("start add a QueryNode to cluster", zap.Int64("nodeID", serverID))
				err := qc.cluster.RegisterNode(ctx, event.Session, serverID, disConnect)
				if err != nil {
					log.Error("QueryCoord failed to register a QueryNode", zap.Int64("nodeID", serverID), zap.String("error info", err.Error()))
					continue
				}
				go func(serverID int64) {
					// apply the replica assignment asynchronously and retry
					// until it succeeds, so the event loop is never blocked
					for {
						err := qc.allocateNode(serverID)
						if err == nil {
							break
						}
						log.Error("unable to allocate node", zap.Int64("nodeID", serverID), zap.Error(err))
						// back off briefly to avoid a hot retry loop
						time.Sleep(100 * time.Millisecond)
					}
				}(serverID)
				qc.metricsCacheManager.InvalidateSystemInfoMetrics()

			case sessionutil.SessionDelEvent:
				serverID := event.Session.ServerID
				log.Info("get a del event after QueryNode down", zap.Int64("nodeID", serverID))
				nodeExist := qc.cluster.HasNode(serverID)
				if !nodeExist {
					log.Error("QueryNode not exist", zap.Int64("nodeID", serverID))
					continue
				}

				qc.cluster.StopNode(serverID)
				qc.offlineNodesChan <- serverID
			}
		}
	}
}
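
// Node lifecycle summary for the event loop above: an add event registers the
// node and assigns it to replicas asynchronously; a del event stops the node
// and hands it to offlineNodeLoop via offlineNodesChan, which eventually
// triggers a NodeDown LoadBalance task in processOfflineNodes.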

func (qc *QueryCoord) offlineNodeLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
	defer cancel()
	defer qc.loopWg.Done()

	ticker := time.NewTicker(time.Millisecond * 100)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			log.Info("offline node loop exit")
			return
		case node := <-qc.offlineNodesChan:
			qc.offlineNodes[node] = struct{}{}
			qc.processOfflineNodes()
		case <-ticker.C:
			qc.processOfflineNodes()
		}
	}
}

func (qc *QueryCoord) processOfflineNodes() {
	for node := range qc.offlineNodes {
		// check whether all channel unsubscription is handled; if not, wait for the next cycle
		if !qc.channelCleaner.isNodeChannelCleanHandled(node) {
			log.Info("node channel is not cleaned, skip offline processing", zap.Int64("node", node))
			continue
		}

		loadBalanceSegment := &querypb.LoadBalanceRequest{
			Base: &commonpb.MsgBase{
				MsgType:  commonpb.MsgType_LoadBalanceSegments,
				SourceID: qc.session.ServerID,
			},
			SourceNodeIDs: []int64{node},
			BalanceReason: querypb.TriggerCondition_NodeDown,
		}

		baseTask := newBaseTaskWithRetry(qc.loopCtx, querypb.TriggerCondition_NodeDown, 0)
		loadBalanceTask := &loadBalanceTask{
			baseTask:           baseTask,
			LoadBalanceRequest: loadBalanceSegment,
			broker:             qc.broker,
			cluster:            qc.cluster,
			meta:               qc.meta,
		}
		qc.metricsCacheManager.InvalidateSystemInfoMetrics()
		err := qc.scheduler.Enqueue(loadBalanceTask)
		if err != nil {
			log.Warn("failed to enqueue LoadBalance task into the scheduler",
				zap.Int64("nodeID", node),
				zap.Error(err))
			continue
		}

		log.Info("start a loadBalance task",
			zap.Int64("nodeID", node),
			zap.Int64("taskID", loadBalanceTask.getTaskID()))

		err = loadBalanceTask.waitToFinish()
		if err != nil {
			log.Warn("failed to process LoadBalance task",
				zap.Int64("nodeID", node),
				zap.Error(err))
			continue
		}

		delete(qc.offlineNodes, node)
		log.Info("LoadBalance task done, offline node is removed", zap.Int64("nodeID", node))
	}
}
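
// An offline node stays in qc.offlineNodes until its NodeDown LoadBalance task
// finishes, so recovery keeps being retried on every tick of offlineNodeLoop.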

func (qc *QueryCoord) handoffNotificationLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
	defer cancel()
	defer qc.loopWg.Done()
	log.Info("QueryCoord start watch handoff segment loop")

	watchChan := qc.kvClient.WatchWithRevision(util.HandoffSegmentPrefix, qc.handoffHandler.revision+1)
	for {
		select {
		case <-ctx.Done():
			return
		case resp, ok := <-watchChan:
			if !ok {
				log.Warn("QueryCoord watch handoff segment loop failed because watch channel is closed")
				panic("QueryCoord watch handoff segment loop failed because watch channel is closed")
			}
			if err := resp.Err(); err != nil {
				// https://github.com/etcd-io/etcd/issues/8980
				if err == v3rpc.ErrCompacted {
					qc.handoffHandler, err = newHandoffHandler(qc.loopCtx, qc.kvClient, qc.meta, qc.cluster, qc.scheduler, qc.broker)
					if err != nil {
						log.Error("query coordinator failed to re-create handoff handler", zap.Error(err))
						panic("failed to handle etcd request, exit..")
					}
					if err2 := qc.handoffHandler.reloadFromKV(); err2 != nil {
						log.Error("reload handoff handler meta failed when etcd has a compaction error",
							zap.String("etcd error", err.Error()), zap.Error(err2))
						panic("failed to handle etcd request, exit..")
					}
					qc.loopWg.Add(1)
					go qc.handoffNotificationLoop()
					return
				}
				log.Error("received error event from etcd watcher", zap.String("prefix", util.HandoffSegmentPrefix),
					zap.Error(err))
				panic("failed to handle etcd request, exit..")
			}

			for _, event := range resp.Events {
				segmentInfo := &querypb.SegmentInfo{}
				err := proto.Unmarshal(event.Kv.Value, segmentInfo)
				if err != nil {
					log.Error("handoffNotificationLoop: unmarshal failed", zap.Error(err))
					continue
				}
				switch event.Type {
				case mvccpb.PUT:
					qc.handoffHandler.enqueue(segmentInfo)
					log.Info("handoffNotificationLoop: enqueue a handoff request to handoff handler", zap.Any("segment info", segmentInfo))
				default:
					// do nothing
				}
			}
		}
	}
}
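
// On an etcd compaction error the handoff handler is rebuilt and reloaded from
// KV, and the loop restarts itself with a fresh revision, so handoff events
// that fell behind the compacted revision are recovered from the KV snapshot.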

func (qc *QueryCoord) loadBalanceSegmentLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
	defer cancel()
	defer qc.loopWg.Done()
	log.Info("QueryCoord start load balance segment loop")

	timer := time.NewTicker(time.Duration(Params.QueryCoordCfg.BalanceIntervalSeconds) * time.Second)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			startTs := time.Now()
			// do not trigger load balance if the task queue is not empty
			if !qc.scheduler.taskEmpty() {
				continue
			}
			collectionInfos := qc.meta.showCollections()
			// shuffle to avoid always balancing the same collections
			rand.Seed(time.Now().UnixNano())
			rand.Shuffle(len(collectionInfos), func(i, j int) {
				collectionInfos[i], collectionInfos[j] = collectionInfos[j], collectionInfos[i]
			})

			// get mem info of online nodes from cluster
			nodeID2MemUsageRate := make(map[int64]float64)
			nodeID2MemUsage := make(map[int64]uint64)
			nodeID2TotalMem := make(map[int64]uint64)

			loadBalanceTasks := make([]*loadBalanceTask, 0)
			// balance at most 20 collections in a round
			for i := 0; i < len(collectionInfos) && i < 20; i++ {
				info := collectionInfos[i]
				replicas, err := qc.meta.getReplicasByCollectionID(info.GetCollectionID())
				if err != nil {
					log.Warn("unable to get replicas of collection", zap.Int64("collectionID", info.GetCollectionID()))
					continue
				}
				for _, replica := range replicas {
					loadBalanceTasks = append(loadBalanceTasks, qc.balanceReplica(ctx, replica, nodeID2MemUsageRate, nodeID2MemUsage, nodeID2TotalMem)...)
				}
			}
			for _, t := range loadBalanceTasks {
				err := qc.scheduler.Enqueue(t)
				if err != nil {
					log.Error("loadBalanceSegmentLoop: balance task enqueue failed", zap.Any("task", t), zap.Error(err))
					continue
				}
				err = t.waitToFinish()
				if err != nil {
					// if failed, wait for the next balance loop;
					// the collection/partition of the balanced segment may have been released,
					// or some other abnormal error occurred
					log.Error("loadBalanceSegmentLoop: balance task execute failed", zap.Any("task", t), zap.Error(err))
				} else {
					log.Info("loadBalanceSegmentLoop: balance task execute success", zap.Any("task", t))
				}
			}
			log.Info("finish balance loop successfully", zap.Duration("time spent", time.Since(startTs)))
		}
	}
}
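
// Auto-balance overview: balanceReplica below sorts a replica's available
// nodes by memory usage rate, and while the hottest node exceeds
// OverloadedMemoryThresholdPercentage or its gap to the coolest node exceeds
// MemoryUsageMaxDifferencePercentage, it picks one segment to migrate and
// re-evaluates the projected usage before looking for the next candidate.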
// TODO: balanceReplica needs to be optimized; we cannot afford to fetch segment info in every balance round
func (qc *QueryCoord) balanceReplica(ctx context.Context, replica *milvuspb.ReplicaInfo, nodeID2MemUsageRate map[int64]float64,
	nodeID2MemUsage map[int64]uint64, nodeID2TotalMem map[int64]uint64) []*loadBalanceTask {
	loadBalanceTasks := make([]*loadBalanceTask, 0)
	// auto balance is executed at the replica level
	onlineNodeIDs := replica.GetNodeIds()
	if len(onlineNodeIDs) == 0 {
		log.Error("loadBalanceSegmentLoop: there is no online QueryNode to balance", zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID))
		return loadBalanceTasks
	}
	var availableNodeIDs []int64
	nodeID2SegmentInfos := make(map[int64]map[UniqueID]*querypb.SegmentInfo)
	for _, nodeID := range onlineNodeIDs {
		if _, ok := nodeID2MemUsage[nodeID]; !ok {
			nodeInfo, err := qc.cluster.GetNodeInfoByID(nodeID)
			if err != nil {
				log.Warn("loadBalanceSegmentLoop: get node info from QueryNode failed",
					zap.Int64("nodeID", nodeID), zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
					zap.Error(err))
				continue
			}
			nodeID2MemUsageRate[nodeID] = nodeInfo.(*queryNode).memUsageRate
			nodeID2MemUsage[nodeID] = nodeInfo.(*queryNode).memUsage
			nodeID2TotalMem[nodeID] = nodeInfo.(*queryNode).totalMem
		}

		updateSegmentInfoDone := true
		latestSegmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
		segmentInfos := qc.meta.getSegmentInfosByNodeAndCollection(nodeID, replica.GetCollectionID())
		for _, segmentInfo := range segmentInfos {
			latestInfo, err := qc.cluster.GetSegmentInfoByID(ctx, segmentInfo.SegmentID)
			if err != nil {
				log.Warn("loadBalanceSegmentLoop: get segment info from QueryNode failed", zap.Int64("nodeID", nodeID),
					zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
					zap.Error(err))
				updateSegmentInfoDone = false
				break
			}
			latestSegmentInfos[segmentInfo.SegmentID] = latestInfo
		}
		if updateSegmentInfoDone {
			availableNodeIDs = append(availableNodeIDs, nodeID)
			nodeID2SegmentInfos[nodeID] = latestSegmentInfos
		}
	}
	log.Info("loadBalanceSegmentLoop: memory usage rate of all online QueryNode", zap.Int64("collection", replica.CollectionID),
		zap.Int64("replica", replica.ReplicaID), zap.Any("mem rate", nodeID2MemUsageRate))
	if len(availableNodeIDs) <= 1 {
		log.Info("loadBalanceSegmentLoop: there are too few available query nodes to balance",
			zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
			zap.Int64s("onlineNodeIDs", onlineNodeIDs), zap.Int64s("availableNodeIDs", availableNodeIDs))
		return loadBalanceTasks
	}

	// check which nodes need balance and determine which segments on these nodes need to be migrated to other nodes
	for {
		sort.Slice(availableNodeIDs, func(i, j int) bool {
			return nodeID2MemUsageRate[availableNodeIDs[i]] > nodeID2MemUsageRate[availableNodeIDs[j]]
		})
		// the sourceNode has the highest memoryUsageRate among the available query nodes
		sourceNodeID := availableNodeIDs[0]
		dstNodeID := availableNodeIDs[len(availableNodeIDs)-1]
		memUsageRateDiff := nodeID2MemUsageRate[sourceNodeID] - nodeID2MemUsageRate[dstNodeID]
		if nodeID2MemUsageRate[sourceNodeID] <= Params.QueryCoordCfg.OverloadedMemoryThresholdPercentage &&
			memUsageRateDiff <= Params.QueryCoordCfg.MemoryUsageMaxDifferencePercentage {
			break
		}
		// if the source node's memoryUsageRate exceeds OverloadedMemoryThresholdPercentage,
		// or the max memUsageRate diff exceeds MemoryUsageMaxDifferencePercentage,
		// then migrate segments from the source node to other query nodes
		segmentInfos := nodeID2SegmentInfos[sourceNodeID]
		// select the segment that needs balance on the source node
		selectedSegmentInfo, err := chooseSegmentToBalance(sourceNodeID, dstNodeID, segmentInfos, nodeID2MemUsage, nodeID2TotalMem, nodeID2MemUsageRate)
		if err != nil {
			break
		}
		if selectedSegmentInfo == nil {
			break
		}
		// a segment was selected for balance; after applying the projected usage
		// changes below, loop again to see whether more segments can be balanced
		req := &querypb.LoadBalanceRequest{
			Base: &commonpb.MsgBase{
				MsgType: commonpb.MsgType_LoadBalanceSegments,
			},
			BalanceReason:    querypb.TriggerCondition_LoadBalance,
			SourceNodeIDs:    []UniqueID{sourceNodeID},
			DstNodeIDs:       []UniqueID{dstNodeID},
			SealedSegmentIDs: []UniqueID{selectedSegmentInfo.SegmentID},
		}
		baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_LoadBalance)
		balanceTask := &loadBalanceTask{
			baseTask:           baseTask,
			LoadBalanceRequest: req,
			broker:             qc.broker,
			cluster:            qc.cluster,
			meta:               qc.meta,
		}
		log.Info("loadBalanceSegmentLoop: generate a loadBalance task",
			zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
			zap.Any("task", balanceTask))
		loadBalanceTasks = append(loadBalanceTasks, balanceTask)
		nodeID2MemUsage[sourceNodeID] -= uint64(selectedSegmentInfo.MemSize)
		nodeID2MemUsage[dstNodeID] += uint64(selectedSegmentInfo.MemSize)
		nodeID2MemUsageRate[sourceNodeID] = float64(nodeID2MemUsage[sourceNodeID]) / float64(nodeID2TotalMem[sourceNodeID])
		nodeID2MemUsageRate[dstNodeID] = float64(nodeID2MemUsage[dstNodeID]) / float64(nodeID2TotalMem[dstNodeID])
		delete(nodeID2SegmentInfos[sourceNodeID], selectedSegmentInfo.SegmentID)
		nodeID2SegmentInfos[dstNodeID][selectedSegmentInfo.SegmentID] = selectedSegmentInfo
	}

	return loadBalanceTasks
}

// chooseSegmentToBalance selects the segment on the source node whose migration
// to the destination node shrinks the memory usage rate gap the most.
func chooseSegmentToBalance(sourceNodeID int64, dstNodeID int64,
	segmentInfos map[UniqueID]*querypb.SegmentInfo,
	nodeID2MemUsage map[int64]uint64,
	nodeID2TotalMem map[int64]uint64,
	nodeID2MemUsageRate map[int64]float64) (*querypb.SegmentInfo, error) {
	memoryInsufficient := true
	minMemDiffPercentage := 1.0
	var selectedSegmentInfo *querypb.SegmentInfo
	for _, info := range segmentInfos {
		dstNodeMemUsageAfterBalance := nodeID2MemUsage[dstNodeID] + uint64(info.MemSize)
		dstNodeMemUsageRateAfterBalance := float64(dstNodeMemUsageAfterBalance) / float64(nodeID2TotalMem[dstNodeID])
		// if the memUsageRate of dstNode would exceed OverloadedMemoryThresholdPercentage after balance, the segment can't be balanced
		if dstNodeMemUsageRateAfterBalance < Params.QueryCoordCfg.OverloadedMemoryThresholdPercentage {
			memoryInsufficient = false
			sourceNodeMemUsageAfterBalance := nodeID2MemUsage[sourceNodeID] - uint64(info.MemSize)
			sourceNodeMemUsageRateAfterBalance := float64(sourceNodeMemUsageAfterBalance) / float64(nodeID2TotalMem[sourceNodeID])
			// assume all query nodes have the same memory capacity;
			// if the memUsageRate diff between the two nodes does not become smaller after balance, there is no need to balance
			diffBeforeBalance := nodeID2MemUsageRate[sourceNodeID] - nodeID2MemUsageRate[dstNodeID]
			diffAfterBalance := dstNodeMemUsageRateAfterBalance - sourceNodeMemUsageRateAfterBalance
			if diffAfterBalance < diffBeforeBalance {
				if math.Abs(diffAfterBalance) < minMemDiffPercentage {
					selectedSegmentInfo = info
					minMemDiffPercentage = math.Abs(diffAfterBalance)
				}
			}
		}
	}

	if memoryInsufficient {
		return nil, errors.New("all QueryNodes have insufficient memory")
	}
	return selectedSegmentInfo, nil
}
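
// A worked example of the selection rule above (numbers are illustrative, not
// configured defaults): with the source node at 80% of 100GB and the
// destination at 40% of 100GB, moving a 10GB segment projects rates of 70%
// and 50%, shrinking the gap from 40 to 20 percentage points, so the segment
// qualifies; among all qualifying segments, the one with the smallest
// post-balance gap is selected.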