From b7ee057d6b02334dbc6207ad7f2a6be6aca1f160 Mon Sep 17 00:00:00 2001
From: jaime
Date: Wed, 2 Nov 2022 14:59:34 +0800
Subject: [PATCH] Fix querynode panic during stop (#20242)

Signed-off-by: yun.zhang
Signed-off-by: yun.zhang
---
Review note: close() was revised to keep iterating after an entry that is
not a *ShardCluster, and gained a doc comment. Hunk sizes and the diffstat
below reflect that; the post-image blob id of shard_cluster_service.go in
its "index" line was not regenerated.

 internal/querynode/query_node.go              |  4 +++
 internal/querynode/shard_cluster_service.go   | 31 +++++++++++++++++++
 .../querynode/shard_cluster_service_test.go   | 27 ++++++++++++++++
 3 files changed, 62 insertions(+)

diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go
index f6b61ccb7a..ab1eac67c9 100644
--- a/internal/querynode/query_node.go
+++ b/internal/querynode/query_node.go
@@ -353,6 +353,10 @@ func (node *QueryNode) Stop() error {
 		node.metaReplica.freeAll()
 	}
 
+	if node.ShardClusterService != nil {
+		node.ShardClusterService.close()
+	}
+
 	if node.queryShardService != nil {
 		node.queryShardService.close()
 	}
diff --git a/internal/querynode/shard_cluster_service.go b/internal/querynode/shard_cluster_service.go
index ed744a012e..ca765e69aa 100644
--- a/internal/querynode/shard_cluster_service.go
+++ b/internal/querynode/shard_cluster_service.go
@@ -2,6 +2,7 @@ package querynode
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"path"
 	"strconv"
@@ -101,6 +102,36 @@ func (s *ShardClusterService) releaseShardCluster(vchannelName string) error {
 	return nil
 }
 
+// close closes every shard cluster held by the service.
+//
+// Every entry of s.clusters is visited even if one of them is not a
+// *ShardCluster, so a single bad entry cannot leave the remaining clusters
+// open while the query node is stopping. A non-nil error is returned when at
+// least one entry could not be closed.
+func (s *ShardClusterService) close() error {
+	log.Debug("start to close shard cluster service")
+
+	failed := false
+	s.clusters.Range(func(key, value any) bool {
+		cs, ok := value.(*ShardCluster)
+		if !ok {
+			// Keep going instead of aborting the iteration, so the
+			// remaining clusters are still closed.
+			log.Error("convert to ShardCluster fail, skip closing this entry", zap.Any("key", key))
+			failed = true
+			return true
+		}
+
+		cs.Close()
+		return true
+	})
+
+	if failed {
+		return errors.New("close shard cluster failed")
+	}
+	return nil
+}
+
 // releaseCollection removes all shardCluster matching specified collectionID
 func (s *ShardClusterService) releaseCollection(collectionID int64) {
 	s.clusters.Range(func(k, v interface{}) bool {
diff --git a/internal/querynode/shard_cluster_service_test.go b/internal/querynode/shard_cluster_service_test.go
index 57251e077a..bc8032c51b 100644
--- a/internal/querynode/shard_cluster_service_test.go
+++ b/internal/querynode/shard_cluster_service_test.go
@@ -99,3 +99,30 @@ func TestShardClusterService_SyncReplicaSegments(t *testing.T) {
 		assert.Equal(t, segmentStateLoaded, segment.state)
 	})
 }
+
+func TestShardClusterService_close(t *testing.T) {
+	client := v3client.New(embedetcdServer.Server)
+	defer client.Close()
+	session := sessionutil.NewSession(context.Background(), "/by-dev/sessions/unittest/querynode/", client)
+	clusterService := newShardClusterService(client, session, nil)
+
+	t.Run("close ok", func(t *testing.T) {
+		clusterService.addShardCluster(defaultCollectionID, defaultReplicaID, defaultDMLChannel)
+
+		cnt := 0
+		clusterService.clusters.Range(func(key, value any) bool {
+			cnt++
+			return true
+		})
+		assert.Equal(t, 1, cnt)
+
+		err := clusterService.close()
+		assert.NoError(t, err)
+	})
+
+	t.Run("close fail", func(t *testing.T) {
+		clusterService.clusters.Store("key", "error")
+		err := clusterService.close()
+		assert.Error(t, err)
+	})
+}