support replica observer assign node (#22604)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2023-03-08 18:57:51 +08:00 committed by GitHub
parent efdc77bce6
commit 11f1f4226a
5 changed files with 144 additions and 95 deletions


@@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
)
// check replicas, find outbound nodes, and remove them from the replica once all segments/channels have been moved away
@@ -82,14 +83,15 @@ func (ob *ReplicaObserver) schedule(ctx context.Context) {
func (ob *ReplicaObserver) checkNodesInReplica() {
collections := ob.meta.GetAll()
for _, collectionID := range collections {
removedNodes := make([]int64, 0)
// remove nodes from the replica once they have been transferred to another rg
replicas := ob.meta.ReplicaManager.GetByCollection(collectionID)
for _, replica := range replicas {
outboundNodes := ob.meta.ResourceManager.CheckOutboundNodes(replica)
if len(outboundNodes) > 0 {
log.RatedInfo(10, "found outbound nodes in replica",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Int64s("allOutboundNodes", outboundNodes.Collect()),
)
@@ -99,9 +101,10 @@ func (ob *ReplicaObserver) checkNodesInReplica() {
if len(channels) == 0 && len(segments) == 0 {
replica.RemoveNode(node)
removedNodes = append(removedNodes, node)
log.Info("all segment/channel has been removed from outbound node, remove it from replica",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Int64("removedNodes", node),
zap.Int64s("availableNodes", replica.GetNodes()),
)
@@ -109,5 +112,17 @@ func (ob *ReplicaObserver) checkNodesInReplica() {
}
}
}
// assign removed nodes to other replicas in the current rg
for _, node := range removedNodes {
rg, err := ob.meta.ResourceManager.FindResourceGroupByNode(node)
if err != nil {
// unreachable logic path
log.Warn("found node which does not belong to any resource group", zap.Int64("nodeID", node))
continue
}
replicas := ob.meta.ReplicaManager.GetByCollectionAndRG(collectionID, rg)
utils.AddNodesToReplicas(ob.meta, replicas, node)
}
}
}
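
The net effect of the change above: checkNodesInReplica now runs in two phases, first stripping outbound nodes whose segments and channels are gone, then re-homing each stripped node into a replica of the same collection inside its new resource group. A minimal, self-contained sketch of that shape; the replica struct and the maps below are toy stand-ins, not the real meta types:

package main

import "fmt"

// Toy stand-ins for the structures in internal/querycoordv2/meta.
type replica struct {
	id    int64
	rg    string
	nodes map[int64]bool
}

// twoPhaseCheck mirrors the shape of checkNodesInReplica: phase 1 removes
// outbound nodes (the real code first verifies their segment/channel
// distribution is empty), phase 2 re-assigns each removed node to a
// replica in its current resource group.
func twoPhaseCheck(replicas []*replica, rgOfNode map[int64]string, outbound map[int64][]int64) {
	removed := make([]int64, 0)
	for _, r := range replicas {
		for _, node := range outbound[r.id] {
			delete(r.nodes, node)
			removed = append(removed, node)
		}
	}
	for _, node := range removed {
		rg, ok := rgOfNode[node]
		if !ok {
			continue // the "unreachable" warn branch above
		}
		for _, r := range replicas {
			if r.rg == rg { // the real code picks the least-loaded match
				r.nodes[node] = true
				break
			}
		}
	}
}

func main() {
	r1 := &replica{id: 10000, rg: "rg1", nodes: map[int64]bool{3: true}}
	r2 := &replica{id: 10001, rg: "rg2", nodes: map[int64]bool{4: true}}
	rgOfNode := map[int64]string{3: "rg2", 4: "rg2"} // node 3 moved to rg2
	twoPhaseCheck([]*replica{r1, r2}, rgOfNode, map[int64][]int64{10000: {3}})
	fmt.Println(r1.nodes, r2.nodes) // map[] map[3:true 4:true]
}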


@@ -21,12 +21,14 @@ import (
"time"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/suite"
)
@@ -38,6 +40,7 @@ type ReplicaObserverSuite struct {
meta *meta.Meta
distMgr *meta.DistributionManager
nodeMgr *session.NodeManager
observer *ReplicaObserver
collectionID int64
@@ -66,62 +69,77 @@ func (suite *ReplicaObserverSuite) SetupTest() {
// meta
store := meta.NewMetaStore(suite.kv)
idAllocator := RandomIncrementIDAllocator()
suite.meta = meta.NewMeta(idAllocator, store, session.NewNodeManager())
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
suite.distMgr = meta.NewDistributionManager()
suite.observer = NewReplicaObserver(suite.meta, suite.distMgr)
suite.observer.Start(context.TODO())
suite.collectionID = int64(1000)
suite.partitionID = int64(100)
suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
err = suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(suite.collectionID, 1))
suite.NoError(err)
replicas, err := suite.meta.ReplicaManager.Spawn(suite.collectionID, 1, meta.DefaultResourceGroupName)
suite.NoError(err)
err = suite.meta.ReplicaManager.Put(replicas...)
suite.NoError(err)
}
func (suite *ReplicaObserverSuite) TestCheckNodesInReplica() {
replicas := suite.meta.ReplicaManager.GetByCollection(suite.collectionID)
suite.meta.ResourceManager.AddResourceGroup("rg1")
suite.meta.ResourceManager.AddResourceGroup("rg2")
suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost:8080"))
suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost:8080"))
suite.nodeMgr.Add(session.NewNodeInfo(3, "localhost:8080"))
suite.nodeMgr.Add(session.NewNodeInfo(4, "localhost:8080"))
suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
suite.meta.ResourceManager.TransferNode(meta.DefaultResourceGroupName, "rg1", 1)
suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
suite.meta.ResourceManager.TransferNode(meta.DefaultResourceGroupName, "rg1", 1)
suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 3)
suite.meta.ResourceManager.TransferNode(meta.DefaultResourceGroupName, "rg2", 1)
suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 4)
suite.meta.ResourceManager.TransferNode(meta.DefaultResourceGroupName, "rg2", 1)
suite.distMgr.ChannelDistManager.Update(1, utils.CreateTestChannel(suite.collectionID, 2, 1, "test-insert-channel1"))
suite.distMgr.SegmentDistManager.Update(1, utils.CreateTestSegment(suite.collectionID, suite.partitionID, 1, 100, 1, "test-insert-channel1"))
replicas[0].AddNode(1)
suite.distMgr.ChannelDistManager.Update(100, utils.CreateTestChannel(suite.collectionID, 100, 1, "test-insert-channel2"))
suite.distMgr.SegmentDistManager.Update(100, utils.CreateTestSegment(suite.collectionID, suite.partitionID, 2, 100, 1, "test-insert-channel2"))
replicas[0].AddNode(100)
err := suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(suite.collectionID, 1))
suite.NoError(err)
replicas := make([]*meta.Replica, 2)
replicas[0] = meta.NewReplica(
&querypb.Replica{
ID: 10000,
CollectionID: suite.collectionID,
ResourceGroup: "rg1",
Nodes: []int64{1, 2, 3},
},
typeutil.NewUniqueSet(1, 2, 3),
)
replicas[1] = meta.NewReplica(
&querypb.Replica{
ID: 10001,
CollectionID: suite.collectionID,
ResourceGroup: "rg2",
Nodes: []int64{4},
},
typeutil.NewUniqueSet(4),
)
err = suite.meta.ReplicaManager.Put(replicas...)
suite.NoError(err)
suite.distMgr.ChannelDistManager.Update(1, utils.CreateTestChannel(suite.collectionID, 1, 1, "test-insert-channel1"))
suite.distMgr.SegmentDistManager.Update(1, utils.CreateTestSegment(suite.collectionID, suite.partitionID, 1, 1, 1, "test-insert-channel1"))
suite.distMgr.ChannelDistManager.Update(2, utils.CreateTestChannel(suite.collectionID, 2, 1, "test-insert-channel2"))
suite.distMgr.SegmentDistManager.Update(2, utils.CreateTestSegment(suite.collectionID, suite.partitionID, 2, 2, 1, "test-insert-channel2"))
suite.distMgr.ChannelDistManager.Update(3, utils.CreateTestChannel(suite.collectionID, 3, 1, "test-insert-channel3"))
suite.distMgr.SegmentDistManager.Update(3, utils.CreateTestSegment(suite.collectionID, suite.partitionID, 2, 3, 1, "test-insert-channel3"))
suite.Eventually(func() bool {
// node 100 should be kept
replicas := suite.meta.ReplicaManager.GetByCollection(suite.collectionID)
for _, node := range replicas[0].GetNodes() {
if node == 100 {
return true
}
}
return false
replica0 := suite.meta.ReplicaManager.Get(10000)
replica1 := suite.meta.ReplicaManager.Get(10001)
return suite.Contains(replica0.GetNodes(), int64(3)) && suite.NotContains(replica1.GetNodes(), int64(3)) && suite.Len(replica1.GetNodes(), 1)
}, 6*time.Second, 2*time.Second)
suite.Len(replicas[0].GetNodes(), 2)
suite.distMgr.ChannelDistManager.Update(100)
suite.distMgr.SegmentDistManager.Update(100)
suite.distMgr.ChannelDistManager.Update(3)
suite.distMgr.SegmentDistManager.Update(3)
suite.Eventually(func() bool {
// node 100 should be removed
replicas := suite.meta.ReplicaManager.GetByCollection(suite.collectionID)
for _, node := range replicas[0].GetNodes() {
if node == 100 {
return false
}
}
return true
}, 5*time.Second, 1*time.Second)
suite.Len(replicas[0].GetNodes(), 1)
suite.Equal([]int64{1}, replicas[0].GetNodes())
replica0 := suite.meta.ReplicaManager.Get(10000)
replica1 := suite.meta.ReplicaManager.Get(10001)
return suite.NotContains(replica0.GetNodes(), int64(3)) && suite.Contains(replica1.GetNodes(), int64(3)) && suite.Len(replica1.GetNodes(), 2)
}, 6*time.Second, 2*time.Second)
}
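
The assertions above poll the observer's background loop with testify's Eventually instead of sleeping, re-reading replica state from the ReplicaManager on every tick. A generic, runnable sketch of that pattern; the atomic counter stands in for observer-mutated state, and everything except the Eventually call itself is assumed:

package example

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/suite"
)

type eventuallySuite struct {
	suite.Suite
	state atomic.Int64 // stand-in for state mutated by a background observer
}

func (s *eventuallySuite) TestBackgroundWork() {
	// Simulate the observer goroutine converging after a short delay.
	go func() {
		time.Sleep(100 * time.Millisecond)
		s.state.Store(3)
	}()
	// Re-evaluate the condition every tick until it holds or the time
	// budget runs out, the same shape as the replica checks above.
	s.Eventually(func() bool {
		return s.state.Load() == 3
	}, 6*time.Second, 100*time.Millisecond)
}

func TestEventuallySuite(t *testing.T) {
	suite.Run(t, new(eventuallySuite))
}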
func (suite *ReplicaObserverSuite) TearDownSuite() {


@@ -1095,11 +1095,16 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli
fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
}
replicas := s.meta.ReplicaManager.GetByCollection(req.GetCollectionID())
if (req.GetSourceResourceGroup() == meta.DefaultResourceGroupName || req.GetTargetResourceGroup() == meta.DefaultResourceGroupName) &&
len(replicas) != int(req.GetNumReplica()) {
if req.GetNumReplica() <= 0 {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
"transfer replica will cause replica loaded in both default rg and other rg", nil), nil
fmt.Sprintf("transfer replica num can't be [%d]", req.GetNumReplica()), nil), nil
}
replicas := s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetSourceResourceGroup())
if len(replicas) < int(req.GetNumReplica()) {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("only found [%d] replicas in source resource group[%s]",
len(replicas), req.GetSourceResourceGroup())), nil
}
replicas = s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetTargetResourceGroup())
@@ -1109,17 +1114,11 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli
len(replicas), req.GetTargetResourceGroup())), nil
}
if req.GetNumReplica() <= 0 {
replicas = s.meta.ReplicaManager.GetByCollection(req.GetCollectionID())
if (req.GetSourceResourceGroup() == meta.DefaultResourceGroupName || req.GetTargetResourceGroup() == meta.DefaultResourceGroupName) &&
len(replicas) != int(req.GetNumReplica()) {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("transfer replica num can't be [%d]", req.GetNumReplica()), nil), nil
}
// for now, we don't support transferring replicas of the same collection into the same resource group
replicas = s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetSourceResourceGroup())
if len(replicas) < int(req.GetNumReplica()) {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("only found [%d] replicas in source resource group[%s]",
len(replicas), req.GetSourceResourceGroup())), nil
"transfer replica will cause replica loaded in both default rg and other rg", nil), nil
}
err := s.transferReplica(req.GetTargetResourceGroup(), replicas[:req.GetNumReplica()])
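
After the reorder, TransferReplica rejects requests in a fixed sequence: a non-positive replica count first, then a source group holding too few replicas, then a target group that already serves the collection, and finally a partial move across the default-rg boundary. A condensed sketch of that ordering; plain errors stand in for the wrapped commonpb statuses, the counts are passed in rather than read from the ReplicaManager, and the target-group message is taken from the tests below:

package main

import (
	"errors"
	"fmt"
)

// validateTransfer mirrors the check order of the new TransferReplica body.
func validateTransfer(numReplica, srcCount, tgtCount, totalCount int,
	srcIsDefault, tgtIsDefault bool) error {
	if numReplica <= 0 {
		return fmt.Errorf("transfer replica num can't be [%d]", numReplica)
	}
	if srcCount < numReplica {
		return fmt.Errorf("only found [%d] replicas in source resource group", srcCount)
	}
	// a target rg that already serves the collection can't take more replicas
	if tgtCount > 0 {
		return errors.New("dynamically increase replica num is unsupported")
	}
	// moving only part of the replicas across the default-rg boundary would
	// leave the collection loaded in both the default rg and another rg
	if (srcIsDefault || tgtIsDefault) && totalCount != numReplica {
		return errors.New("transfer replica will cause replica loaded in both default rg and other rg")
	}
	return nil
}

func main() {
	fmt.Println(validateTransfer(1, 2, 0, 2, true, false)) // partial move out of default rg: rejected
	fmt.Println(validateTransfer(2, 2, 0, 2, true, false)) // full move: <nil>
}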


@@ -608,15 +608,6 @@ func (suite *ServiceSuite) TestTransferReplica() {
suite.NoError(err)
suite.Contains(resp.Reason, "only found [0] replicas in source resource group")
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
TargetResourceGroup: "rg1",
CollectionID: 1,
NumReplica: 2,
})
suite.NoError(err)
suite.Contains(resp.Reason, "transfer replica will cause replica loaded in both default rg and other rg")
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: "rgg",
TargetResourceGroup: meta.DefaultResourceGroupName,
@@ -671,6 +662,34 @@
suite.server.meta.AssignNode("rg3", 1004)
suite.server.meta.AssignNode("rg3", 1005)
suite.server.meta.Put(meta.NewReplica(&querypb.Replica{
CollectionID: 2,
ID: 444,
ResourceGroup: meta.DefaultResourceGroupName,
}, typeutil.NewUniqueSet(3)))
suite.server.meta.Put(meta.NewReplica(&querypb.Replica{
CollectionID: 2,
ID: 555,
ResourceGroup: "rg2",
}, typeutil.NewUniqueSet(4)))
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
TargetResourceGroup: "rg2",
CollectionID: 2,
NumReplica: 1,
})
suite.NoError(err)
suite.Contains(resp.Reason, "dynamically increase replica num is unsupported")
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
TargetResourceGroup: "rg1",
CollectionID: 1,
NumReplica: 1,
})
suite.NoError(err)
suite.Contains(resp.Reason, "transfer replica will cause replica loaded in both default rg and other rg")
replicaNum := len(suite.server.meta.ReplicaManager.GetByCollection(1))
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
@@ -678,18 +697,9 @@
CollectionID: 1,
NumReplica: int64(replicaNum),
})
suite.NoError(err)
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success)
suite.Len(suite.server.meta.GetByResourceGroup("rg3"), 3)
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
TargetResourceGroup: "rg3",
CollectionID: 1,
NumReplica: int64(replicaNum),
})
suite.NoError(err)
suite.Contains(resp.Reason, "dynamically increase replica num is unsupported")
// server unhealthy
server.status.Store(commonpb.StateCode_Abnormal)


@@ -153,34 +153,41 @@ func AssignNodesToReplicas(m *meta.Meta, rgName string, replicas ...*meta.Replic
func AddNodesToCollectionsInRG(m *meta.Meta, rgName string, nodes ...int64) {
for _, node := range nodes {
for _, collection := range m.CollectionManager.GetAll() {
log := log.With(zap.Int64("collectionID", collection))
replica := m.ReplicaManager.GetByCollectionAndNode(collection, node)
if replica == nil {
replicas := m.ReplicaManager.GetByCollectionAndRG(collection, rgName)
if len(replicas) == 0 {
continue
}
sort.Slice(replicas, func(i, j int) bool {
return replicas[i].Len() < replicas[j].Len()
})
replica := replicas[0]
// TODO(yah01): this may fail, need a component to check whether a node is assigned
err := m.ReplicaManager.AddNode(replica.GetID(), node)
if err != nil {
log.Warn("failed to assign node to replicas",
zap.Int64("replicaID", replica.GetID()),
zap.Int64("nodeId", node),
zap.Error(err),
)
continue
}
log.Info("assign node to replica",
zap.Int64("replicaID", replica.GetID()))
AddNodesToReplicas(m, replicas, node)
}
}
}
}
func AddNodesToReplicas(m *meta.Meta, replicas []*meta.Replica, node int64) {
if len(replicas) == 0 {
return
}
sort.Slice(replicas, func(i, j int) bool {
return replicas[i].Len() < replicas[j].Len()
})
replica := replicas[0]
// TODO(yah01): this may fail, need a component to check whether a node is assigned
err := m.ReplicaManager.AddNode(replica.GetID(), node)
if err != nil {
log.Warn("failed to assign node to replicas",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Int64("nodeId", node),
zap.Error(err),
)
return
}
log.Info("assign node to replica",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Int64("nodeID", node),
)
}
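
The refactor extracts the placement policy into AddNodesToReplicas so that both the resource-group join path (AddNodesToCollectionsInRG) and the new observer path share it. The policy is simply to sort the candidate replicas by current node count and give the node to the smallest one; a standalone demo with a toy replica type rather than the real meta.Replica:

package main

import (
	"fmt"
	"sort"
)

type toyReplica struct {
	id    int64
	nodes []int64
}

// addToLeastLoaded applies the same policy as AddNodesToReplicas: sort
// candidates by node count and hand the new node to the smallest replica.
func addToLeastLoaded(replicas []*toyReplica, node int64) *toyReplica {
	if len(replicas) == 0 {
		return nil
	}
	sort.Slice(replicas, func(i, j int) bool {
		return len(replicas[i].nodes) < len(replicas[j].nodes)
	})
	replicas[0].nodes = append(replicas[0].nodes, node)
	return replicas[0]
}

func main() {
	rs := []*toyReplica{
		{id: 10000, nodes: []int64{1, 2}},
		{id: 10001, nodes: []int64{4}},
	}
	picked := addToLeastLoaded(rs, 3)
	fmt.Println(picked.id, picked.nodes) // 10001 [4 3]
}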
// SpawnAllReplicasInRG spawns replicas for the given collection, assigns nodes to them, and saves them
func SpawnAllReplicasInRG(m *meta.Meta, collection int64, replicaNumber int32, rgName string) ([]*meta.Replica, error) {
replicas, err := m.ReplicaManager.Spawn(collection, replicaNumber, rgName)