// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package querycoord
import (
"context"
"errors"
"sort"
2021-11-19 18:09:13 +08:00
"sync"
2021-11-11 12:56:42 +08:00
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
)
// defaultSegAllocatePolicy returns the segment allocation policy used by
// default, which is shuffleSegmentsToQueryNodeV2.
func defaultSegAllocatePolicy() SegmentAllocatePolicy {
	var policy SegmentAllocatePolicy = shuffleSegmentsToQueryNodeV2
	return policy
}
// SegmentAllocatePolicy helper function definition to allocate Segment to queryNode
2021-11-12 18:49:10 +08:00
type SegmentAllocatePolicy func ( ctx context . Context , reqs [ ] * querypb . LoadSegmentsRequest , cluster Cluster , wait bool , excludeNodeIDs [ ] int64 , includeNodeIDs [ ] int64 ) error
2021-11-11 12:56:42 +08:00
// shuffleSegmentsToQueryNode shuffle segments to online nodes
// returned are noded id for each segment, which satisfies:
// len(returnedNodeIds) == len(segmentIDs) && segmentIDs[i] is assigned to returnedNodeIds[i]
2021-11-12 18:49:10 +08:00
func shuffleSegmentsToQueryNode ( ctx context . Context , reqs [ ] * querypb . LoadSegmentsRequest , cluster Cluster , wait bool , excludeNodeIDs [ ] int64 , includeNodeIDs [ ] int64 ) error {
2021-11-11 12:56:42 +08:00
if len ( reqs ) == 0 {
return nil
}
for {
availableNodes , err := cluster . onlineNodes ( )
if err != nil {
log . Debug ( err . Error ( ) )
if ! wait {
return err
}
time . Sleep ( 1 * time . Second )
continue
}
for _ , id := range excludeNodeIDs {
delete ( availableNodes , id )
}
nodeID2NumSegemnt := make ( map [ int64 ] int )
for nodeID := range availableNodes {
2021-11-12 18:49:10 +08:00
if len ( includeNodeIDs ) > 0 && ! nodeIncluded ( nodeID , includeNodeIDs ) {
delete ( availableNodes , nodeID )
continue
}
2021-11-11 12:56:42 +08:00
numSegments , err := cluster . getNumSegments ( nodeID )
if err != nil {
delete ( availableNodes , nodeID )
continue
}
nodeID2NumSegemnt [ nodeID ] = numSegments
}
if len ( availableNodes ) > 0 {
nodeIDSlice := make ( [ ] int64 , 0 )
for nodeID := range availableNodes {
nodeIDSlice = append ( nodeIDSlice , nodeID )
}
for _ , req := range reqs {
sort . Slice ( nodeIDSlice , func ( i , j int ) bool {
return nodeID2NumSegemnt [ nodeIDSlice [ i ] ] < nodeID2NumSegemnt [ nodeIDSlice [ j ] ]
} )
req . DstNodeID = nodeIDSlice [ 0 ]
nodeID2NumSegemnt [ nodeIDSlice [ 0 ] ] ++
}
return nil
}
if ! wait {
return errors . New ( "no queryNode to allocate" )
}
}
}
2021-11-12 18:49:10 +08:00
func shuffleSegmentsToQueryNodeV2 ( ctx context . Context , reqs [ ] * querypb . LoadSegmentsRequest , cluster Cluster , wait bool , excludeNodeIDs [ ] int64 , includeNodeIDs [ ] int64 ) error {
2021-11-11 12:56:42 +08:00
// key = offset, value = segmentSize
if len ( reqs ) == 0 {
return nil
}
2021-11-18 19:35:15 +08:00
log . Debug ( "shuffleSegmentsToQueryNodeV2: start estimate the size of loadReqs" )
2021-11-19 18:09:13 +08:00
dataSizePerReq := make ( [ ] int64 , len ( reqs ) )
estimateError := make ( [ ] error , len ( reqs ) )
var estimateWg sync . WaitGroup
estimateReqFn := func ( offset int , req * querypb . LoadSegmentsRequest ) {
defer estimateWg . Done ( )
dataSizePerReq [ offset ] , estimateError [ offset ] = cluster . estimateSegmentsSize ( req )
}
for offset , req := range reqs {
estimateWg . Add ( 1 )
go estimateReqFn ( offset , req )
}
estimateWg . Wait ( )
for _ , err := range estimateError {
2021-11-11 12:56:42 +08:00
if err != nil {
2021-11-19 18:09:13 +08:00
log . Debug ( "shuffleSegmentsToQueryNodeV2: estimate segment size error" , zap . Error ( err ) )
2021-11-11 12:56:42 +08:00
return err
}
}
2021-11-18 19:35:15 +08:00
log . Debug ( "shuffleSegmentsToQueryNodeV2: estimate the size of loadReqs end" )
2021-11-11 12:56:42 +08:00
for {
// online nodes map and totalMem, usedMem, memUsage of every node
totalMem := make ( map [ int64 ] uint64 )
memUsage := make ( map [ int64 ] uint64 )
memUsageRate := make ( map [ int64 ] float64 )
availableNodes , err := cluster . onlineNodes ( )
if err != nil && ! wait {
return errors . New ( "no online queryNode to allocate" )
}
for _ , id := range excludeNodeIDs {
delete ( availableNodes , id )
}
for nodeID := range availableNodes {
2021-11-12 18:49:10 +08:00
if len ( includeNodeIDs ) > 0 && ! nodeIncluded ( nodeID , includeNodeIDs ) {
delete ( availableNodes , nodeID )
continue
}
2021-11-11 12:56:42 +08:00
// statistic nodeInfo, used memory, memory usage of every query node
nodeInfo , err := cluster . getNodeInfoByID ( nodeID )
if err != nil {
log . Debug ( "shuffleSegmentsToQueryNodeV2: getNodeInfoByID failed" , zap . Error ( err ) )
delete ( availableNodes , nodeID )
continue
}
queryNodeInfo := nodeInfo . ( * queryNode )
// avoid allocate segment to node which memUsageRate is high
2021-11-12 18:49:10 +08:00
if queryNodeInfo . memUsageRate >= Params . OverloadedMemoryThresholdPercentage {
2021-11-11 12:56:42 +08:00
log . Debug ( "shuffleSegmentsToQueryNodeV2: queryNode memUsageRate large than MaxMemUsagePerNode" , zap . Int64 ( "nodeID" , nodeID ) , zap . Float64 ( "current rate" , queryNodeInfo . memUsageRate ) )
delete ( availableNodes , nodeID )
continue
}
// update totalMem, memUsage, memUsageRate
totalMem [ nodeID ] , memUsage [ nodeID ] , memUsageRate [ nodeID ] = queryNodeInfo . totalMem , queryNodeInfo . memUsage , queryNodeInfo . memUsageRate
}
2021-11-18 19:35:15 +08:00
log . Debug ( "shuffleSegmentsToQueryNodeV2: num of availableNodes" , zap . Int ( "size" , len ( availableNodes ) ) )
2021-11-11 12:56:42 +08:00
if len ( availableNodes ) > 0 {
nodeIDSlice := make ( [ ] int64 , 0 , len ( availableNodes ) )
for nodeID := range availableNodes {
nodeIDSlice = append ( nodeIDSlice , nodeID )
}
allocateSegmentsDone := true
for offset , sizeOfReq := range dataSizePerReq {
// sort nodes by memUsageRate, low to high
sort . Slice ( nodeIDSlice , func ( i , j int ) bool {
return memUsageRate [ nodeIDSlice [ i ] ] < memUsageRate [ nodeIDSlice [ j ] ]
} )
findNodeToAllocate := false
// assign load segment request to query node which has least memUsageRate
for _ , nodeID := range nodeIDSlice {
memUsageAfterLoad := memUsage [ nodeID ] + uint64 ( sizeOfReq )
memUsageRateAfterLoad := float64 ( memUsageAfterLoad ) / float64 ( totalMem [ nodeID ] )
2021-11-12 18:49:10 +08:00
if memUsageRateAfterLoad > Params . OverloadedMemoryThresholdPercentage {
2021-11-11 12:56:42 +08:00
continue
}
reqs [ offset ] . DstNodeID = nodeID
memUsage [ nodeID ] = memUsageAfterLoad
memUsageRate [ nodeID ] = memUsageRateAfterLoad
findNodeToAllocate = true
break
}
// the load segment request can't be allocated to any query node
if ! findNodeToAllocate {
allocateSegmentsDone = false
break
}
}
if allocateSegmentsDone {
2021-11-18 19:35:15 +08:00
log . Debug ( "shuffleSegmentsToQueryNodeV2: shuffle segment to query node success" )
2021-11-11 12:56:42 +08:00
return nil
}
}
if wait {
time . Sleep ( 1 * time . Second )
continue
} else {
return errors . New ( "no queryNode to allocate" )
}
}
}
2021-11-12 18:49:10 +08:00
// nodeIncluded reports whether nodeID occurs in includeNodeIDs.
// A nil or empty list contains nothing, so the result is false.
func nodeIncluded(nodeID int64, includeNodeIDs []int64) bool {
	for idx := 0; idx < len(includeNodeIDs); idx++ {
		if includeNodeIDs[idx] == nodeID {
			return true
		}
	}
	return false
}