fix sync problem during rebalance (#18332)
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
parent be6ca6358d
commit a79c5e2366
@@ -369,7 +369,7 @@ func (qn *queryNode) getNodeInfo() (Node, error) {
 func (qn *queryNode) syncReplicaSegments(ctx context.Context, in *querypb.SyncReplicaSegmentsRequest) error {
 	if !qn.isOnline() {
-		return errors.New("ReleaseSegments: queryNode is offline")
+		return errors.New("SyncSegments: queryNode is offline")
 	}
 
 	status, err := qn.client.SyncReplicaSegments(ctx, in)
@@ -2407,19 +2407,6 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
 			zap.Int("segmentNum", len(segments)))
 
 	wg := errgroup.Group{}
-	// Remove offline nodes from replica
-	for replicaID := range replicas {
-		replicaID := replicaID
-		wg.Go(func() error {
-			log.Debug("remove offline nodes from replica",
-				zap.Int64("taskID", lbt.taskID),
-				zap.Int64("replicaID", replicaID),
-				zap.Int64s("offlineNodes", lbt.SourceNodeIDs))
-
-			return lbt.meta.applyReplicaBalancePlan(
-				NewRemoveBalancePlan(replicaID, lbt.SourceNodeIDs...))
-		})
-	}
 
 	// Remove offline nodes from dmChannels
 	for _, dmChannel := range dmChannels {
@@ -2439,7 +2426,7 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
 			log.Info("remove offline nodes from dmChannel",
 				zap.Int64("taskID", lbt.getTaskID()),
 				zap.String("dmChannel", dmChannel.DmChannel),
-				zap.Int64s("nodeIds", dmChannel.NodeIds))
+				zap.Int64s("left nodeIds", dmChannel.NodeIds))
 
 			return nil
 		})
@@ -2485,11 +2472,11 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
 		}
 	}
 
-	err := wg.Wait()
-	if err != nil {
+	if err := wg.Wait(); err != nil {
 		return err
 	}
 
 	// sync segment
 	for replicaID := range replicas {
 		err := syncReplicaSegments(lbt.ctx, lbt.meta, lbt.cluster, replicaID)
 		if err != nil {
@@ -2501,6 +2488,24 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
 		}
 	}
 
+	// Remove offline nodes from replica
+	for replicaID := range replicas {
+		replicaID := replicaID
+		wg.Go(func() error {
+			log.Debug("remove offline nodes from replica",
+				zap.Int64("taskID", lbt.taskID),
+				zap.Int64("replicaID", replicaID),
+				zap.Int64s("offlineNodes", lbt.SourceNodeIDs))
+
+			return lbt.meta.applyReplicaBalancePlan(
+				NewRemoveBalancePlan(replicaID, lbt.SourceNodeIDs...))
+		})
+	}
+
+	if err := wg.Wait(); err != nil {
+		return err
+	}
+
 	for _, offlineNodeID := range lbt.SourceNodeIDs {
 		err := lbt.cluster.RemoveNodeInfo(offlineNodeID)
 		if err != nil {
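The hunks above reorder globalPostExecute so the replica metadata is pruned only after the segments have been synced, and they collapse the wg.Wait() error check into a single if statement. For context, this is the standard golang.org/x/sync/errgroup fan-out pattern; the sketch below is illustrative only, with placeholder replica IDs and placeholder per-replica work rather than Milvus code:

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	replicas := []int64{101, 102, 103} // hypothetical replica IDs

	wg := errgroup.Group{}
	for _, replicaID := range replicas {
		replicaID := replicaID // capture the loop variable for the closure (pre-Go 1.22 idiom)
		wg.Go(func() error {
			// Placeholder for per-replica work such as removing offline nodes.
			fmt.Println("processing replica", replicaID)
			return nil
		})
	}

	// Wait blocks until every goroutine has returned and yields the first non-nil error.
	if err := wg.Wait(); err != nil {
		fmt.Println("balance step failed:", err)
	}
}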
@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"runtime"
 	"sync"
 
 	"go.uber.org/atomic"
@@ -179,7 +180,7 @@ func NewShardCluster(collectionID int64, replicaID int64, vchannelName string,
 func (sc *ShardCluster) Close() {
 	log.Info("Close shard cluster")
 	sc.closeOnce.Do(func() {
-		sc.state.Store(int32(unavailable))
+		sc.updateShardClusterState(unavailable)
 		close(sc.closeCh)
 	})
 }
@@ -242,7 +243,7 @@ func (sc *ShardCluster) removeNode(evt nodeEvent) {
 		if segment.nodeID == evt.nodeID {
 			segment.state = segmentStateOffline
 			sc.segments[id] = segment
-			sc.state.Store(int32(unavailable))
+			sc.updateShardClusterState(unavailable)
 		}
 	}
 	// ignore leader process here
@@ -424,15 +425,28 @@ func (sc *ShardCluster) selectNodeInReplica(nodeIDs []int64) (int64, bool) {
 	return 0, false
 }
 
+func (sc *ShardCluster) updateShardClusterState(state shardClusterState) {
+	old := sc.state.Load()
+	sc.state.Store(int32(state))
+
+	pc, _, _, _ := runtime.Caller(1)
+	callerName := runtime.FuncForPC(pc).Name()
+
+	log.Info("Shard Cluster update state", zap.Int64("collectionID", sc.collectionID),
+		zap.Int64("replicaID", sc.replicaID), zap.String("channel", sc.vchannelName),
+		zap.Int32("old state", old), zap.Int32("new state", int32(state)),
+		zap.String("caller", callerName))
+}
+
 // healthCheck iterate all segments to to check cluster could provide service.
 func (sc *ShardCluster) healthCheck() {
 	for _, segment := range sc.segments {
 		if segment.state != segmentStateLoaded { // TODO check hand-off or load balance
-			sc.state.Store(int32(unavailable))
+			sc.updateShardClusterState(unavailable)
 			return
 		}
 	}
-	sc.state.Store(int32(available))
+	sc.updateShardClusterState(available)
 }
 
 // watchNodes handles node events.
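The new updateShardClusterState helper wraps the atomic state store with a log line that records which function triggered the transition, via runtime.Caller and runtime.FuncForPC. A minimal standalone sketch of that caller-logging technique follows; the updateState function and state value are hypothetical, not part of the ShardCluster code:

package main

import (
	"fmt"
	"runtime"
)

func updateState(state int32) {
	// runtime.Caller(1) returns the program counter of the function that
	// called updateState; FuncForPC resolves it to a qualified function name.
	pc, _, _, _ := runtime.Caller(1)
	caller := runtime.FuncForPC(pc).Name()
	fmt.Printf("state -> %d, caller: %s\n", state, caller)
}

func main() {
	updateState(1) // prints something like "state -> 1, caller: main.main"
}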