mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 10:59:32 +08:00
Fix querynode panic during stop (#20242)
Signed-off-by: yun.zhang <yun.zhang@zilliz.com> Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
This commit is contained in:
parent
51acba7d99
commit
b7ee057d6b
@ -353,6 +353,10 @@ func (node *QueryNode) Stop() error {
|
||||
node.metaReplica.freeAll()
|
||||
}
|
||||
|
||||
if node.ShardClusterService != nil {
|
||||
node.ShardClusterService.close()
|
||||
}
|
||||
|
||||
if node.queryShardService != nil {
|
||||
node.queryShardService.close()
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package querynode
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path"
|
||||
"strconv"
|
||||
@ -101,6 +102,29 @@ func (s *ShardClusterService) releaseShardCluster(vchannelName string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ShardClusterService) close() error {
|
||||
log.Debug("start to close shard cluster service")
|
||||
|
||||
isFinish := true
|
||||
s.clusters.Range(func(key, value any) bool {
|
||||
cs, ok := value.(*ShardCluster)
|
||||
if !ok {
|
||||
log.Error("convert to ShardCluster fail, close shard cluster is interrupted", zap.Any("key", key))
|
||||
isFinish = false
|
||||
return false
|
||||
}
|
||||
|
||||
cs.Close()
|
||||
return true
|
||||
})
|
||||
|
||||
if isFinish {
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.New("close shard cluster failed")
|
||||
}
|
||||
|
||||
// releaseCollection removes all shardCluster matching specified collectionID
|
||||
func (s *ShardClusterService) releaseCollection(collectionID int64) {
|
||||
s.clusters.Range(func(k, v interface{}) bool {
|
||||
|
@ -99,3 +99,30 @@ func TestShardClusterService_SyncReplicaSegments(t *testing.T) {
|
||||
assert.Equal(t, segmentStateLoaded, segment.state)
|
||||
})
|
||||
}
|
||||
|
||||
func TestShardClusterService_close(t *testing.T) {
|
||||
client := v3client.New(embedetcdServer.Server)
|
||||
defer client.Close()
|
||||
session := sessionutil.NewSession(context.Background(), "/by-dev/sessions/unittest/querynode/", client)
|
||||
clusterService := newShardClusterService(client, session, nil)
|
||||
|
||||
t.Run("close ok", func(t *testing.T) {
|
||||
clusterService.addShardCluster(defaultCollectionID, defaultReplicaID, defaultDMLChannel)
|
||||
|
||||
cnt := 0
|
||||
clusterService.clusters.Range(func(key, value any) bool {
|
||||
cnt++
|
||||
return true
|
||||
})
|
||||
assert.Equal(t, 1, cnt)
|
||||
|
||||
err := clusterService.close()
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("close fail", func(t *testing.T) {
|
||||
clusterService.clusters.Store("key", "error")
|
||||
err := clusterService.close()
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user