Add new node into the replica which has the most offline nodes (#16907)

Signed-off-by: yah01 <yang.cen@zilliz.com>
Authored by yah01 on 2022-05-13 18:31:54 +08:00; committed by GitHub
parent 52fc9a558c
commit a382133a8a
4 changed files with 39 additions and 11 deletions


@@ -720,10 +720,7 @@ func (c *queryNodeCluster) assignNodesToReplicas(ctx context.Context, replicas [
         return fmt.Errorf("no enough nodes to create replicas, node_num=%d replica_num=%d", len(nodeIds), len(replicas))
     }
-    nodeInfos, err := getNodeInfos(c, nodeIds)
-    if err != nil {
-        return err
-    }
+    nodeInfos := getNodeInfos(c, nodeIds)
     if len(nodeInfos) < len(replicas) {
         return fmt.Errorf("no enough nodes to create replicas, node_num=%d replica_num=%d", len(nodeInfos), len(replicas))
     }
@@ -758,7 +755,7 @@ func (c *queryNodeCluster) assignNodesToReplicas(ctx context.Context, replicas [
 // It's a helper method to concurrently get nodes' info
 // Remove nodes that it can't connect to
-func getNodeInfos(cluster *queryNodeCluster, nodeIds []UniqueID) ([]*queryNode, error) {
+func getNodeInfos(cluster Cluster, nodeIds []UniqueID) []*queryNode {
     nodeCh := make(chan *queryNode, len(nodeIds))
     wg := sync.WaitGroup{}
@@ -781,5 +778,5 @@ func getNodeInfos(cluster *queryNodeCluster, nodeIds []UniqueID) ([]*queryNode,
         nodes = append(nodes, node)
     }
-    return nodes, nil
+    return nodes
 }
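The helper above now swallows per-node connection errors instead of propagating them, so callers only ever see the nodes that actually answered. Below is a minimal, self-contained sketch of that fan-out-and-filter pattern; the nodeInfo type and the fetchInfo RPC stand-in are hypothetical, not the actual Milvus code.

package main

import (
	"fmt"
	"sync"
)

// nodeInfo is a hypothetical stand-in for the query node info kept by the coordinator.
type nodeInfo struct {
	id       int64
	totalMem uint64
	memUsage uint64
}

// fetchInfo stands in for the per-node RPC; it fails for nodes that cannot be reached.
func fetchInfo(id int64) (*nodeInfo, error) {
	if id%2 == 0 { // pretend even IDs are offline
		return nil, fmt.Errorf("node %d unreachable", id)
	}
	return &nodeInfo{id: id, totalMem: 8 << 30, memUsage: 2 << 30}, nil
}

// getNodeInfos queries every node concurrently and silently drops the
// unreachable ones, so there is no error for the caller to handle.
func getNodeInfos(ids []int64) []*nodeInfo {
	ch := make(chan *nodeInfo, len(ids))
	var wg sync.WaitGroup
	for _, id := range ids {
		wg.Add(1)
		go func(id int64) {
			defer wg.Done()
			if info, err := fetchInfo(id); err == nil {
				ch <- info
			}
		}(id)
	}
	wg.Wait()
	close(ch)

	nodes := make([]*nodeInfo, 0, len(ids))
	for node := range ch {
		nodes = append(nodes, node)
	}
	return nodes
}

func main() {
	fmt.Println(len(getNodeInfos([]int64{1, 2, 3, 4, 5}))) // prints 3: nodes 2 and 4 are dropped
}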


@@ -15,11 +15,12 @@ type balancePlan struct {
 }
 type replicaBalancer struct {
-    meta Meta
+    meta    Meta
+    cluster Cluster
 }
-func newReplicaBalancer(meta Meta) *replicaBalancer {
-    return &replicaBalancer{meta}
+func newReplicaBalancer(meta Meta, cluster Cluster) *replicaBalancer {
+    return &replicaBalancer{meta, cluster}
 }
 func (b *replicaBalancer) addNode(nodeID int64) ([]*balancePlan, error) {
@@ -34,8 +35,26 @@ func (b *replicaBalancer) addNode(nodeID int64) ([]*balancePlan, error) {
         if len(replicas) == 0 {
             continue
         }
+        offlineNodesCnt := make(map[UniqueID]int, len(replicas))
+        replicaAvailableMemory := make(map[UniqueID]uint64, len(replicas))
+        for _, replica := range replicas {
+            for _, nodeID := range replica.NodeIds {
+                if isOnline, err := b.cluster.isOnline(nodeID); err != nil || !isOnline {
+                    offlineNodesCnt[replica.ReplicaID]++
+                }
+            }
+            replicaAvailableMemory[replica.ReplicaID] = getReplicaAvailableMemory(b.cluster, replica)
+        }
         sort.Slice(replicas, func(i, j int) bool {
-            return len(replicas[i].GetNodeIds()) < len(replicas[j].GetNodeIds())
+            replicai := replicas[i].ReplicaID
+            replicaj := replicas[j].ReplicaID
+            cnti := offlineNodesCnt[replicai]
+            cntj := offlineNodesCnt[replicaj]
+            return cnti > cntj ||
+                cnti == cntj && replicaAvailableMemory[replicai] < replicaAvailableMemory[replicaj]
         })
         ret = append(ret, &balancePlan{
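Read together, the new comparator ranks replicas by need: more offline nodes first, and on a tie, less available memory first, so the freshly added node is assigned to the replica that needs it most. A standalone sketch of just that ordering, with hypothetical replica stats:

package main

import (
	"fmt"
	"sort"
)

// replicaStat is a hypothetical summary of one replica's health.
type replicaStat struct {
	id           int64
	offlineNodes int
	availableMem uint64
}

func main() {
	replicas := []replicaStat{
		{id: 1, offlineNodes: 0, availableMem: 16 << 30},
		{id: 2, offlineNodes: 2, availableMem: 32 << 30},
		{id: 3, offlineNodes: 2, availableMem: 8 << 30},
	}
	// Same ordering as the diff: most offline nodes first, then least free memory.
	sort.Slice(replicas, func(i, j int) bool {
		ri, rj := replicas[i], replicas[j]
		return ri.offlineNodes > rj.offlineNodes ||
			ri.offlineNodes == rj.offlineNodes && ri.availableMem < rj.availableMem
	})
	fmt.Println(replicas[0].id) // prints 3: tied on offline nodes, but lower available memory
}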


@@ -169,7 +169,6 @@ func (qc *QueryCoord) Init() error {
             log.Error("query coordinator init meta failed", zap.Error(initError))
             return
         }
-        qc.groupBalancer = newReplicaBalancer(qc.meta)
         // init channelUnsubscribeHandler
         qc.handler, initError = newChannelUnsubscribeHandler(qc.loopCtx, qc.kvClient, qc.factory)
@@ -185,6 +184,8 @@ func (qc *QueryCoord) Init() error {
             return
         }
+        qc.groupBalancer = newReplicaBalancer(qc.meta, qc.cluster)
         // NOTE: ignore the returned error
         // we only try best to reload the leader addresses
         reloadShardLeaderAddress(qc.meta, qc.cluster)
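The constructor call moves below the cluster initialization because the balancer now keeps a reference to the cluster; building it earlier would hand it an uninitialized cluster. A toy sketch of that ordering constraint (the types here are hypothetical, not the QueryCoord fields):

package main

import "fmt"

// cluster and balancer are hypothetical stand-ins for the QueryCoord members.
type cluster struct{ ready bool }

type balancer struct{ c *cluster }

func newBalancer(c *cluster) *balancer { return &balancer{c: c} }

func main() {
	var qc struct {
		cluster  *cluster
		balancer *balancer
	}
	qc.cluster = &cluster{ready: true}    // the cluster must exist first
	qc.balancer = newBalancer(qc.cluster) // only then can the balancer capture it
	fmt.Println(qc.balancer.c.ready)      // prints true
}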


@@ -21,6 +21,7 @@ import (
     "github.com/milvus-io/milvus/internal/proto/commonpb"
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/proto/milvuspb"
+    "github.com/milvus-io/milvus/internal/proto/querypb"
     "github.com/milvus-io/milvus/internal/util/typeutil"
 )
@@ -207,3 +208,13 @@ func removeFromSlice(origin []UniqueID, del ...UniqueID) []UniqueID {
     return set.Collect()
 }
+func getReplicaAvailableMemory(cluster Cluster, replica *milvuspb.ReplicaInfo) uint64 {
+    availableMemory := uint64(0)
+    nodes := getNodeInfos(cluster, replica.NodeIds)
+    for _, node := range nodes {
+        availableMemory += node.totalMem - node.memUsage
+    }
+    return availableMemory
+}
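A small self-contained sketch of the same accounting (the node type and the numbers are hypothetical): a replica's available memory is the free memory, total minus used, summed over the nodes that could be reached.

package main

import "fmt"

// node is a hypothetical stand-in carrying the per-node memory stats.
type node struct {
	totalMem uint64
	memUsage uint64
}

// replicaAvailableMemory sums the free memory of the reachable nodes of one replica.
func replicaAvailableMemory(nodes []*node) uint64 {
	available := uint64(0)
	for _, n := range nodes {
		available += n.totalMem - n.memUsage
	}
	return available
}

func main() {
	reachable := []*node{
		{totalMem: 8 << 30, memUsage: 3 << 30},
		{totalMem: 16 << 30, memUsage: 10 << 30},
	}
	fmt.Printf("%d GiB free\n", replicaAvailableMemory(reachable)>>30) // prints 11 GiB free
}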