Make assigning segments faster (#17377)
This improves the Load performance and lets the LoadBalance fail fast, which allows us to retry it in a timely manner.

Signed-off-by: yah01 <yang.cen@zilliz.com>
commit a2d2ad88bd
parent ac6394d0fd
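The fail-fast behaviour mentioned in the commit message comes from bounding the per-node metrics RPC with a one-second timeout, as the getNodeInfo hunk below shows. Here is a minimal, self-contained sketch of that pattern, not part of the commit itself; the fetchMetrics helper and its five-second delay are illustrative stand-ins, not Milvus APIs:

package main

import (
    "context"
    "fmt"
    "time"
)

// fetchMetrics simulates a metrics RPC against an unresponsive node.
func fetchMetrics(ctx context.Context) (string, error) {
    select {
    case <-time.After(5 * time.Second): // the node never answers in time
        return "metrics", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func main() {
    // Bound the call with a short timeout derived from the parent context,
    // mirroring context.WithTimeout(qn.ctx, time.Second) in the diff below.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    if _, err := fetchMetrics(ctx); err != nil {
        fmt.Println("failing fast:", err) // prints a deadline-exceeded error after about one second
    }
}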
@@ -21,6 +21,7 @@ import (
     "errors"
     "fmt"
     "sync"
+    "time"

     "go.uber.org/zap"
@@ -335,7 +336,10 @@ func (qn *queryNode) getNodeInfo() (Node, error) {
     if err != nil {
         return nil, err
     }
-    resp, err := qn.client.GetMetrics(qn.ctx, req)
+
+    ctx, cancel := context.WithTimeout(qn.ctx, time.Second)
+    defer cancel()
+    resp, err := qn.client.GetMetrics(ctx, req)
     if err != nil {
         return nil, err
     }
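With the one-second deadline, a QueryNode that has gone away can no longer stall getNodeInfo indefinitely; the GetMetrics call returns a deadline-exceeded error, the LoadBalance task fails quickly, and, per the commit message, it can be retried promptly instead of hanging.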
@@ -135,34 +135,22 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
         log.Error("shuffleSegmentsToQueryNode failed", zap.Error(err))
         return err
     }
+    onlineNodeIDs = nodesFilter(onlineNodeIDs, includeNodeIDs, excludeNodeIDs)

     var availableNodeIDs []int64
-    for _, nodeID := range onlineNodeIDs {
-        // nodeID not in includeNodeIDs
-        if len(includeNodeIDs) > 0 && !nodeIncluded(nodeID, includeNodeIDs) {
-            continue
-        }
-
-        // nodeID in excludeNodeIDs
-        if nodeIncluded(nodeID, excludeNodeIDs) {
-            continue
-        }
-        // statistic nodeInfo, used memory, memory usage of every query node
-        nodeInfo, err := cluster.GetNodeInfoByID(nodeID)
-        if err != nil {
-            log.Warn("shuffleSegmentsToQueryNodeV2: getNodeInfoByID failed", zap.Error(err))
-            continue
-        }
-        queryNodeInfo := nodeInfo.(*queryNode)
+    nodes := getNodeInfos(cluster, onlineNodeIDs)
+    for _, nodeInfo := range nodes {
         // avoid allocate segment to node which memUsageRate is high
-        if queryNodeInfo.memUsageRate >= Params.QueryCoordCfg.OverloadedMemoryThresholdPercentage {
-            log.Info("shuffleSegmentsToQueryNodeV2: queryNode memUsageRate large than MaxMemUsagePerNode", zap.Int64("nodeID", nodeID), zap.Float64("current rate", queryNodeInfo.memUsageRate))
+        if nodeInfo.memUsageRate >= Params.QueryCoordCfg.OverloadedMemoryThresholdPercentage {
+            log.Info("shuffleSegmentsToQueryNodeV2: queryNode memUsageRate large than MaxMemUsagePerNode",
+                zap.Int64("nodeID", nodeInfo.id),
+                zap.Float64("memoryUsageRate", nodeInfo.memUsageRate))
             continue
         }

         // update totalMem, memUsage, memUsageRate
-        totalMem[nodeID], memUsage[nodeID], memUsageRate[nodeID] = queryNodeInfo.totalMem, queryNodeInfo.memUsage, queryNodeInfo.memUsageRate
-        availableNodeIDs = append(availableNodeIDs, nodeID)
+        totalMem[nodeInfo.id], memUsage[nodeInfo.id], memUsageRate[nodeInfo.id] = nodeInfo.totalMem, nodeInfo.memUsage, nodeInfo.memUsageRate
+        availableNodeIDs = append(availableNodeIDs, nodeInfo.id)
     }
     if len(availableNodeIDs) > 0 {
         log.Info("shuffleSegmentsToQueryNodeV2: shuffle segment to available QueryNode", zap.Int64s("available nodeIDs", availableNodeIDs))
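The loop no longer calls cluster.GetNodeInfoByID once per node; a getNodeInfos helper gathers the node info for the already-filtered ID list up front. That helper's implementation is not shown in this diff, but given the commit title it plausibly fans the per-node requests out concurrently. A hypothetical sketch of that idea, where nodeInfo, fetchNodeInfo, and the 100 ms delay are all made up for illustration:

package main

import (
    "fmt"
    "sync"
    "time"
)

type nodeInfo struct {
    id           int64
    memUsageRate float64
}

// fetchNodeInfo simulates one metrics RPC per node.
func fetchNodeInfo(id int64) nodeInfo {
    time.Sleep(100 * time.Millisecond)
    return nodeInfo{id: id, memUsageRate: 0.5}
}

// getNodeInfos fetches node info for all IDs in parallel and collects the results.
func getNodeInfos(nodeIDs []int64) []nodeInfo {
    var (
        wg    sync.WaitGroup
        mu    sync.Mutex
        infos []nodeInfo
    )
    for _, id := range nodeIDs {
        wg.Add(1)
        go func(id int64) {
            defer wg.Done()
            info := fetchNodeInfo(id)
            mu.Lock()
            infos = append(infos, info)
            mu.Unlock()
        }(id)
    }
    wg.Wait()
    return infos
}

func main() {
    start := time.Now()
    infos := getNodeInfos([]int64{1, 2, 3, 4})
    fmt.Printf("fetched %d node infos in %v\n", len(infos), time.Since(start))
}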
@@ -227,3 +215,23 @@ func nodeIncluded(nodeID int64, includeNodeIDs []int64) bool {

     return false
 }
+
+func nodesFilter(nodes []UniqueID, include []UniqueID, exclude []UniqueID) []UniqueID {
+    result := make([]UniqueID, 0)
+
+    for _, node := range nodes {
+        // nodeID not in includeNodeIDs
+        if len(include) > 0 && !nodeIncluded(node, include) {
+            continue
+        }
+
+        // nodeID in excludeNodeIDs
+        if nodeIncluded(node, exclude) {
+            continue
+        }
+
+        result = append(result, node)
+    }
+
+    return result
+}
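The new nodesFilter helper centralizes the include/exclude checks that previously lived inside the shuffle loop: with a non-empty include list only listed IDs pass, and anything in the exclude list is dropped. A standalone usage sketch follows; UniqueID is assumed here to alias int64, and nodeIncluded is assumed to be a plain membership test since its body is not part of this diff.

package main

import "fmt"

// UniqueID is assumed to alias int64, matching its use as a node ID.
type UniqueID = int64

// nodeIncluded is assumed to be a simple linear membership check.
func nodeIncluded(nodeID UniqueID, ids []UniqueID) bool {
    for _, id := range ids {
        if id == nodeID {
            return true
        }
    }
    return false
}

// nodesFilter mirrors the helper added in the hunk above.
func nodesFilter(nodes, include, exclude []UniqueID) []UniqueID {
    result := make([]UniqueID, 0)
    for _, node := range nodes {
        if len(include) > 0 && !nodeIncluded(node, include) {
            continue // not in the include list
        }
        if nodeIncluded(node, exclude) {
            continue // explicitly excluded
        }
        result = append(result, node)
    }
    return result
}

func main() {
    online := []UniqueID{1, 2, 3, 4}
    fmt.Println(nodesFilter(online, nil, []UniqueID{3}))              // [1 2 4]
    fmt.Println(nodesFilter(online, []UniqueID{2, 3}, []UniqueID{3})) // [2]
}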
@@ -392,6 +392,7 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
         }
         // if triggerCondition == nodeDown, and the queryNode resources are insufficient,
         // queryCoord will waits until queryNode can load the data, ensuring that the data is not lost
+        baseTask = newBaseTaskWithRetry(scheduler.ctx, loadReq.BalanceReason, 0)
         baseTask.setTriggerCondition(loadReq.BalanceReason)
         loadBalanceTask := &loadBalanceTask{
             baseTask: baseTask,