mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-29 18:38:44 +08:00
Fix queryCoord panic during query node down (#21400)
Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
parent
d0d0db8c64
commit
6a29a964df
@ -19,11 +19,13 @@ package balance
|
||||
import (
|
||||
"sort"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type RowCountBasedBalancer struct {
|
||||
@ -117,17 +119,21 @@ func (b *RowCountBasedBalancer) balanceReplica(replica *meta.Replica) ([]Segment
|
||||
segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool {
|
||||
return b.targetMgr.GetHistoricalSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil
|
||||
})
|
||||
|
||||
if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil {
|
||||
log.Info("not existed node", zap.Int64("nid", nid), zap.Any("segments", segments), zap.Error(err))
|
||||
continue
|
||||
} else if isStopping {
|
||||
stoppingNodesSegments[nid] = segments
|
||||
} else {
|
||||
nodesSegments[nid] = segments
|
||||
}
|
||||
|
||||
cnt := 0
|
||||
for _, s := range segments {
|
||||
cnt += int(s.GetNumOfRows())
|
||||
}
|
||||
nodesRowCnt[nid] = cnt
|
||||
|
||||
if nodeInfo := b.nodeManager.Get(nid); nodeInfo.IsStoppingState() {
|
||||
stoppingNodesSegments[nid] = segments
|
||||
} else {
|
||||
nodesSegments[nid] = segments
|
||||
}
|
||||
totalCnt += cnt
|
||||
}
|
||||
|
||||
|
@ -134,6 +134,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
|
||||
cases := []struct {
|
||||
name string
|
||||
nodes []int64
|
||||
notExistedNodes []int64
|
||||
segmentCnts []int
|
||||
states []session.State
|
||||
shouldMock bool
|
||||
@ -212,16 +213,18 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "already balanced",
|
||||
nodes: []int64{1, 2},
|
||||
segmentCnts: []int{1, 2},
|
||||
states: []session.State{session.NodeStateNormal, session.NodeStateNormal},
|
||||
name: "already balanced",
|
||||
nodes: []int64{1, 2},
|
||||
notExistedNodes: []int64{10},
|
||||
segmentCnts: []int{1, 2},
|
||||
states: []session.State{session.NodeStateNormal, session.NodeStateNormal},
|
||||
distributions: map[int64][]*meta.Segment{
|
||||
1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 30}, Node: 1}},
|
||||
2: {
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2},
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
|
||||
},
|
||||
10: {{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 30}, Node: 10}},
|
||||
},
|
||||
expectPlans: []SegmentAssignPlan{},
|
||||
expectChannelPlans: []ChannelAssignPlan{},
|
||||
@ -259,7 +262,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
|
||||
collection.LoadPercentage = 100
|
||||
collection.Status = querypb.LoadStatus_Loaded
|
||||
balancer.meta.CollectionManager.PutCollection(collection)
|
||||
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, c.nodes))
|
||||
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, append(c.nodes, c.notExistedNodes...)))
|
||||
for node, s := range c.distributions {
|
||||
balancer.dist.SegmentDistManager.Update(node, s...)
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package session
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -59,6 +60,17 @@ func (m *NodeManager) Stopping(nodeID int64) {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *NodeManager) IsStoppingNode(nodeID int64) (bool, error) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
node := m.nodes[nodeID]
|
||||
if node == nil {
|
||||
return false, fmt.Errorf("nodeID[%d] isn't existed", nodeID)
|
||||
}
|
||||
return node.IsStoppingState(), nil
|
||||
}
|
||||
|
||||
func (m *NodeManager) Get(nodeID int64) *NodeInfo {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
@ -328,7 +328,6 @@ func (node *QueryNode) Stop() error {
|
||||
if err != nil {
|
||||
log.Warn("session fail to go stopping state", zap.Error(err))
|
||||
} else {
|
||||
node.UpdateStateCode(commonpb.StateCode_Stopping)
|
||||
noSegmentChan := node.metaReplica.getNoSegmentChan()
|
||||
select {
|
||||
case <-noSegmentChan:
|
||||
|
@ -15,7 +15,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
function get_milvus_process() {
|
||||
return $(ps -e | grep milvus | grep -v grep | awk '{print $1}')
|
||||
echo $(ps -e | grep milvus | grep -v grep | awk '{print $1}')
|
||||
}
|
||||
|
||||
echo "Stopping milvus..."
|
||||
|
Loading…
Reference in New Issue
Block a user