Modify the replicas' shard info after load balance (#16785)

Signed-off-by: yah01 <yang.cen@zilliz.com>
Author: yah01, 2022-05-05 21:15:50 +08:00, committed by GitHub
commit c82e2453eb
parent 5922f147e5


@@ -675,8 +675,6 @@ func (rct *releaseCollectionTask) preExecute(context.Context) error {
 }
 func (rct *releaseCollectionTask) execute(ctx context.Context) error {
-    // cancel the maximum number of retries for queryNode cleaning data until the data is completely freed
-    // defer rct.reduceRetryCount()
     collectionID := rct.CollectionID
     // if nodeID ==0, it means that the release request has not been assigned to the specified query node
@@ -707,6 +705,8 @@ func (rct *releaseCollectionTask) execute(ctx context.Context) error {
             log.Info("releaseCollectionTask: add a releaseCollectionTask to releaseCollectionTask's childTask", zap.Any("task", releaseCollectionTask))
         }
     } else {
+        // If the node crashed or be offline, the loaded segments are lost
+        defer rct.reduceRetryCount()
         err := rct.cluster.releaseCollection(ctx, rct.NodeID, rct.ReleaseCollectionRequest)
         if err != nil {
             log.Warn("releaseCollectionTask: release collection end, node occur error", zap.Int64("collectionID", collectionID), zap.Int64("nodeID", rct.NodeID))
@@ -1126,8 +1126,6 @@ func (rpt *releasePartitionTask) preExecute(context.Context) error {
 }
 func (rpt *releasePartitionTask) execute(ctx context.Context) error {
-    // cancel the maximum number of retries for queryNode cleaning data until the data is completely freed
-    // defer rpt.reduceRetryCount()
     collectionID := rpt.CollectionID
     partitionIDs := rpt.PartitionIDs
@@ -1149,6 +1147,8 @@ func (rpt *releasePartitionTask) execute(ctx context.Context) error {
             log.Info("releasePartitionTask: add a releasePartitionTask to releasePartitionTask's childTask", zap.Int64("collectionID", collectionID), zap.Int64("msgID", rpt.Base.MsgID))
         }
     } else {
+        // If the node crashed or be offline, the loaded segments are lost
+        defer rpt.reduceRetryCount()
         err := rpt.cluster.releasePartitions(ctx, rpt.NodeID, rpt.ReleasePartitionsRequest)
         if err != nil {
             log.Warn("ReleasePartitionsTask: release partition end, node occur error", zap.Int64("collectionID", collectionID), zap.String("nodeID", fmt.Sprintln(rpt.NodeID)))
@@ -2235,12 +2235,6 @@ func (lbt *loadBalanceTask) postExecute(context.Context) error {
     // then the queryCoord will panic, and the nodeInfo should not be removed immediately
     // after queryCoord recovery, the balanceTask will redo
     if lbt.triggerCondition == querypb.TriggerCondition_NodeDown && lbt.getResultInfo().ErrorCode == commonpb.ErrorCode_Success {
-        offlineNodes := make(map[UniqueID]struct{}, len(lbt.SourceNodeIDs))
-        for _, nodeID := range lbt.SourceNodeIDs {
-            offlineNodes[nodeID] = struct{}{}
-        }
-        replicas := make(map[UniqueID]*milvuspb.ReplicaInfo)
         for _, id := range lbt.SourceNodeIDs {
             err := lbt.cluster.removeNodeInfo(id)
             if err != nil {
@@ -2250,43 +2244,7 @@ func (lbt *loadBalanceTask) postExecute(context.Context) error {
                     zap.Error(err))
                 continue
             }
-            replica, err := lbt.getReplica(id, lbt.CollectionID)
-            if err != nil {
-                log.Warn("failed to get replica for removing offline querynode from it",
-                    zap.Int64("querynodeID", id),
-                    zap.Int64("collectionID", lbt.CollectionID),
-                    zap.Error(err))
-                continue
-            }
-            replicas[replica.ReplicaID] = replica
-        }
-        log.Debug("removing offline nodes from replicas...",
-            zap.Int("len(replicas)", len(replicas)))
-        wg := sync.WaitGroup{}
-        for _, replica := range replicas {
-            wg.Add(1)
-            go func(replica *milvuspb.ReplicaInfo) {
-                defer wg.Done()
-                onlineNodes := make([]UniqueID, 0, len(replica.NodeIds))
-                for _, nodeID := range replica.NodeIds {
-                    if _, ok := offlineNodes[nodeID]; !ok {
-                        onlineNodes = append(onlineNodes, nodeID)
-                    }
-                }
-                replica.NodeIds = onlineNodes
-                err := lbt.meta.setReplicaInfo(replica)
-                if err != nil {
-                    log.Warn("failed to remove offline nodes from replica info",
-                        zap.Int64("replicaID", replica.ReplicaID),
-                        zap.Error(err))
-                }
-            }(replica)
-        }
-        wg.Wait()
     }
     log.Info("loadBalanceTask postExecute done",
@@ -2299,8 +2257,79 @@ func (lbt *loadBalanceTask) postExecute(context.Context) error {
 func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
     if len(lbt.getChildTask()) > 0 {
-        return syncReplicaSegments(ctx, lbt.cluster, lbt.getChildTask())
+        if lbt.triggerCondition == querypb.TriggerCondition_NodeDown {
+            offlineNodes := make(typeutil.UniqueSet, len(lbt.SourceNodeIDs))
+            for _, nodeID := range lbt.SourceNodeIDs {
+                offlineNodes.Insert(nodeID)
+            }
+            replicas := make(map[UniqueID]*milvuspb.ReplicaInfo)
+            for _, id := range lbt.SourceNodeIDs {
+                replica, err := lbt.getReplica(id, lbt.CollectionID)
+                if err != nil {
+                    log.Warn("failed to get replica for removing offline querynode from it",
+                        zap.Int64("querynodeID", id),
+                        zap.Int64("collectionID", lbt.CollectionID),
+                        zap.Error(err))
+                    continue
+                }
+                replicas[replica.ReplicaID] = replica
+            }
+            log.Debug("removing offline nodes from replicas...",
+                zap.Int("len(replicas)", len(replicas)))
+            wg := sync.WaitGroup{}
+            for _, replica := range replicas {
+                wg.Add(1)
+                go func(replica *milvuspb.ReplicaInfo) {
+                    defer wg.Done()
+                    onlineNodes := make([]UniqueID, 0, len(replica.NodeIds))
+                    for _, nodeID := range replica.NodeIds {
+                        if !offlineNodes.Contain(nodeID) {
+                            onlineNodes = append(onlineNodes, nodeID)
+                        }
+                    }
+                    replica.NodeIds = onlineNodes
+                    err := lbt.meta.setReplicaInfo(replica)
+                    if err != nil {
+                        log.Warn("failed to remove offline nodes from replica info",
+                            zap.Int64("replicaID", replica.ReplicaID),
+                            zap.Error(err))
+                    }
+                }(replica)
+            }
+            wg.Wait()
+        }
+        err := syncReplicaSegments(ctx, lbt.cluster, lbt.getChildTask())
+        if err != nil {
+            return err
+        }
+        for _, childTask := range lbt.getChildTask() {
+            if task, ok := childTask.(*watchDmChannelTask); ok {
+                nodeInfo, err := lbt.cluster.getNodeInfoByID(task.NodeID)
+                if err != nil {
+                    return err
+                }
+                replica, err := lbt.meta.getReplicaByID(task.ReplicaID)
+                if err != nil {
+                    return err
+                }
+                for _, shard := range replica.ShardReplicas {
+                    if shard.DmChannelName == task.Infos[0].ChannelName {
+                        shard.LeaderID = task.NodeID
+                        shard.LeaderAddr = nodeInfo.(*queryNode).address
+                    }
+                }
+            }
+        }
     }
     return nil
 }
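
Note: the loop over watchDmChannelTask child tasks is the core of this change, per the commit title. After a load balance the replica's shard info must point at the node that now watches each DML channel, otherwise requests could still be routed to the old, possibly offline, shard leader. The sketch below pulls that fix-up into a standalone helper with simplified stand-in types; the real milvuspb structs and the persistence of the updated replica are not shown here.

package main

import "fmt"

// ShardReplica and ReplicaInfo are simplified stand-ins for the milvuspb types
// touched by globalPostExecute above (assumed shapes, fields as used in the diff).
type ShardReplica struct {
    DmChannelName string
    LeaderID      int64
    LeaderAddr    string
}

type ReplicaInfo struct {
    ReplicaID     int64
    ShardReplicas []*ShardReplica
}

// updateShardLeader repoints the shard serving dmChannel at the node that just
// finished watching it, so the replica's shard info reflects the new leader.
func updateShardLeader(replica *ReplicaInfo, dmChannel string, nodeID int64, nodeAddr string) {
    for _, shard := range replica.ShardReplicas {
        if shard.DmChannelName == dmChannel {
            shard.LeaderID = nodeID
            shard.LeaderAddr = nodeAddr
        }
    }
}

func main() {
    replica := &ReplicaInfo{
        ReplicaID: 1,
        ShardReplicas: []*ShardReplica{
            {DmChannelName: "dml-ch-0", LeaderID: 2, LeaderAddr: "10.0.0.2:21123"},
        },
    }
    updateShardLeader(replica, "dml-ch-0", 7, "10.0.0.7:21123")
    fmt.Println(replica.ShardReplicas[0].LeaderID, replica.ShardReplicas[0].LeaderAddr)
}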