Filter out the unavailable shard leaders (#16677)

Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
yah01 2022-04-27 16:27:46 +08:00 committed by GitHub
parent 0bf176a7e1
commit 68ef720d6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 72 additions and 24 deletions

View File

@ -1055,20 +1055,7 @@ func (qc *QueryCoord) GetReplicas(ctx context.Context, req *milvuspb.GetReplicas
}
if req.WithShardNodes {
shardNodes := make(map[string]map[UniqueID]struct{})
segments := qc.meta.showSegmentInfos(req.CollectionID, nil)
for _, segment := range segments {
nodes, ok := shardNodes[segment.DmChannel]
if !ok {
nodes = make(map[UniqueID]struct{})
}
for _, nodeID := range segment.NodeIds {
nodes[nodeID] = struct{}{}
}
shardNodes[segment.DmChannel] = nodes
}
shardNodes := getShardNodes(req.CollectionID, qc.meta)
for _, replica := range replicas {
for _, shard := range replica.ShardReplicas {
@ -1131,6 +1118,7 @@ func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShard
}
shards := make(map[string]*querypb.ShardLeadersList)
shardNodes := getShardNodes(req.CollectionID, qc.meta)
for _, replica := range replicas {
for _, shard := range replica.ShardReplicas {
list, ok := shards[shard.DmChannelName]
@ -1142,9 +1130,32 @@ func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShard
}
}
list.NodeIds = append(list.NodeIds, shard.LeaderID)
list.NodeAddrs = append(list.NodeAddrs, shard.LeaderAddr)
shards[shard.DmChannelName] = list
isShardAvailable, err := qc.cluster.isOnline(shard.LeaderID)
if err != nil || !isShardAvailable {
log.Warn("shard leader is unavailable",
zap.Int64("collectionID", replica.CollectionID),
zap.Int64("replicaID", replica.ReplicaID),
zap.String("DmChannel", shard.DmChannelName),
zap.Int64("shardLeaderID", shard.LeaderID),
zap.Error(err))
continue
}
nodes := shardNodes[shard.DmChannelName]
for _, nodeID := range replica.NodeIds {
if _, ok := nodes[nodeID]; ok {
if ok, err := qc.cluster.isOnline(nodeID); err != nil || !ok {
isShardAvailable = false
break
}
}
}
if isShardAvailable {
list.NodeIds = append(list.NodeIds, shard.LeaderID)
list.NodeAddrs = append(list.NodeAddrs, shard.LeaderAddr)
shards[shard.DmChannelName] = list
}
}
}

View File

@ -1574,6 +1574,7 @@ func TestGetShardLeaders(t *testing.T) {
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
defer queryCoord.Stop()
node1, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
@ -1584,6 +1585,9 @@ func TestGetShardLeaders(t *testing.T) {
waitQueryNodeOnline(queryCoord.cluster, node1.queryNodeID)
waitQueryNodeOnline(queryCoord.cluster, node2.queryNodeID)
waitQueryNodeOnline(queryCoord.cluster, node3.queryNodeID)
defer node2.stop()
defer node3.stop()
defer removeAllSession()
// First, load collection with replicas
loadCollectionReq := &querypb.LoadCollectionRequest{
@ -1610,9 +1614,28 @@ func TestGetShardLeaders(t *testing.T) {
totalLeaders := 0
for i := 0; i < len(resp.Shards); i++ {
totalLeaders += len(resp.Shards[i].NodeIds)
assert.Equal(t, 3, len(resp.Shards[i].NodeIds))
}
assert.Equal(t, 0, totalLeaders%3)
// Filter out unavailable shard
err = node1.stop()
assert.NoError(t, err)
err = removeNodeSession(node1.queryNodeID)
assert.NoError(t, err)
waitAllQueryNodeOffline(queryCoord.cluster, []int64{node1.queryNodeID})
resp, err = queryCoord.GetShardLeaders(ctx, getShardLeadersReq)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
for i := 0; i < len(resp.Shards); i++ {
assert.Equal(t, 2, len(resp.Shards[i].NodeIds))
}
node4, err := startQueryNodeServer(ctx)
assert.NoError(t, err)
waitQueryNodeOnline(queryCoord.cluster, node4.queryNodeID)
defer node4.stop()
// GetShardLeaders after release collection, it should return meta failed
status, err = queryCoord.ReleaseCollection(ctx, &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{},
@ -1624,9 +1647,4 @@ func TestGetShardLeaders(t *testing.T) {
resp, err = queryCoord.GetShardLeaders(ctx, getShardLeadersReq)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_MetaFailed, resp.Status.ErrorCode)
node1.stop()
node2.stop()
node3.stop()
queryCoord.Stop()
}

View File

@ -1343,3 +1343,22 @@ func removeCollectionMeta(collectionID UniqueID, replicas []UniqueID, kv kv.Meta
return kv.MultiRemoveWithPrefix(prefixes)
}
func getShardNodes(collectionID UniqueID, meta Meta) map[string]map[UniqueID]struct{} {
shardNodes := make(map[string]map[UniqueID]struct{})
segments := meta.showSegmentInfos(collectionID, nil)
for _, segment := range segments {
nodes, ok := shardNodes[segment.DmChannel]
if !ok {
nodes = make(map[UniqueID]struct{})
}
for _, nodeID := range segment.NodeIds {
nodes[nodeID] = struct{}{}
}
shardNodes[segment.DmChannel] = nodes
}
return shardNodes
}

View File

@ -69,8 +69,8 @@ func waitAllQueryNodeOffline(cluster Cluster, nodeIDs []int64) bool {
for {
allOffline := true
for _, nodeID := range nodeIDs {
nodeExist := cluster.hasNode(nodeID)
if nodeExist {
isOnline, err := cluster.isOnline(nodeID)
if err == nil && isOnline {
allOffline = false
break
}