From a2d2ad88bde13a8162923b56288acdb6cee02801 Mon Sep 17 00:00:00 2001 From: yah01 Date: Mon, 6 Jun 2022 16:52:05 +0800 Subject: [PATCH] Make assigning segments faster (#17377) This improves the Load performance, and lets the LoadBalance fail fast, which allows us to retry it promptly Signed-off-by: yah01 --- internal/querycoord/querynode.go | 6 ++- internal/querycoord/segment_allocator.go | 50 ++++++++++++++---------- internal/querycoord/task_scheduler.go | 1 + 3 files changed, 35 insertions(+), 22 deletions(-) diff --git a/internal/querycoord/querynode.go b/internal/querycoord/querynode.go index 3721fa2705..7389958abc 100644 --- a/internal/querycoord/querynode.go +++ b/internal/querycoord/querynode.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "sync" + "time" "go.uber.org/zap" @@ -335,7 +336,10 @@ func (qn *queryNode) getNodeInfo() (Node, error) { if err != nil { return nil, err } - resp, err := qn.client.GetMetrics(qn.ctx, req) + + ctx, cancel := context.WithTimeout(qn.ctx, time.Second) + defer cancel() + resp, err := qn.client.GetMetrics(ctx, req) if err != nil { return nil, err } diff --git a/internal/querycoord/segment_allocator.go b/internal/querycoord/segment_allocator.go index ab34e8b561..18129979f2 100644 --- a/internal/querycoord/segment_allocator.go +++ b/internal/querycoord/segment_allocator.go @@ -135,34 +135,22 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme log.Error("shuffleSegmentsToQueryNode failed", zap.Error(err)) return err } + onlineNodeIDs = nodesFilter(onlineNodeIDs, includeNodeIDs, excludeNodeIDs) var availableNodeIDs []int64 - for _, nodeID := range onlineNodeIDs { - // nodeID not in includeNodeIDs - if len(includeNodeIDs) > 0 && !nodeIncluded(nodeID, includeNodeIDs) { - continue - } - - // nodeID in excludeNodeIDs - if nodeIncluded(nodeID, excludeNodeIDs) { - continue - } - // statistic nodeInfo, used memory, memory usage of every query node - nodeInfo, err := cluster.GetNodeInfoByID(nodeID) - if err != 
nil { - log.Warn("shuffleSegmentsToQueryNodeV2: getNodeInfoByID failed", zap.Error(err)) - continue - } - queryNodeInfo := nodeInfo.(*queryNode) + nodes := getNodeInfos(cluster, onlineNodeIDs) + for _, nodeInfo := range nodes { // avoid allocate segment to node which memUsageRate is high - if queryNodeInfo.memUsageRate >= Params.QueryCoordCfg.OverloadedMemoryThresholdPercentage { - log.Info("shuffleSegmentsToQueryNodeV2: queryNode memUsageRate large than MaxMemUsagePerNode", zap.Int64("nodeID", nodeID), zap.Float64("current rate", queryNodeInfo.memUsageRate)) + if nodeInfo.memUsageRate >= Params.QueryCoordCfg.OverloadedMemoryThresholdPercentage { + log.Info("shuffleSegmentsToQueryNodeV2: queryNode memUsageRate large than MaxMemUsagePerNode", + zap.Int64("nodeID", nodeInfo.id), + zap.Float64("memoryUsageRate", nodeInfo.memUsageRate)) continue } // update totalMem, memUsage, memUsageRate - totalMem[nodeID], memUsage[nodeID], memUsageRate[nodeID] = queryNodeInfo.totalMem, queryNodeInfo.memUsage, queryNodeInfo.memUsageRate - availableNodeIDs = append(availableNodeIDs, nodeID) + totalMem[nodeInfo.id], memUsage[nodeInfo.id], memUsageRate[nodeInfo.id] = nodeInfo.totalMem, nodeInfo.memUsage, nodeInfo.memUsageRate + availableNodeIDs = append(availableNodeIDs, nodeInfo.id) } if len(availableNodeIDs) > 0 { log.Info("shuffleSegmentsToQueryNodeV2: shuffle segment to available QueryNode", zap.Int64s("available nodeIDs", availableNodeIDs)) @@ -227,3 +215,23 @@ func nodeIncluded(nodeID int64, includeNodeIDs []int64) bool { return false } + +func nodesFilter(nodes []UniqueID, include []UniqueID, exclude []UniqueID) []UniqueID { + result := make([]UniqueID, 0) + + for _, node := range nodes { + // nodeID not in includeNodeIDs + if len(include) > 0 && !nodeIncluded(node, include) { + continue + } + + // nodeID in excludeNodeIDs + if nodeIncluded(node, exclude) { + continue + } + + result = append(result, node) + } + + return result +} diff --git 
a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go index 452b93f434..bef30ab997 100644 --- a/internal/querycoord/task_scheduler.go +++ b/internal/querycoord/task_scheduler.go @@ -392,6 +392,7 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task, } // if triggerCondition == nodeDown, and the queryNode resources are insufficient, // queryCoord will waits until queryNode can load the data, ensuring that the data is not lost + baseTask = newBaseTaskWithRetry(scheduler.ctx, loadReq.BalanceReason, 0) baseTask.setTriggerCondition(loadReq.BalanceReason) loadBalanceTask := &loadBalanceTask{ baseTask: baseTask,