Make SessionWatch keep watch even no Rewatch func when find ErrCompacted (#15497)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2022-02-15 15:07:48 +08:00 committed by GitHub
parent 684110bc9a
commit c7f55c2e34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 218 additions and 36 deletions

View File

@ -406,6 +406,7 @@ func (s *Server) initServiceDiscovery() error {
s.cluster.Startup(datanodes) s.cluster.Startup(datanodes)
// TODO implement rewatch logic
s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1, nil) s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1, nil)
return nil return nil
} }
@ -607,7 +608,13 @@ func (s *Server) watchService(ctx context.Context) {
return return
case event, ok := <-s.eventCh: case event, ok := <-s.eventCh:
if !ok { if !ok {
//TODO add retry logic // ErrCompacted is handled inside SessionWatcher
// So some other error occurred, closing DataCoord server
logutil.Logger(s.ctx).Error("watch service channel closed", zap.Int64("serverID", s.session.ServerID))
go s.Stop()
if s.session.TriggerKill {
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}
return return
} }
if err := s.handleSessionEvent(ctx, event); err != nil { if err := s.handleSessionEvent(ctx, event); err != nil {
@ -620,7 +627,6 @@ func (s *Server) watchService(ctx context.Context) {
} }
} }
} }
} }
// handles session events - DataNodes Add/Del // handles session events - DataNodes Add/Del

View File

@ -22,9 +22,11 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"os" "os"
"os/signal"
"path" "path"
"strconv" "strconv"
"sync/atomic" "sync/atomic"
"syscall"
"testing" "testing"
"time" "time"
@ -611,25 +613,40 @@ func TestGetFlushedSegments(t *testing.T) {
} }
func TestService_WatchServices(t *testing.T) { func TestService_WatchServices(t *testing.T) {
sc := make(chan os.Signal, 1)
signal.Notify(sc, syscall.SIGINT)
defer signal.Reset(syscall.SIGINT)
factory := msgstream.NewPmsFactory() factory := msgstream.NewPmsFactory()
svr := CreateServer(context.TODO(), factory) svr := CreateServer(context.TODO(), factory)
svr.session = &sessionutil.Session{
TriggerKill: true,
}
svr.serverLoopWg.Add(1) svr.serverLoopWg.Add(1)
ech := make(chan *sessionutil.SessionEvent) ech := make(chan *sessionutil.SessionEvent)
svr.eventCh = ech svr.eventCh = ech
flag := false flag := false
signal := make(chan struct{}, 1) closed := false
sigDone := make(chan struct{}, 1)
sigQuit := make(chan struct{}, 1)
go func() { go func() {
svr.watchService(context.Background()) svr.watchService(context.Background())
flag = true flag = true
signal <- struct{}{} sigDone <- struct{}{}
}()
go func() {
<-sc
closed = true
sigQuit <- struct{}{}
}() }()
close(ech) close(ech)
<-signal <-sigDone
<-sigQuit
assert.True(t, flag) assert.True(t, flag)
assert.True(t, closed)
ech = make(chan *sessionutil.SessionEvent) ech = make(chan *sessionutil.SessionEvent)
@ -641,12 +658,12 @@ func TestService_WatchServices(t *testing.T) {
go func() { go func() {
svr.watchService(ctx) svr.watchService(ctx)
flag = true flag = true
signal <- struct{}{} sigDone <- struct{}{}
}() }()
ech <- nil ech <- nil
cancel() cancel()
<-signal <-sigDone
assert.True(t, flag) assert.True(t, flag)
} }

View File

@ -200,6 +200,7 @@ func (i *IndexCoord) Init() error {
} }
log.Debug("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.nodeClients))) log.Debug("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.nodeClients)))
// TODO silverxia add Rewatch logic
i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1, nil) i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1, nil)
nodeTasks := i.metaTable.GetNodeTaskStats() nodeTasks := i.metaTable.GetNodeTaskStats()
for nodeID, taskNum := range nodeTasks { for nodeID, taskNum := range nodeTasks {
@ -758,7 +759,12 @@ func (i *IndexCoord) watchNodeLoop() {
return return
case event, ok := <-i.eventChan: case event, ok := <-i.eventChan:
if !ok { if !ok {
//TODO silverxia add retry // ErrCompacted is handled inside SessionWatcher
log.Error("Session Watcher channel closed", zap.Int64("server id", i.session.ServerID))
go i.Stop()
if i.session.TriggerKill {
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}
return return
} }
log.Debug("IndexCoord watchNodeLoop event updated") log.Debug("IndexCoord watchNodeLoop event updated")

View File

@ -19,7 +19,10 @@ package indexcoord
import ( import (
"context" "context"
"math/rand" "math/rand"
"os"
"os/signal"
"sync" "sync"
"syscall"
"testing" "testing"
"time" "time"
@ -227,21 +230,37 @@ func TestIndexCoord_watchNodeLoop(t *testing.T) {
loopWg: sync.WaitGroup{}, loopWg: sync.WaitGroup{},
loopCtx: context.Background(), loopCtx: context.Background(),
eventChan: ech, eventChan: ech,
session: &sessionutil.Session{
TriggerKill: true,
ServerID: 0,
},
} }
in.loopWg.Add(1) in.loopWg.Add(1)
flag := false flag := false
signal := make(chan struct{}, 1) closed := false
sigDone := make(chan struct{}, 1)
sigQuit := make(chan struct{}, 1)
sc := make(chan os.Signal, 1)
signal.Notify(sc, syscall.SIGINT)
defer signal.Reset(syscall.SIGINT)
go func() { go func() {
in.watchNodeLoop() in.watchNodeLoop()
flag = true flag = true
signal <- struct{}{} sigDone <- struct{}{}
}()
go func() {
<-sc
closed = true
sigQuit <- struct{}{}
}() }()
close(ech) close(ech)
<-signal <-sigDone
<-sigQuit
assert.True(t, flag) assert.True(t, flag)
assert.True(t, closed)
} }
func TestIndexCoord_GetComponentStates(t *testing.T) { func TestIndexCoord_GetComponentStates(t *testing.T) {

View File

@ -367,13 +367,24 @@ func (qc *QueryCoord) watchNodeLoop() {
log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask)) log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask))
} }
// TODO silverxia add Rewatch logic
qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, qc.cluster.getSessionVersion()+1, nil) qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, qc.cluster.getSessionVersion()+1, nil)
qc.handleNodeEvent(ctx)
}
func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case event, ok := <-qc.eventChan: case event, ok := <-qc.eventChan:
if !ok { if !ok {
// ErrCompacted is handled inside SessionWatcher
log.Error("Session Watcher channel closed", zap.Int64("server id", qc.session.ServerID))
go qc.Stop()
if qc.session.TriggerKill {
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}
return return
} }
switch event.EventType { switch event.EventType {

View File

@ -22,7 +22,9 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"os" "os"
"os/signal"
"strconv" "strconv"
"syscall"
"testing" "testing"
"time" "time"
@ -243,6 +245,44 @@ func TestWatchNodeLoop(t *testing.T) {
}) })
} }
// TestHandleNodeEventClosed verifies the closed-channel path of
// handleNodeEvent: with session.TriggerKill set, closing the event channel
// must make the loop return and deliver a SIGINT to the current process.
func TestHandleNodeEventClosed(t *testing.T) {
	eventCh := make(chan *sessionutil.SessionEvent)
	qc := &QueryCoord{
		eventChan: eventCh,
		session: &sessionutil.Session{
			TriggerKill: true,
			ServerID:    0,
		},
	}

	// Route SIGINT into a channel so the signal raised by the code under
	// test can be observed here instead of taking its default action.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT)
	defer signal.Reset(syscall.SIGINT)

	var (
		loopReturned bool
		sigReceived  bool
	)
	loopDone := make(chan struct{}, 1)
	sigSeen := make(chan struct{}, 1)

	// Runs the event loop; it is expected to return once eventCh is closed.
	go func() {
		qc.handleNodeEvent(context.Background())
		loopReturned = true
		loopDone <- struct{}{}
	}()

	// Waits for the SIGINT raised on the closed-channel path.
	go func() {
		<-sigCh
		sigReceived = true
		sigSeen <- struct{}{}
	}()

	close(eventCh)

	// Both receives order the flag writes before the assertions below.
	<-loopDone
	<-sigSeen
	assert.True(t, loopReturned)
	assert.True(t, sigReceived)
}
func TestHandoffSegmentLoop(t *testing.T) { func TestHandoffSegmentLoop(t *testing.T) {
refreshParams() refreshParams()
baseCtx := context.Background() baseCtx := context.Background()

View File

@ -217,7 +217,13 @@ func (node *QueryNode) watchService(ctx context.Context) {
return return
case event, ok := <-node.eventCh: case event, ok := <-node.eventCh:
if !ok { if !ok {
//TODO add retry logic // ErrCompacted is handled inside SessionWatcher
log.Error("Session Watcher channel closed", zap.Int64("server id", node.session.ServerID))
// need to call stop in separate goroutine
go node.Stop()
if node.session.TriggerKill {
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}
return return
} }
if err := node.handleSessionEvent(ctx, event); err != nil { if err := node.handleSessionEvent(ctx, event); err != nil {

View File

@ -20,8 +20,10 @@ import (
"context" "context"
"math/rand" "math/rand"
"os" "os"
"os/signal"
"strconv" "strconv"
"sync" "sync"
"syscall"
"testing" "testing"
"time" "time"
@ -36,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/sessionutil"
) )
// mock of query coordinator client // mock of query coordinator client
@ -425,3 +428,80 @@ func TestQueryNode_watchChangeInfo(t *testing.T) {
}) })
wg.Wait() wg.Wait()
} }
// TestQueryNode_watchService exercises the two ways watchService can return:
// the session event channel being closed, and the context being cancelled.
func TestQueryNode_watchService(t *testing.T) {
	t.Run("watch channel closed", func(t *testing.T) {
		eventCh := make(chan *sessionutil.SessionEvent)
		node := &QueryNode{
			session: &sessionutil.Session{
				TriggerKill: true,
				ServerID:    0,
			},
			wg:      sync.WaitGroup{},
			eventCh: eventCh,
			// No-op cancel func; presumably invoked by Stop on the
			// closed-channel path — verify against QueryNode.Stop.
			queryNodeLoopCancel: func() {},
		}

		// Route SIGINT into a channel so the signal raised by the code
		// under test can be observed instead of taking its default action.
		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, syscall.SIGINT)
		defer signal.Reset(syscall.SIGINT)

		var (
			loopReturned bool
			sigReceived  bool
		)
		loopDone := make(chan struct{}, 1)
		sigSeen := make(chan struct{}, 1)

		// NOTE(review): presumably balanced by a wg.Done inside
		// watchService — confirm against its full body.
		node.wg.Add(1)

		go func() {
			node.watchService(context.Background())
			loopReturned = true
			loopDone <- struct{}{}
		}()

		go func() {
			<-sigCh
			sigReceived = true
			sigSeen <- struct{}{}
		}()

		close(eventCh)

		// Both receives order the flag writes before the assertions.
		<-loopDone
		<-sigSeen
		assert.True(t, loopReturned)
		assert.True(t, sigReceived)
	})

	t.Run("context done", func(t *testing.T) {
		eventCh := make(chan *sessionutil.SessionEvent)
		node := &QueryNode{
			session: &sessionutil.Session{
				TriggerKill: true,
				ServerID:    0,
			},
			wg:      sync.WaitGroup{},
			eventCh: eventCh,
		}

		// SIGINT is still redirected for the duration of the subtest,
		// although nothing reads from sigCh on this path.
		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, syscall.SIGINT)
		defer signal.Reset(syscall.SIGINT)

		loopReturned := false
		loopDone := make(chan struct{}, 1)

		node.wg.Add(1)
		ctx, cancel := context.WithCancel(context.Background())

		go func() {
			node.watchService(ctx)
			loopReturned = true
			loopDone <- struct{}{}
		}()

		// The loop should still be blocked: no event sent, ctx not cancelled.
		assert.False(t, loopReturned)
		cancel()
		<-loopDone
		assert.True(t, loopReturned)
	})
}

View File

@ -403,21 +403,18 @@ func (w *sessionWatcher) handleWatchErr(err error) error {
return err return err
} }
// rewatch is nil, no logic to handle
if w.rewatch == nil {
log.Warn("Watch service with ErrCompacted but no rewatch logic provided")
close(w.eventCh)
return err
}
sessions, revision, err := w.s.GetSessions(w.prefix) sessions, revision, err := w.s.GetSessions(w.prefix)
if err != nil { if err != nil {
log.Warn("GetSession before rewatch failed", zap.String("prefix", w.prefix), zap.Error(err)) log.Warn("GetSession before rewatch failed", zap.String("prefix", w.prefix), zap.Error(err))
close(w.eventCh) close(w.eventCh)
return err return err
} }
// rewatch is nil, no logic to handle
err = w.rewatch(sessions) if w.rewatch == nil {
log.Warn("Watch service with ErrCompacted but no rewatch logic provided")
} else {
err = w.rewatch(sessions)
}
if err != nil { if err != nil {
log.Warn("WatchServices rewatch failed", zap.String("prefix", w.prefix), zap.Error(err)) log.Warn("WatchServices rewatch failed", zap.String("prefix", w.prefix), zap.Error(err))
close(w.eventCh) close(w.eventCh)

View File

@ -18,7 +18,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/mvccpb"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
) )
@ -303,8 +302,7 @@ func TestWatcherHandleWatchResp(t *testing.T) {
CompactRevision: 1, CompactRevision: 1,
} }
err := w.handleWatchResponse(wresp) err := w.handleWatchResponse(wresp)
assert.Error(t, err) assert.NoError(t, err)
assert.Equal(t, v3rpc.ErrCompacted, err)
}) })
t.Run("err compacted resp, valid Rewatch", func(t *testing.T) { t.Run("err compacted resp, valid Rewatch", func(t *testing.T) {
@ -327,6 +325,19 @@ func TestWatcherHandleWatchResp(t *testing.T) {
assert.Error(t, err) assert.Error(t, err)
}) })
t.Run("err handled but rewatch failed", func(t *testing.T) {
w := getWatcher(s, func(sessions map[string]*Session) error {
return errors.New("mocked")
})
wresp := clientv3.WatchResponse{
CompactRevision: 1,
}
err := w.handleWatchResponse(wresp)
t.Log(err.Error())
assert.Error(t, err)
})
t.Run("err handled but list failed", func(t *testing.T) { t.Run("err handled but list failed", func(t *testing.T) {
s := NewSession(ctx, "/by-dev/session-ut", etcdCli) s := NewSession(ctx, "/by-dev/session-ut", etcdCli)
s.etcdCli.Close() s.etcdCli.Close()
@ -341,17 +352,6 @@ func TestWatcherHandleWatchResp(t *testing.T) {
assert.Error(t, err) assert.Error(t, err)
}) })
t.Run("err handled but rewatch failed", func(t *testing.T) {
w := getWatcher(s, func(sessions map[string]*Session) error {
return errors.New("mocked")
})
wresp := clientv3.WatchResponse{
CompactRevision: 1,
}
err := w.handleWatchResponse(wresp)
assert.Error(t, err)
})
} }
func TestSessionRevoke(t *testing.T) { func TestSessionRevoke(t *testing.T) {