Retry keepalive when the keepalive channel is closed (#24581)

Signed-off-by: wayblink <anyang.wang@zilliz.com>
This commit is contained in:
wayblink 2023-06-01 16:14:35 +08:00 committed by GitHub
parent 31880ab427
commit 5fb5b072ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 109 additions and 14 deletions

View File

@ -95,7 +95,8 @@ type Session struct {
TriggerKill bool
Version semver.Version `json:"Version,omitempty"`
liveCh <-chan bool
liveChOnce sync.Once
liveCh chan bool
etcdCli *clientv3.Client
leaseID *clientv3.LeaseID
watchSessionKeyCh clientv3.WatchChan
@ -103,8 +104,10 @@ type Session struct {
metaRoot string
registered atomic.Value
disconnected atomic.Value
registered atomic.Value
disconnected atomic.Value
retryKeepAlive atomic.Value
enableRetryKeepAlive bool
isStandby atomic.Value
enableActiveStandBy bool
@ -202,9 +205,10 @@ func NewSession(ctx context.Context, metaRoot string, client *clientv3.Client, o
Version: common.Version,
// options
sessionTTL: paramtable.Get().CommonCfg.SessionTTL.GetAsInt64(),
sessionRetryTimes: paramtable.Get().CommonCfg.SessionRetryTimes.GetAsInt64(),
reuseNodeID: true,
sessionTTL: paramtable.Get().CommonCfg.SessionTTL.GetAsInt64(),
sessionRetryTimes: paramtable.Get().CommonCfg.SessionRetryTimes.GetAsInt64(),
reuseNodeID: true,
enableRetryKeepAlive: true,
}
// integration test create cluster with different nodeId in one process
@ -264,7 +268,8 @@ func (s *Session) Register() {
log.Error("Register failed", zap.Error(err))
panic(err)
}
s.liveCh = s.processKeepAliveResponse(ch)
s.liveCh = make(chan bool)
s.processKeepAliveResponse(ch)
s.UpdateRegistered(true)
}
@ -433,8 +438,7 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er
// processKeepAliveResponse processes the responses of the etcd keepAlive interface.
// If keepAlive fails with an unexpected error, the failure is signaled by closing the liveness channel.
func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveResponse) (failChannel <-chan bool) {
failCh := make(chan bool)
func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveResponse) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
@ -449,19 +453,39 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes
case resp, ok := <-ch:
if !ok {
log.Warn("session keepalive channel closed")
close(failCh)
if !s.enableRetryKeepAlive {
s.safeCloseLiveCh()
return
}
log.Info("start try to KeepAliveOnce", zap.String("serverName", s.ServerName))
s.retryKeepAlive.Store(true)
// have to KeepAliveOnce before KeepAlive because KeepAlive won't throw error even when lease OT
keepAliveOnceResp, err := s.etcdCli.KeepAliveOnce(s.ctx, *s.leaseID)
if err != nil {
// error="etcdserver: requested lease not found"
log.Warn("fail to keepAliveOnce", zap.Error(err))
s.safeCloseLiveCh()
return
}
log.Info("succeed to KeepAliveOnce", zap.Any("resp", keepAliveOnceResp))
chNew, err := s.etcdCli.KeepAlive(s.ctx, *s.leaseID)
if err != nil {
log.Warn("fail to retry keepAlive", zap.Error(err))
s.safeCloseLiveCh()
return
}
s.processKeepAliveResponse(chNew)
s.retryKeepAlive.Store(false)
return
}
if resp == nil {
log.Warn("session keepalive response failed")
close(failCh)
return
s.safeCloseLiveCh()
}
//failCh <- true
}
}
}()
return failCh
}
// GetSessions will get all sessions registered in etcd.
@ -770,6 +794,10 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
case mvccpb.PUT:
log.Info("register session success", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key)))
case mvccpb.DELETE:
if s.retryKeepAlive.Load().(bool) {
log.Info("session key is deleted during re register, ignore this DELETE event", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key)))
continue
}
log.Info("session key is deleted, exit...", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key)))
if s.keepAliveCancel != nil {
s.keepAliveCancel()
@ -841,6 +869,16 @@ func (s *Session) updateStandby(b bool) {
s.isStandby.Store(b)
}
// SetEnableRetryKeepAlive sets the enableRetryKeepAlive flag of the session.
// When the flag is false, processKeepAliveResponse closes the live channel as
// soon as the etcd keepalive channel is closed instead of retrying with
// KeepAliveOnce/KeepAlive.
//
// The flag is a plain bool written without synchronization.
func (s *Session) SetEnableRetryKeepAlive(enable bool) {
	s.enableRetryKeepAlive = enable
}
// safeCloseLiveCh closes s.liveCh. The close is routed through s.liveChOnce,
// so repeated calls close the channel only once and later calls are no-ops
// rather than a double-close panic.
func (s *Session) safeCloseLiveCh() {
	closeLive := func() {
		close(s.liveCh)
	}
	s.liveChOnce.Do(closeLive)
}
// ProcessActiveStandBy is used by coordinators to implement the active-standby mechanism.
// A coordinator with active-standby enabled first calls Register and then calls ProcessActiveStandBy.
// steps:

View File

@ -617,6 +617,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) {
s1 := NewSession(ctx1, metaRoot, etcdCli, WithResueNodeID(false))
s1.Init("inittest", "testAddr", true, true)
s1.SetEnableActiveStandBy(true)
s1.SetEnableRetryKeepAlive(false)
s1.Register()
wg.Add(1)
s1.liveCh = ch
@ -637,6 +638,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) {
s2 := NewSession(ctx2, metaRoot, etcdCli, WithResueNodeID(false))
s2.Init("inittest", "testAddr", true, true)
s2.SetEnableActiveStandBy(true)
s2.SetEnableRetryKeepAlive(false)
s2.Register()
wg.Add(1)
go s2.ProcessActiveStandBy(func() error {
@ -652,7 +654,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) {
assert.False(t, flag)
ch <- true
assert.False(t, flag)
close(ch)
s1.safeCloseLiveCh()
<-signal
assert.True(t, flag)
@ -864,6 +866,61 @@ func (s *SessionSuite) TestRevoke() {
}
}
// TestKeepAliveRetryEnable checks that, with keepalive retry enabled, the
// session is not reported as disconnected after keepAliveCancel is called.
func (s *SessionSuite) TestKeepAliveRetryEnable() {
	ctx := context.Background()
	session := NewSession(ctx, s.metaRoot, s.client)
	session.Init("test", "normal", false, false)

	// Register the session and obtain the keepalive response channel.
	// A setup failure is reported as a test failure instead of a panic.
	ch, err := session.registerService()
	if !assert.NoError(s.T(), err) {
		return
	}

	session.SetEnableRetryKeepAlive(true)
	// liveCh must exist before the keepalive goroutine starts, because that
	// goroutine may close it through safeCloseLiveCh.
	session.liveCh = make(chan bool)
	session.processKeepAliveResponse(ch)
	session.LivenessCheck(ctx, nil)
	session.keepAliveCancel()

	// Sleep a while to let the background goroutines process the cancellation.
	time.Sleep(time.Millisecond * 100)

	// Disconnected() == false means the session is not closed.
	assert.False(s.T(), session.Disconnected())
}
// TestKeepAliveRetryDisable checks that, with keepalive retry disabled, the
// session is reported as disconnected after keepAliveCancel is called.
func (s *SessionSuite) TestKeepAliveRetryDisable() {
	ctx := context.Background()
	session := NewSession(ctx, s.metaRoot, s.client)
	session.Init("test", "normal", false, false)

	// Register the session and obtain the keepalive response channel.
	// A setup failure is reported as a test failure instead of a panic.
	ch, err := session.registerService()
	if !assert.NoError(s.T(), err) {
		return
	}

	session.SetEnableRetryKeepAlive(false)
	// liveCh must exist before the keepalive goroutine starts, because that
	// goroutine may close it through safeCloseLiveCh.
	session.liveCh = make(chan bool)
	session.processKeepAliveResponse(ch)
	session.LivenessCheck(ctx, nil)
	session.keepAliveCancel()

	// Sleep a while to let the background goroutines process the cancellation.
	time.Sleep(time.Millisecond * 100)

	// Disconnected() == true means the session is closed.
	assert.True(s.T(), session.Disconnected())
}
// TestSafeCloseLiveCh verifies that calling safeCloseLiveCh again after the
// live channel has already been closed does not panic.
func (s *SessionSuite) TestSafeCloseLiveCh() {
	session := NewSession(context.Background(), s.metaRoot, s.client)
	session.Init("test", "normal", false, false)
	session.liveCh = make(chan bool)

	// The first call closes the channel.
	session.safeCloseLiveCh()

	// The second call must be a no-op rather than a double close.
	assert.NotPanics(s.T(), session.safeCloseLiveCh)
}
func TestSessionSuite(t *testing.T) {
suite.Run(t, new(SessionSuite))
}