fix retry on offline node (#28079)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2023-11-03 10:14:16 +08:00 committed by GitHub
parent 86ec6f4832
commit ecec5dfcfd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 972 additions and 3 deletions

View File

@ -441,6 +441,7 @@ generate-mockery-utils: getdeps
$(INSTALL_PATH)/mockery --name=Factory --dir=internal/util/dependency --output=internal/util/dependency --filename=mock_factory.go --with-expecter --structname=MockFactory --inpackage
# tso.Allocator
$(INSTALL_PATH)/mockery --name=Allocator --dir=internal/tso --output=internal/tso/mocks --filename=allocator.go --with-expecter --structname=Allocator --outpkg=mocktso
$(INSTALL_PATH)/mockery --name=SessionInterface --dir=$(PWD)/internal/util/sessionutil --output=$(PWD)/internal/util/sessionutil --filename=mock_session.go --with-expecter --structname=MockSession --inpackage
generate-mockery-kv: getdeps
$(INSTALL_PATH)/mockery --name=TxnKV --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=txn_kv.go --with-expecter

View File

@ -262,7 +262,10 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker
PrimaryKeys: storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys),
Timestamps: delRecord.Timestamps,
})
if errors.Is(err, merr.ErrSegmentNotFound) {
if errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("try to delete data on non-exist node")
return retry.Unrecoverable(err)
} else if errors.Is(err, merr.ErrSegmentNotFound) {
log.Warn("try to delete data of released segment")
return nil
} else if err != nil {

View File

@ -39,6 +39,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -327,6 +328,18 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
RowCount: 1,
},
}, 10)
// test worker offline
worker1.ExpectedCalls = nil
worker1.EXPECT().Delete(mock.Anything, mock.Anything).Return(merr.ErrNodeNotFound)
s.delegator.ProcessDelete([]*DeleteData{
{
PartitionID: 500,
PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)},
Timestamps: []uint64{10},
RowCount: 1,
},
}, 10)
}
func (s *DelegatorDataSuite) TestLoadSegments() {

View File

@ -48,6 +48,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// GrpcClient abstracts client of grpc
@ -105,7 +106,7 @@ type ClientBase[T interface {
maxCancelError int32
NodeID atomic.Int64
sess *sessionutil.Session
sess sessionutil.SessionInterface
}
func NewClientBase[T interface {
@ -321,9 +322,10 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
}
func (c *ClientBase[T]) verifySession(ctx context.Context) error {
if funcutil.CheckCtxValid(ctx) {
if !funcutil.CheckCtxValid(ctx) {
return nil
}
log := log.Ctx(ctx).With(zap.String("clientRole", c.GetRole()))
if time.Since(c.lastSessionCheck.Load()) < c.minSessionCheckInterval {
log.Debug("skip session check, verify too frequent")
@ -410,6 +412,17 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er
defer cancel()
err := retry.Do(ctx, func() error {
if generic.IsZero(client) {
switch c.GetRole() {
case typeutil.DataNodeRole, typeutil.IndexNodeRole, typeutil.QueryNodeRole:
// if session doesn't exist, no need to reset connection for datanode/indexnode/querynode
err := c.verifySession(ctx)
if err != nil && errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("failed to verify node session", zap.Error(err))
// stop retry
return retry.Unrecoverable(err)
}
}
err := errors.Wrap(clientErr, "empty grpc client")
log.Warn("grpc client is nil, maybe fail to get client in the retry state", zap.Error(err))
resetClientFunc()
@ -428,6 +441,7 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er
log.Warn("start to reset connection because of specific reasons", zap.Error(err))
resetClientFunc()
} else {
// err occurs but no need to reset connection, try to verify session
err := c.verifySession(ctx)
if err != nil {
log.Warn("failed to verify session, reset connection", zap.Error(err))

View File

@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/examples/helloworld/helloworld"
@ -37,6 +38,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -92,6 +94,29 @@ func TestClientBase_connect(t *testing.T) {
})
}
func TestClientBase_NodeSessionNotExist(t *testing.T) {
base := ClientBase[*mockClient]{
maxCancelError: 10,
MaxAttempts: 3,
}
base.SetGetAddrFunc(func() (string, error) {
return "", errors.New("mocked address error")
})
base.role = typeutil.QueryNodeRole
mockSession := sessionutil.NewMockSession(t)
mockSession.EXPECT().GetSessions(mock.Anything).Return(nil, 0, nil)
base.sess = mockSession
base.grpcClientMtx.Lock()
base.grpcClient = nil
base.grpcClientMtx.Unlock()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, err := base.Call(ctx, func(client *mockClient) (any, error) {
return struct{}{}, nil
})
assert.True(t, errors.Is(err, merr.ErrNodeNotFound))
}
func TestClientBase_Call(t *testing.T) {
testCall(t, false)
}

View File

@ -0,0 +1,864 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package sessionutil
import (
context "context"
semver "github.com/blang/semver/v4"
mock "github.com/stretchr/testify/mock"
time "time"
)
// MockSession is an autogenerated mock type for the SessionInterface type
type MockSession struct {
mock.Mock
}
type MockSession_Expecter struct {
mock *mock.Mock
}
func (_m *MockSession) EXPECT() *MockSession_Expecter {
return &MockSession_Expecter{mock: &_m.Mock}
}
// Disconnected provides a mock function with given fields:
func (_m *MockSession) Disconnected() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockSession_Disconnected_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Disconnected'
type MockSession_Disconnected_Call struct {
*mock.Call
}
// Disconnected is a helper method to define mock.On call
func (_e *MockSession_Expecter) Disconnected() *MockSession_Disconnected_Call {
return &MockSession_Disconnected_Call{Call: _e.mock.On("Disconnected")}
}
func (_c *MockSession_Disconnected_Call) Run(run func()) *MockSession_Disconnected_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockSession_Disconnected_Call) Return(_a0 bool) *MockSession_Disconnected_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSession_Disconnected_Call) RunAndReturn(run func() bool) *MockSession_Disconnected_Call {
_c.Call.Return(run)
return _c
}
// ForceActiveStandby provides a mock function with given fields: activateFunc
func (_m *MockSession) ForceActiveStandby(activateFunc func() error) error {
ret := _m.Called(activateFunc)
var r0 error
if rf, ok := ret.Get(0).(func(func() error) error); ok {
r0 = rf(activateFunc)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockSession_ForceActiveStandby_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ForceActiveStandby'
type MockSession_ForceActiveStandby_Call struct {
*mock.Call
}
// ForceActiveStandby is a helper method to define mock.On call
// - activateFunc func() error
func (_e *MockSession_Expecter) ForceActiveStandby(activateFunc interface{}) *MockSession_ForceActiveStandby_Call {
return &MockSession_ForceActiveStandby_Call{Call: _e.mock.On("ForceActiveStandby", activateFunc)}
}
func (_c *MockSession_ForceActiveStandby_Call) Run(run func(activateFunc func() error)) *MockSession_ForceActiveStandby_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(func() error))
})
return _c
}
func (_c *MockSession_ForceActiveStandby_Call) Return(_a0 error) *MockSession_ForceActiveStandby_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSession_ForceActiveStandby_Call) RunAndReturn(run func(func() error) error) *MockSession_ForceActiveStandby_Call {
_c.Call.Return(run)
return _c
}
// GetSessions provides a mock function with given fields: prefix
func (_m *MockSession) GetSessions(prefix string) (map[string]*Session, int64, error) {
ret := _m.Called(prefix)
var r0 map[string]*Session
var r1 int64
var r2 error
if rf, ok := ret.Get(0).(func(string) (map[string]*Session, int64, error)); ok {
return rf(prefix)
}
if rf, ok := ret.Get(0).(func(string) map[string]*Session); ok {
r0 = rf(prefix)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[string]*Session)
}
}
if rf, ok := ret.Get(1).(func(string) int64); ok {
r1 = rf(prefix)
} else {
r1 = ret.Get(1).(int64)
}
if rf, ok := ret.Get(2).(func(string) error); ok {
r2 = rf(prefix)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// MockSession_GetSessions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSessions'
type MockSession_GetSessions_Call struct {
*mock.Call
}
// GetSessions is a helper method to define mock.On call
// - prefix string
func (_e *MockSession_Expecter) GetSessions(prefix interface{}) *MockSession_GetSessions_Call {
return &MockSession_GetSessions_Call{Call: _e.mock.On("GetSessions", prefix)}
}
func (_c *MockSession_GetSessions_Call) Run(run func(prefix string)) *MockSession_GetSessions_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockSession_GetSessions_Call) Return(_a0 map[string]*Session, _a1 int64, _a2 error) *MockSession_GetSessions_Call {
_c.Call.Return(_a0, _a1, _a2)
return _c
}
func (_c *MockSession_GetSessions_Call) RunAndReturn(run func(string) (map[string]*Session, int64, error)) *MockSession_GetSessions_Call {
_c.Call.Return(run)
return _c
}
// GetSessionsWithVersionRange provides a mock function with given fields: prefix, r
func (_m *MockSession) GetSessionsWithVersionRange(prefix string, r semver.Range) (map[string]*Session, int64, error) {
ret := _m.Called(prefix, r)
var r0 map[string]*Session
var r1 int64
var r2 error
if rf, ok := ret.Get(0).(func(string, semver.Range) (map[string]*Session, int64, error)); ok {
return rf(prefix, r)
}
if rf, ok := ret.Get(0).(func(string, semver.Range) map[string]*Session); ok {
r0 = rf(prefix, r)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[string]*Session)
}
}
if rf, ok := ret.Get(1).(func(string, semver.Range) int64); ok {
r1 = rf(prefix, r)
} else {
r1 = ret.Get(1).(int64)
}
if rf, ok := ret.Get(2).(func(string, semver.Range) error); ok {
r2 = rf(prefix, r)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// MockSession_GetSessionsWithVersionRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSessionsWithVersionRange'
type MockSession_GetSessionsWithVersionRange_Call struct {
*mock.Call
}
// GetSessionsWithVersionRange is a helper method to define mock.On call
// - prefix string
// - r semver.Range
func (_e *MockSession_Expecter) GetSessionsWithVersionRange(prefix interface{}, r interface{}) *MockSession_GetSessionsWithVersionRange_Call {
return &MockSession_GetSessionsWithVersionRange_Call{Call: _e.mock.On("GetSessionsWithVersionRange", prefix, r)}
}
func (_c *MockSession_GetSessionsWithVersionRange_Call) Run(run func(prefix string, r semver.Range)) *MockSession_GetSessionsWithVersionRange_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(semver.Range))
})
return _c
}
func (_c *MockSession_GetSessionsWithVersionRange_Call) Return(_a0 map[string]*Session, _a1 int64, _a2 error) *MockSession_GetSessionsWithVersionRange_Call {
_c.Call.Return(_a0, _a1, _a2)
return _c
}
func (_c *MockSession_GetSessionsWithVersionRange_Call) RunAndReturn(run func(string, semver.Range) (map[string]*Session, int64, error)) *MockSession_GetSessionsWithVersionRange_Call {
_c.Call.Return(run)
return _c
}
// GoingStop provides a mock function with given fields:
func (_m *MockSession) GoingStop() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// MockSession_GoingStop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GoingStop'
type MockSession_GoingStop_Call struct {
*mock.Call
}
// GoingStop is a helper method to define mock.On call
func (_e *MockSession_Expecter) GoingStop() *MockSession_GoingStop_Call {
return &MockSession_GoingStop_Call{Call: _e.mock.On("GoingStop")}
}
func (_c *MockSession_GoingStop_Call) Run(run func()) *MockSession_GoingStop_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockSession_GoingStop_Call) Return(_a0 error) *MockSession_GoingStop_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSession_GoingStop_Call) RunAndReturn(run func() error) *MockSession_GoingStop_Call {
_c.Call.Return(run)
return _c
}
// Init provides a mock function with given fields: serverName, address, exclusive, triggerKill
func (_m *MockSession) Init(serverName string, address string, exclusive bool, triggerKill bool) {
_m.Called(serverName, address, exclusive, triggerKill)
}
// MockSession_Init_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Init'
type MockSession_Init_Call struct {
*mock.Call
}
// Init is a helper method to define mock.On call
// - serverName string
// - address string
// - exclusive bool
// - triggerKill bool
func (_e *MockSession_Expecter) Init(serverName interface{}, address interface{}, exclusive interface{}, triggerKill interface{}) *MockSession_Init_Call {
return &MockSession_Init_Call{Call: _e.mock.On("Init", serverName, address, exclusive, triggerKill)}
}
func (_c *MockSession_Init_Call) Run(run func(serverName string, address string, exclusive bool, triggerKill bool)) *MockSession_Init_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(string), args[2].(bool), args[3].(bool))
})
return _c
}
func (_c *MockSession_Init_Call) Return() *MockSession_Init_Call {
_c.Call.Return()
return _c
}
func (_c *MockSession_Init_Call) RunAndReturn(run func(string, string, bool, bool)) *MockSession_Init_Call {
_c.Call.Return(run)
return _c
}
// LivenessCheck provides a mock function with given fields: ctx, callback
func (_m *MockSession) LivenessCheck(ctx context.Context, callback func()) {
_m.Called(ctx, callback)
}
// MockSession_LivenessCheck_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LivenessCheck'
type MockSession_LivenessCheck_Call struct {
*mock.Call
}
// LivenessCheck is a helper method to define mock.On call
// - ctx context.Context
// - callback func()
func (_e *MockSession_Expecter) LivenessCheck(ctx interface{}, callback interface{}) *MockSession_LivenessCheck_Call {
return &MockSession_LivenessCheck_Call{Call: _e.mock.On("LivenessCheck", ctx, callback)}
}
func (_c *MockSession_LivenessCheck_Call) Run(run func(ctx context.Context, callback func())) *MockSession_LivenessCheck_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(func()))
})
return _c
}
func (_c *MockSession_LivenessCheck_Call) Return() *MockSession_LivenessCheck_Call {
_c.Call.Return()
return _c
}
func (_c *MockSession_LivenessCheck_Call) RunAndReturn(run func(context.Context, func())) *MockSession_LivenessCheck_Call {
_c.Call.Return(run)
return _c
}
// MarshalJSON provides a mock function with given fields:
func (_m *MockSession) MarshalJSON() ([]byte, error) {
ret := _m.Called()
var r0 []byte
var r1 error
if rf, ok := ret.Get(0).(func() ([]byte, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() []byte); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockSession_MarshalJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MarshalJSON'
type MockSession_MarshalJSON_Call struct {
*mock.Call
}
// MarshalJSON is a helper method to define mock.On call
func (_e *MockSession_Expecter) MarshalJSON() *MockSession_MarshalJSON_Call {
return &MockSession_MarshalJSON_Call{Call: _e.mock.On("MarshalJSON")}
}
func (_c *MockSession_MarshalJSON_Call) Run(run func()) *MockSession_MarshalJSON_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockSession_MarshalJSON_Call) Return(_a0 []byte, _a1 error) *MockSession_MarshalJSON_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockSession_MarshalJSON_Call) RunAndReturn(run func() ([]byte, error)) *MockSession_MarshalJSON_Call {
_c.Call.Return(run)
return _c
}
// ProcessActiveStandBy provides a mock function with given fields: activateFunc
func (_m *MockSession) ProcessActiveStandBy(activateFunc func() error) error {
ret := _m.Called(activateFunc)
var r0 error
if rf, ok := ret.Get(0).(func(func() error) error); ok {
r0 = rf(activateFunc)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockSession_ProcessActiveStandBy_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessActiveStandBy'
type MockSession_ProcessActiveStandBy_Call struct {
*mock.Call
}
// ProcessActiveStandBy is a helper method to define mock.On call
// - activateFunc func() error
func (_e *MockSession_Expecter) ProcessActiveStandBy(activateFunc interface{}) *MockSession_ProcessActiveStandBy_Call {
return &MockSession_ProcessActiveStandBy_Call{Call: _e.mock.On("ProcessActiveStandBy", activateFunc)}
}
func (_c *MockSession_ProcessActiveStandBy_Call) Run(run func(activateFunc func() error)) *MockSession_ProcessActiveStandBy_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(func() error))
})
return _c
}
func (_c *MockSession_ProcessActiveStandBy_Call) Return(_a0 error) *MockSession_ProcessActiveStandBy_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSession_ProcessActiveStandBy_Call) RunAndReturn(run func(func() error) error) *MockSession_ProcessActiveStandBy_Call {
_c.Call.Return(run)
return _c
}
// Register provides a mock function with given fields:
func (_m *MockSession) Register() {
_m.Called()
}
// MockSession_Register_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Register'
type MockSession_Register_Call struct {
*mock.Call
}
// Register is a helper method to define mock.On call
func (_e *MockSession_Expecter) Register() *MockSession_Register_Call {
return &MockSession_Register_Call{Call: _e.mock.On("Register")}
}
func (_c *MockSession_Register_Call) Run(run func()) *MockSession_Register_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockSession_Register_Call) Return() *MockSession_Register_Call {
_c.Call.Return()
return _c
}
func (_c *MockSession_Register_Call) RunAndReturn(run func()) *MockSession_Register_Call {
_c.Call.Return(run)
return _c
}
// Registered provides a mock function with given fields:
func (_m *MockSession) Registered() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockSession_Registered_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Registered'
type MockSession_Registered_Call struct {
*mock.Call
}
// Registered is a helper method to define mock.On call
func (_e *MockSession_Expecter) Registered() *MockSession_Registered_Call {
return &MockSession_Registered_Call{Call: _e.mock.On("Registered")}
}
func (_c *MockSession_Registered_Call) Run(run func()) *MockSession_Registered_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockSession_Registered_Call) Return(_a0 bool) *MockSession_Registered_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSession_Registered_Call) RunAndReturn(run func() bool) *MockSession_Registered_Call {
_c.Call.Return(run)
return _c
}
// Revoke provides a mock function with given fields: timeout
func (_m *MockSession) Revoke(timeout time.Duration) {
_m.Called(timeout)
}
// MockSession_Revoke_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Revoke'
type MockSession_Revoke_Call struct {
*mock.Call
}
// Revoke is a helper method to define mock.On call
// - timeout time.Duration
func (_e *MockSession_Expecter) Revoke(timeout interface{}) *MockSession_Revoke_Call {
return &MockSession_Revoke_Call{Call: _e.mock.On("Revoke", timeout)}
}
func (_c *MockSession_Revoke_Call) Run(run func(timeout time.Duration)) *MockSession_Revoke_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(time.Duration))
})
return _c
}
func (_c *MockSession_Revoke_Call) Return() *MockSession_Revoke_Call {
_c.Call.Return()
return _c
}
func (_c *MockSession_Revoke_Call) RunAndReturn(run func(time.Duration)) *MockSession_Revoke_Call {
_c.Call.Return(run)
return _c
}
// SetDisconnected provides a mock function with given fields: b
func (_m *MockSession) SetDisconnected(b bool) {
_m.Called(b)
}
// MockSession_SetDisconnected_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetDisconnected'
type MockSession_SetDisconnected_Call struct {
*mock.Call
}
// SetDisconnected is a helper method to define mock.On call
// - b bool
func (_e *MockSession_Expecter) SetDisconnected(b interface{}) *MockSession_SetDisconnected_Call {
return &MockSession_SetDisconnected_Call{Call: _e.mock.On("SetDisconnected", b)}
}
func (_c *MockSession_SetDisconnected_Call) Run(run func(b bool)) *MockSession_SetDisconnected_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(bool))
})
return _c
}
func (_c *MockSession_SetDisconnected_Call) Return() *MockSession_SetDisconnected_Call {
_c.Call.Return()
return _c
}
func (_c *MockSession_SetDisconnected_Call) RunAndReturn(run func(bool)) *MockSession_SetDisconnected_Call {
_c.Call.Return(run)
return _c
}
// SetEnableActiveStandBy provides a mock function with given fields: enable
func (_m *MockSession) SetEnableActiveStandBy(enable bool) {
_m.Called(enable)
}
// MockSession_SetEnableActiveStandBy_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetEnableActiveStandBy'
type MockSession_SetEnableActiveStandBy_Call struct {
*mock.Call
}
// SetEnableActiveStandBy is a helper method to define mock.On call
// - enable bool
func (_e *MockSession_Expecter) SetEnableActiveStandBy(enable interface{}) *MockSession_SetEnableActiveStandBy_Call {
return &MockSession_SetEnableActiveStandBy_Call{Call: _e.mock.On("SetEnableActiveStandBy", enable)}
}
func (_c *MockSession_SetEnableActiveStandBy_Call) Run(run func(enable bool)) *MockSession_SetEnableActiveStandBy_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(bool))
})
return _c
}
func (_c *MockSession_SetEnableActiveStandBy_Call) Return() *MockSession_SetEnableActiveStandBy_Call {
_c.Call.Return()
return _c
}
func (_c *MockSession_SetEnableActiveStandBy_Call) RunAndReturn(run func(bool)) *MockSession_SetEnableActiveStandBy_Call {
_c.Call.Return(run)
return _c
}
// Stop provides a mock function with given fields:
func (_m *MockSession) Stop() {
_m.Called()
}
// MockSession_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'
type MockSession_Stop_Call struct {
*mock.Call
}
// Stop is a helper method to define mock.On call
func (_e *MockSession_Expecter) Stop() *MockSession_Stop_Call {
return &MockSession_Stop_Call{Call: _e.mock.On("Stop")}
}
func (_c *MockSession_Stop_Call) Run(run func()) *MockSession_Stop_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockSession_Stop_Call) Return() *MockSession_Stop_Call {
_c.Call.Return()
return _c
}
func (_c *MockSession_Stop_Call) RunAndReturn(run func()) *MockSession_Stop_Call {
_c.Call.Return(run)
return _c
}
// String provides a mock function with given fields:
func (_m *MockSession) String() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockSession_String_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'String'
type MockSession_String_Call struct {
*mock.Call
}
// String is a helper method to define mock.On call
func (_e *MockSession_Expecter) String() *MockSession_String_Call {
return &MockSession_String_Call{Call: _e.mock.On("String")}
}
func (_c *MockSession_String_Call) Run(run func()) *MockSession_String_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockSession_String_Call) Return(_a0 string) *MockSession_String_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSession_String_Call) RunAndReturn(run func() string) *MockSession_String_Call {
_c.Call.Return(run)
return _c
}
// UnmarshalJSON provides a mock function with given fields: data
func (_m *MockSession) UnmarshalJSON(data []byte) error {
ret := _m.Called(data)
var r0 error
if rf, ok := ret.Get(0).(func([]byte) error); ok {
r0 = rf(data)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockSession_UnmarshalJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UnmarshalJSON'
type MockSession_UnmarshalJSON_Call struct {
*mock.Call
}
// UnmarshalJSON is a helper method to define mock.On call
// - data []byte
func (_e *MockSession_Expecter) UnmarshalJSON(data interface{}) *MockSession_UnmarshalJSON_Call {
return &MockSession_UnmarshalJSON_Call{Call: _e.mock.On("UnmarshalJSON", data)}
}
func (_c *MockSession_UnmarshalJSON_Call) Run(run func(data []byte)) *MockSession_UnmarshalJSON_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].([]byte))
})
return _c
}
func (_c *MockSession_UnmarshalJSON_Call) Return(_a0 error) *MockSession_UnmarshalJSON_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSession_UnmarshalJSON_Call) RunAndReturn(run func([]byte) error) *MockSession_UnmarshalJSON_Call {
_c.Call.Return(run)
return _c
}
// UpdateRegistered provides a mock function with given fields: b
func (_m *MockSession) UpdateRegistered(b bool) {
_m.Called(b)
}
// MockSession_UpdateRegistered_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateRegistered'
type MockSession_UpdateRegistered_Call struct {
*mock.Call
}
// UpdateRegistered is a helper method to define mock.On call
// - b bool
func (_e *MockSession_Expecter) UpdateRegistered(b interface{}) *MockSession_UpdateRegistered_Call {
return &MockSession_UpdateRegistered_Call{Call: _e.mock.On("UpdateRegistered", b)}
}
func (_c *MockSession_UpdateRegistered_Call) Run(run func(b bool)) *MockSession_UpdateRegistered_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(bool))
})
return _c
}
func (_c *MockSession_UpdateRegistered_Call) Return() *MockSession_UpdateRegistered_Call {
_c.Call.Return()
return _c
}
func (_c *MockSession_UpdateRegistered_Call) RunAndReturn(run func(bool)) *MockSession_UpdateRegistered_Call {
_c.Call.Return(run)
return _c
}
// WatchServices provides a mock function with given fields: prefix, revision, rewatch
func (_m *MockSession) WatchServices(prefix string, revision int64, rewatch Rewatch) <-chan *SessionEvent {
ret := _m.Called(prefix, revision, rewatch)
var r0 <-chan *SessionEvent
if rf, ok := ret.Get(0).(func(string, int64, Rewatch) <-chan *SessionEvent); ok {
r0 = rf(prefix, revision, rewatch)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan *SessionEvent)
}
}
return r0
}
// MockSession_WatchServices_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WatchServices'
type MockSession_WatchServices_Call struct {
*mock.Call
}
// WatchServices is a helper method to define mock.On call
// - prefix string
// - revision int64
// - rewatch Rewatch
func (_e *MockSession_Expecter) WatchServices(prefix interface{}, revision interface{}, rewatch interface{}) *MockSession_WatchServices_Call {
return &MockSession_WatchServices_Call{Call: _e.mock.On("WatchServices", prefix, revision, rewatch)}
}
func (_c *MockSession_WatchServices_Call) Run(run func(prefix string, revision int64, rewatch Rewatch)) *MockSession_WatchServices_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(int64), args[2].(Rewatch))
})
return _c
}
func (_c *MockSession_WatchServices_Call) Return(eventChannel <-chan *SessionEvent) *MockSession_WatchServices_Call {
_c.Call.Return(eventChannel)
return _c
}
func (_c *MockSession_WatchServices_Call) RunAndReturn(run func(string, int64, Rewatch) <-chan *SessionEvent) *MockSession_WatchServices_Call {
_c.Call.Return(run)
return _c
}
// WatchServicesWithVersionRange provides a mock function with given fields: prefix, r, revision, rewatch
func (_m *MockSession) WatchServicesWithVersionRange(prefix string, r semver.Range, revision int64, rewatch Rewatch) <-chan *SessionEvent {
ret := _m.Called(prefix, r, revision, rewatch)
var r0 <-chan *SessionEvent
if rf, ok := ret.Get(0).(func(string, semver.Range, int64, Rewatch) <-chan *SessionEvent); ok {
r0 = rf(prefix, r, revision, rewatch)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan *SessionEvent)
}
}
return r0
}
// MockSession_WatchServicesWithVersionRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WatchServicesWithVersionRange'
type MockSession_WatchServicesWithVersionRange_Call struct {
*mock.Call
}
// WatchServicesWithVersionRange is a helper method to define mock.On call
// - prefix string
// - r semver.Range
// - revision int64
// - rewatch Rewatch
func (_e *MockSession_Expecter) WatchServicesWithVersionRange(prefix interface{}, r interface{}, revision interface{}, rewatch interface{}) *MockSession_WatchServicesWithVersionRange_Call {
return &MockSession_WatchServicesWithVersionRange_Call{Call: _e.mock.On("WatchServicesWithVersionRange", prefix, r, revision, rewatch)}
}
func (_c *MockSession_WatchServicesWithVersionRange_Call) Run(run func(prefix string, r semver.Range, revision int64, rewatch Rewatch)) *MockSession_WatchServicesWithVersionRange_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(semver.Range), args[2].(int64), args[3].(Rewatch))
})
return _c
}
func (_c *MockSession_WatchServicesWithVersionRange_Call) Return(eventChannel <-chan *SessionEvent) *MockSession_WatchServicesWithVersionRange_Call {
_c.Call.Return(eventChannel)
return _c
}
func (_c *MockSession_WatchServicesWithVersionRange_Call) RunAndReturn(run func(string, semver.Range, int64, Rewatch) <-chan *SessionEvent) *MockSession_WatchServicesWithVersionRange_Call {
_c.Call.Return(run)
return _c
}
// NewMockSession creates a new instance of MockSession. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockSession(t interface {
mock.TestingT
Cleanup(func())
}) *MockSession {
mock := &MockSession{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,49 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sessionutil
import (
"context"
"time"
"github.com/blang/semver/v4"
)
type SessionInterface interface {
UnmarshalJSON(data []byte) error
MarshalJSON() ([]byte, error)
Init(serverName, address string, exclusive bool, triggerKill bool)
String() string
Register()
GetSessions(prefix string) (map[string]*Session, int64, error)
GetSessionsWithVersionRange(prefix string, r semver.Range) (map[string]*Session, int64, error)
GoingStop() error
WatchServices(prefix string, revision int64, rewatch Rewatch) (eventChannel <-chan *SessionEvent)
WatchServicesWithVersionRange(prefix string, r semver.Range, revision int64, rewatch Rewatch) (eventChannel <-chan *SessionEvent)
LivenessCheck(ctx context.Context, callback func())
Stop()
Revoke(timeout time.Duration)
UpdateRegistered(b bool)
Registered() bool
SetDisconnected(b bool)
Disconnected() bool
SetEnableActiveStandBy(enable bool)
ProcessActiveStandBy(activateFunc func() error) error
ForceActiveStandby(activateFunc func() error) error
}