From 43a9e175a30045b50cb8f71ff2be6dd3fa97490e Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Wed, 12 Apr 2023 20:12:28 +0800 Subject: [PATCH] Exit component process when session key is deleted (#21658) (#22164) Signed-off-by: cai.zhang --- cmd/tools/migration/migration/runner.go | 2 +- internal/datacoord/server.go | 11 +- internal/datacoord/server_test.go | 7 +- internal/datanode/data_node.go | 6 +- internal/indexnode/indexnode.go | 6 +- internal/proxy/proxy.go | 6 +- internal/querycoordv2/server.go | 8 +- internal/querynodev2/server.go | 2 +- internal/rootcoord/root_coord.go | 4 +- internal/util/sessionutil/session_util.go | 122 ++++++++++++++---- .../util/sessionutil/session_util_test.go | 20 ++- 11 files changed, 142 insertions(+), 52 deletions(-) diff --git a/cmd/tools/migration/migration/runner.go b/cmd/tools/migration/migration/runner.go index c55a62017f..204c0c2d76 100644 --- a/cmd/tools/migration/migration/runner.go +++ b/cmd/tools/migration/migration/runner.go @@ -166,7 +166,7 @@ func (r *Runner) CheckSessions() error { func (r *Runner) RegisterSession() error { r.session.Register() - go r.session.LivenessCheck(r.ctx, func() {}) + r.session.LivenessCheck(r.ctx, func() {}) return nil } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index f1d0d3355a..dd1c0c4fc6 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -249,8 +249,9 @@ func (s *Server) Register() error { return err } } - go s.session.LivenessCheck(s.serverLoopCtx, func() { - logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", paramtable.GetNodeID())) + + s.session.LivenessCheck(s.serverLoopCtx, func() { + logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.ServerID)) if err := s.Stop(); err != nil { logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err)) } @@ -928,13 +929,17 @@ func (s *Server) Stop() error { s.cluster.Close() s.garbageCollector.close() s.stopServerLoop() - s.session.Revoke(time.Second) if Params.DataCoordCfg.EnableCompaction.GetAsBool() { s.stopCompactionTrigger() s.stopCompactionHandler() } s.indexBuilder.Stop() + + if s.session != nil { + s.session.Stop() + } + return nil } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index cbf8c31d4c..1c9d328254 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -3947,7 +3947,6 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server { var err error paramtable.Get().Save(Params.CommonCfg.DataCoordTimeTick.Key, Params.CommonCfg.DataCoordTimeTick.GetValue()+strconv.Itoa(rand.Int())) factory := dependency.NewDefaultFactory(true) - etcdCli, err := etcd.GetEtcdClient( Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), Params.EtcdCfg.EtcdUseSSL.GetAsBool(), @@ -4077,10 +4076,6 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server { assert.Nil(t, err) err = svr.Start() assert.Nil(t, err) - - _, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix()) - assert.Nil(t, err) - err = svr.Register() assert.Nil(t, err) @@ -4196,6 +4191,7 @@ func Test_CheckHealth(t *testing.T) { func Test_newChunkManagerFactory(t *testing.T) { server := newTestServer2(t, nil) paramtable.Get().Save(Params.DataCoordCfg.EnableGarbageCollection.Key, "true") + defer closeTestServer(t, server) t.Run("err_minio_bad_address", func(t *testing.T) { paramtable.Get().Save(Params.CommonCfg.StorageType.Key, "minio") @@ -4221,6 +4217,7 @@ func Test_initGarbageCollection(t *testing.T) { defer paramtable.Get().Reset(Params.DataCoordCfg.EnableGarbageCollection.Key) server := newTestServer2(t, nil) + defer closeTestServer(t, server) t.Run("ok", func(t *testing.T) { storageCli, err := server.newChunkManagerFactory() diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 6220858c7f..ed18d05be5 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -193,7 +193,7 @@ func (node *DataNode) Register() error { node.session.Register() // Start liveness check - go node.session.LivenessCheck(node.ctx, func() { + node.session.LivenessCheck(node.ctx, func() { log.Error("Data Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.GetSession().ServerID)) if err := node.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) @@ -580,7 +580,9 @@ func (node *DataNode) Stop() error { } } - node.session.Revoke(time.Second) + if node.session != nil { + node.session.Stop() + } return nil } diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index f0cbe48426..440a2f5a8b 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -122,7 +122,7 @@ func (i *IndexNode) Register() error { i.session.Register() //start liveness check - go i.session.LivenessCheck(i.loopCtx, func() { + i.session.LivenessCheck(i.loopCtx, func() { log.Error("Index Node disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID)) if err := i.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) @@ -240,7 +240,9 @@ func (i *IndexNode) Stop() error { if i.sched != nil { i.sched.Close() } - i.session.Revoke(time.Second) + if i.session != nil { + i.session.Stop() + } log.Info("Index node stopped.") }) diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index f79fde5634..f20b3dd868 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -132,7 +132,7 @@ func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) { // Register registers proxy at etcd func (node *Proxy) Register() error { node.session.Register() - go node.session.LivenessCheck(node.ctx, func() { + node.session.LivenessCheck(node.ctx, func() { log.Error("Proxy disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) if err := node.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) @@ -420,7 +420,9 @@ func (node *Proxy) Stop() error { cb() } - node.session.Revoke(time.Second) + if node.session != nil { + node.session.Stop() + } if node.shardMgr != nil { node.shardMgr.Close() diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index e903b7283e..2cd46129ab 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -22,7 +22,6 @@ import ( "os" "sync" "syscall" - "time" "github.com/cockroachdb/errors" clientv3 "go.etcd.io/etcd/client/v3" @@ -134,8 +133,9 @@ func (s *Server) Register() error { return err } } - go s.session.LivenessCheck(s.ctx, func() { - log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", paramtable.GetNodeID())) + + s.session.LivenessCheck(s.ctx, func() { + log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.ServerID)) if err := s.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } @@ -425,7 +425,7 @@ func (s *Server) startServerLoop() { func (s *Server) Stop() error { s.cancel() if s.session != nil { - s.session.Revoke(time.Second) + s.session.Stop() } if s.session != nil { diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 112cd1a81d..374037e27a 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -156,7 +156,7 @@ func (node *QueryNode) initSession() error { func (node *QueryNode) Register() error { node.session.Register() // start liveness check - go node.session.LivenessCheck(node.ctx, func() { + node.session.LivenessCheck(node.ctx, func() { log.Error("Query Node disconnected from etcd, process will exit", zap.Int64("Server Id", paramtable.GetNodeID())) if err := node.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 68666dcff0..6b7cfa5c59 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -278,7 +278,7 @@ func (c *Core) Register() error { } } log.Info("RootCoord Register Finished") - go c.session.LivenessCheck(c.ctx, func() { + c.session.LivenessCheck(c.ctx, func() { log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID)) if err := c.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) @@ -674,7 +674,7 @@ func (c *Core) cancelIfNotNil() { func (c *Core) revokeSession() { if c.session != nil { // wait at most one second to revoke - c.session.Revoke(time.Second) + c.session.Stop() log.Info("revoke rootcoord session") } } diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index dcc4c64ee5..87b7b6ef4a 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -95,9 +95,11 @@ type Session struct { TriggerKill bool Version semver.Version `json:"Version,omitempty"` - liveCh <-chan bool - etcdCli *clientv3.Client - leaseID *clientv3.LeaseID + liveCh <-chan bool + etcdCli *clientv3.Client + leaseID *clientv3.LeaseID + watchSessionKeyCh clientv3.WatchChan + wg sync.WaitGroup metaRoot string @@ -342,6 +344,22 @@ func (s *Session) getCompleteKey() string { return path.Join(s.metaRoot, DefaultServiceRoot, key) } +func (s *Session) getSessionKey() string { + key := s.ServerName + if !s.Exclusive { + key = fmt.Sprintf("%s-%d", key, s.ServerID) + } + return path.Join(s.metaRoot, DefaultServiceRoot, key) +} + +func (s *Session) initWatchSessionCh() { + getResp, err := s.etcdCli.Get(context.Background(), s.getSessionKey()) + if err != nil { + panic(err) + } + s.watchSessionKeyCh = s.etcdCli.Watch(context.Background(), s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision)) +} + // registerService registers the service to etcd so that other services // can find that the service is online and issue subsequent operations // RegisterService will save a key-value in etcd @@ -396,10 +414,7 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er log.Debug("put session key into etcd", zap.String("key", completeKey), zap.String("value", string(sessionJSON))) keepAliveCtx, keepAliveCancel := context.WithCancel(context.Background()) - s.keepAliveCancel = func() { - s.Revoke(time.Second) - keepAliveCancel() - } + s.keepAliveCancel = keepAliveCancel ch, err = s.etcdCli.KeepAlive(keepAliveCtx, resp.ID) if err != nil { log.Warn("go error during keeping alive with etcd", zap.Error(err)) @@ -419,7 +434,9 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er // If keepAlive fails for unexpected error, it will send a signal to the channel. func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveResponse) (failChannel <-chan bool) { failCh := make(chan bool) + s.wg.Add(1) go func() { + defer s.wg.Done() for { select { case <-s.ctx.Done(): @@ -689,28 +706,81 @@ func (w *sessionWatcher) handleWatchErr(err error) error { // ch is the liveness signal channel, ch is closed only when the session is expired // callback is the function to call when ch is closed, note that callback will not be invoked when loop exits due to context func (s *Session) LivenessCheck(ctx context.Context, callback func()) { - for { - select { - case _, ok := <-s.liveCh: - // ok, still alive - if ok { - continue + s.initWatchSessionCh() + s.wg.Add(1) + go func() { + defer s.wg.Done() + for { + select { + case _, ok := <-s.liveCh: + // ok, still alive + if ok { + continue + } + // not ok, connection lost + log.Warn("connection lost detected, shuting down") + if callback != nil { + go callback() + } + return + case <-ctx.Done(): + log.Debug("liveness exits due to context done") + // cancel the etcd keepAlive context + if s.keepAliveCancel != nil { + s.keepAliveCancel() + } + return + case resp, ok := <-s.watchSessionKeyCh: + if !ok { + log.Warn("watch session key channel closed") + if s.keepAliveCancel != nil { + s.keepAliveCancel() + } + return + } + if resp.Err() != nil { + // if not ErrCompacted, just close the channel + if resp.Err() != v3rpc.ErrCompacted { + //close event channel + log.Warn("Watch service found error", zap.Error(resp.Err())) + if s.keepAliveCancel != nil { + s.keepAliveCancel() + } + return + } + log.Warn("Watch service found compacted error", zap.Error(resp.Err())) + getResp, err := s.etcdCli.Get(s.ctx, s.getSessionKey()) + if err != nil || len(getResp.Kvs) == 0 { + if s.keepAliveCancel != nil { + s.keepAliveCancel() + } + return + } + s.watchSessionKeyCh = s.etcdCli.Watch(s.ctx, s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision)) + continue + } + for _, event := range resp.Events { + switch event.Type { + case mvccpb.PUT: + log.Info("register session success", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key))) + case mvccpb.DELETE: + log.Info("session key is deleted, exit...", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key))) + if s.keepAliveCancel != nil { + s.keepAliveCancel() + } + } + } } - // not ok, connection lost - log.Warn("connection lost detected, shuting down") - if callback != nil { - go callback() - } - return - case <-ctx.Done(): - log.Debug("liveness exits due to context done") - // cancel the etcd keepAlive context - if s.keepAliveCancel != nil { - s.keepAliveCancel() - } - return } + }() +} + +func (s *Session) Stop() { + s.Revoke(time.Second) + if s.keepAliveCancel != nil { + s.keepAliveCancel() } + s.wg.Wait() } // Revoke revokes the internal leaseID for the session key diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index fda75d5266..fdde8f3bf7 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -183,7 +183,19 @@ func TestUpdateSessions(t *testing.T) { } func TestSessionLivenessCheck(t *testing.T) { - s := &Session{} + paramtable.Init() + params := paramtable.Get() + + endpoints := params.GetWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) + metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) + + etcdEndpoints := strings.Split(endpoints, ",") + etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints) + require.NoError(t, err) + s := &Session{ + etcdCli: etcdCli, + metaRoot: metaRoot, + } ctx := context.Background() ch := make(chan bool) s.liveCh = ch @@ -191,7 +203,7 @@ func TestSessionLivenessCheck(t *testing.T) { flag := false - go s.LivenessCheck(ctx, func() { + s.LivenessCheck(ctx, func() { flag = true signal <- struct{}{} }) @@ -211,7 +223,7 @@ func TestSessionLivenessCheck(t *testing.T) { s.liveCh = ch flag = false - go s.LivenessCheck(ctx, func() { + s.LivenessCheck(ctx, func() { flag = true signal <- struct{}{} }) @@ -648,7 +660,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) { wg.Done() return nil }) - go s1.LivenessCheck(ctx1, func() { + s1.LivenessCheck(ctx1, func() { flag = true signal <- struct{}{} s1.keepAliveCancel()