2021-11-17 19:49:32 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
2021-11-11 12:56:42 +08:00
// with the License. You may obtain a copy of the License at
//
2021-11-17 19:49:32 +08:00
// http://www.apache.org/licenses/LICENSE-2.0
2021-11-11 12:56:42 +08:00
//
2021-11-17 19:49:32 +08:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-11-11 12:56:42 +08:00
package querycoord
import (
"context"
"errors"
"sort"
"time"
"go.uber.org/zap"
2021-12-21 11:13:03 +08:00
"golang.org/x/sync/errgroup"
2021-11-11 12:56:42 +08:00
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
)
// defaultSegAllocatePolicy returns the segment allocation policy used by
// default, which is shuffleSegmentsToQueryNodeV2.
func defaultSegAllocatePolicy() SegmentAllocatePolicy {
	var policy SegmentAllocatePolicy = shuffleSegmentsToQueryNodeV2
	return policy
}
2021-12-21 11:57:39 +08:00
// shuffleWaitInterval is the pause between two allocation attempts when a
// shuffle policy is asked to wait for a usable query node.
const shuffleWaitInterval = time.Second
2021-11-11 12:56:42 +08:00
// SegmentAllocatePolicy helper function definition to allocate Segment to queryNode
2021-12-21 11:57:39 +08:00
type SegmentAllocatePolicy func ( ctx context . Context , reqs [ ] * querypb . LoadSegmentsRequest , cluster Cluster , metaCache Meta , wait bool , excludeNodeIDs [ ] int64 , includeNodeIDs [ ] int64 ) error
2021-11-11 12:56:42 +08:00
// shuffleSegmentsToQueryNode shuffle segments to online nodes
// returned are noded id for each segment, which satisfies:
// len(returnedNodeIds) == len(segmentIDs) && segmentIDs[i] is assigned to returnedNodeIds[i]
2021-12-21 11:57:39 +08:00
func shuffleSegmentsToQueryNode ( ctx context . Context , reqs [ ] * querypb . LoadSegmentsRequest , cluster Cluster , metaCache Meta , wait bool , excludeNodeIDs [ ] int64 , includeNodeIDs [ ] int64 ) error {
2021-11-11 12:56:42 +08:00
if len ( reqs ) == 0 {
return nil
}
for {
2021-12-21 11:57:39 +08:00
onlineNodeIDs := cluster . onlineNodeIDs ( )
if len ( onlineNodeIDs ) == 0 {
err := errors . New ( "no online QueryNode to allocate" )
log . Error ( "shuffleSegmentsToQueryNode failed" , zap . Error ( err ) )
2021-11-11 12:56:42 +08:00
if ! wait {
return err
}
2021-12-21 11:57:39 +08:00
time . Sleep ( shuffleWaitInterval )
2021-11-11 12:56:42 +08:00
continue
}
2021-12-21 11:57:39 +08:00
var availableNodeIDs [ ] int64
nodeID2NumSegment := make ( map [ int64 ] int )
for _ , nodeID := range onlineNodeIDs {
// nodeID not in includeNodeIDs
2021-11-12 18:49:10 +08:00
if len ( includeNodeIDs ) > 0 && ! nodeIncluded ( nodeID , includeNodeIDs ) {
continue
}
2021-12-21 11:57:39 +08:00
// nodeID in excludeNodeIDs
if nodeIncluded ( nodeID , excludeNodeIDs ) {
2021-11-11 12:56:42 +08:00
continue
}
2021-12-21 11:57:39 +08:00
segmentInfos := metaCache . getSegmentInfosByNode ( nodeID )
nodeID2NumSegment [ nodeID ] = len ( segmentInfos )
availableNodeIDs = append ( availableNodeIDs , nodeID )
2021-11-11 12:56:42 +08:00
}
2021-12-21 11:57:39 +08:00
if len ( availableNodeIDs ) > 0 {
log . Debug ( "shuffleSegmentsToQueryNode: shuffle segment to available QueryNode" , zap . Int64s ( "available nodeIDs" , availableNodeIDs ) )
2021-11-11 12:56:42 +08:00
for _ , req := range reqs {
2021-12-21 11:57:39 +08:00
sort . Slice ( availableNodeIDs , func ( i , j int ) bool {
return nodeID2NumSegment [ availableNodeIDs [ i ] ] < nodeID2NumSegment [ availableNodeIDs [ j ] ]
2021-11-11 12:56:42 +08:00
} )
2021-12-21 11:57:39 +08:00
selectedNodeID := availableNodeIDs [ 0 ]
req . DstNodeID = selectedNodeID
nodeID2NumSegment [ selectedNodeID ] ++
2021-11-11 12:56:42 +08:00
}
return nil
}
if ! wait {
2021-12-21 11:57:39 +08:00
err := errors . New ( "no available queryNode to allocate" )
log . Error ( "shuffleSegmentsToQueryNode failed" , zap . Int64s ( "online nodeIDs" , onlineNodeIDs ) , zap . Int64s ( "exclude nodeIDs" , excludeNodeIDs ) , zap . Int64s ( "include nodeIDs" , includeNodeIDs ) , zap . Error ( err ) )
return err
2021-11-11 12:56:42 +08:00
}
2021-12-21 11:57:39 +08:00
time . Sleep ( shuffleWaitInterval )
2021-11-11 12:56:42 +08:00
}
}
2021-12-21 11:57:39 +08:00
func shuffleSegmentsToQueryNodeV2 ( ctx context . Context , reqs [ ] * querypb . LoadSegmentsRequest , cluster Cluster , metaCache Meta , wait bool , excludeNodeIDs [ ] int64 , includeNodeIDs [ ] int64 ) error {
2021-11-11 12:56:42 +08:00
// key = offset, value = segmentSize
if len ( reqs ) == 0 {
return nil
}
2021-11-18 19:35:15 +08:00
log . Debug ( "shuffleSegmentsToQueryNodeV2: start estimate the size of loadReqs" )
2021-11-19 18:09:13 +08:00
dataSizePerReq := make ( [ ] int64 , len ( reqs ) )
2021-12-21 11:13:03 +08:00
// use errgroup to collect errors of goroutines
g , _ := errgroup . WithContext ( ctx )
2021-11-19 18:09:13 +08:00
for offset , req := range reqs {
2021-12-21 11:13:03 +08:00
r , i := req , offset
g . Go ( func ( ) error {
size , err := cluster . estimateSegmentsSize ( r )
if err != nil {
log . Warn ( "estimate segment size error" ,
zap . Int64 ( "collectionID" , r . GetCollectionID ( ) ) ,
zap . Error ( err ) )
return err
}
dataSizePerReq [ i ] = size
return nil
} )
2021-11-19 18:09:13 +08:00
}
2021-12-21 11:13:03 +08:00
if err := g . Wait ( ) ; err != nil {
log . Warn ( "shuffleSegmentsToQueryNodeV2: estimate segment size error" , zap . Error ( err ) )
return err
2021-11-11 12:56:42 +08:00
}
2021-12-21 11:13:03 +08:00
2021-11-18 19:35:15 +08:00
log . Debug ( "shuffleSegmentsToQueryNodeV2: estimate the size of loadReqs end" )
2021-11-11 12:56:42 +08:00
for {
// online nodes map and totalMem, usedMem, memUsage of every node
totalMem := make ( map [ int64 ] uint64 )
memUsage := make ( map [ int64 ] uint64 )
memUsageRate := make ( map [ int64 ] float64 )
2021-12-21 11:57:39 +08:00
onlineNodeIDs := cluster . onlineNodeIDs ( )
if len ( onlineNodeIDs ) == 0 && ! wait {
err := errors . New ( "no online queryNode to allocate" )
log . Error ( "shuffleSegmentsToQueryNode failed" , zap . Error ( err ) )
return err
2021-11-11 12:56:42 +08:00
}
2021-12-21 11:57:39 +08:00
var availableNodeIDs [ ] int64
for _ , nodeID := range onlineNodeIDs {
// nodeID not in includeNodeIDs
2021-11-12 18:49:10 +08:00
if len ( includeNodeIDs ) > 0 && ! nodeIncluded ( nodeID , includeNodeIDs ) {
2021-12-21 11:57:39 +08:00
continue
}
// nodeID in excludeNodeIDs
if nodeIncluded ( nodeID , excludeNodeIDs ) {
2021-11-12 18:49:10 +08:00
continue
}
2021-11-11 12:56:42 +08:00
// statistic nodeInfo, used memory, memory usage of every query node
nodeInfo , err := cluster . getNodeInfoByID ( nodeID )
if err != nil {
2021-12-21 11:57:39 +08:00
log . Warn ( "shuffleSegmentsToQueryNodeV2: getNodeInfoByID failed" , zap . Error ( err ) )
2021-11-11 12:56:42 +08:00
continue
}
queryNodeInfo := nodeInfo . ( * queryNode )
// avoid allocate segment to node which memUsageRate is high
2021-12-23 18:39:11 +08:00
if queryNodeInfo . memUsageRate >= Params . QueryCoordCfg . OverloadedMemoryThresholdPercentage {
2021-11-11 12:56:42 +08:00
log . Debug ( "shuffleSegmentsToQueryNodeV2: queryNode memUsageRate large than MaxMemUsagePerNode" , zap . Int64 ( "nodeID" , nodeID ) , zap . Float64 ( "current rate" , queryNodeInfo . memUsageRate ) )
continue
}
// update totalMem, memUsage, memUsageRate
totalMem [ nodeID ] , memUsage [ nodeID ] , memUsageRate [ nodeID ] = queryNodeInfo . totalMem , queryNodeInfo . memUsage , queryNodeInfo . memUsageRate
2021-12-21 11:57:39 +08:00
availableNodeIDs = append ( availableNodeIDs , nodeID )
2021-11-11 12:56:42 +08:00
}
2021-12-21 11:57:39 +08:00
if len ( availableNodeIDs ) > 0 {
log . Debug ( "shuffleSegmentsToQueryNodeV2: shuffle segment to available QueryNode" , zap . Int64s ( "available nodeIDs" , availableNodeIDs ) )
memoryInsufficient := false
2021-11-11 12:56:42 +08:00
for offset , sizeOfReq := range dataSizePerReq {
// sort nodes by memUsageRate, low to high
2021-12-21 11:57:39 +08:00
sort . Slice ( availableNodeIDs , func ( i , j int ) bool {
return memUsageRate [ availableNodeIDs [ i ] ] < memUsageRate [ availableNodeIDs [ j ] ]
2021-11-11 12:56:42 +08:00
} )
findNodeToAllocate := false
// assign load segment request to query node which has least memUsageRate
2021-12-21 11:57:39 +08:00
for _ , nodeID := range availableNodeIDs {
2021-11-11 12:56:42 +08:00
memUsageAfterLoad := memUsage [ nodeID ] + uint64 ( sizeOfReq )
memUsageRateAfterLoad := float64 ( memUsageAfterLoad ) / float64 ( totalMem [ nodeID ] )
2021-12-23 18:39:11 +08:00
if memUsageRateAfterLoad > Params . QueryCoordCfg . OverloadedMemoryThresholdPercentage {
2021-11-11 12:56:42 +08:00
continue
}
reqs [ offset ] . DstNodeID = nodeID
memUsage [ nodeID ] = memUsageAfterLoad
memUsageRate [ nodeID ] = memUsageRateAfterLoad
findNodeToAllocate = true
break
}
// the load segment request can't be allocated to any query node
if ! findNodeToAllocate {
2021-12-21 11:57:39 +08:00
memoryInsufficient = true
2021-11-11 12:56:42 +08:00
break
}
}
2021-12-21 11:57:39 +08:00
// shuffle segment success
if ! memoryInsufficient {
2021-11-18 19:35:15 +08:00
log . Debug ( "shuffleSegmentsToQueryNodeV2: shuffle segment to query node success" )
2021-11-11 12:56:42 +08:00
return nil
}
2021-12-21 11:57:39 +08:00
// memory insufficient and wait == false
if ! wait {
err := errors . New ( "shuffleSegmentsToQueryNodeV2: insufficient memory of available node" )
log . Error ( "shuffleSegmentsToQueryNode failed" , zap . Int64s ( "online nodeIDs" , onlineNodeIDs ) , zap . Int64s ( "exclude nodeIDs" , excludeNodeIDs ) , zap . Int64s ( "include nodeIDs" , includeNodeIDs ) , zap . Error ( err ) )
return err
}
2021-11-11 12:56:42 +08:00
} else {
2021-12-21 11:57:39 +08:00
// no available node to allocate and wait == false
if ! wait {
err := errors . New ( "no available queryNode to allocate" )
log . Error ( "shuffleSegmentsToQueryNode failed" , zap . Int64s ( "online nodeIDs" , onlineNodeIDs ) , zap . Int64s ( "exclude nodeIDs" , excludeNodeIDs ) , zap . Int64s ( "include nodeIDs" , includeNodeIDs ) , zap . Error ( err ) )
return err
}
2021-11-11 12:56:42 +08:00
}
2021-12-21 11:57:39 +08:00
time . Sleep ( shuffleWaitInterval )
2021-11-11 12:56:42 +08:00
}
}
2021-11-12 18:49:10 +08:00
// nodeIncluded reports whether nodeID occurs in includeNodeIDs.
// A nil or empty list never contains the node.
func nodeIncluded(nodeID int64, includeNodeIDs []int64) bool {
	for i := 0; i < len(includeNodeIDs); i++ {
		if includeNodeIDs[i] == nodeID {
			return true
		}
	}
	return false
}