enhance: Change DN channelmanger into interface (#29307)

See also: #28854

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2023-12-27 16:00:48 +08:00 committed by GitHub
parent fe04598900
commit 632d8b3743
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 276 additions and 28 deletions

View File

@ -451,6 +451,7 @@ generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/datanode/io --output=$(PWD)/internal/datanode/io --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=io --inpackage
$(INSTALL_PATH)/mockery --name=FlowgraphManager --dir=$(PWD)/internal/datanode --output=$(PWD)/internal/datanode --filename=mock_fgmanager.go --with-expecter --structname=MockFlowgraphManager --outpkg=datanode --inpackage
$(INSTALL_PATH)/mockery --name=ChannelManager --dir=$(PWD)/internal/datanode --output=$(PWD)/internal/datanode --filename=mock_channelmanager.go --with-expecter --structname=MockChannelManager --outpkg=datanode --inpackage
generate-mockery-metastore: getdeps
$(INSTALL_PATH)/mockery --name=RootCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_rootcoord_catalog.go --with-expecter --structname=RootCoordCatalog --outpkg=mocks

View File

@ -33,7 +33,14 @@ import (
type releaseFunc func(channel string)
type ChannelManager struct {
type ChannelManager interface {
Submit(info *datapb.ChannelWatchInfo) error
GetProgress(info *datapb.ChannelWatchInfo) *datapb.ChannelOperationProgressResponse
Close()
Start()
}
type ChannelManagerImpl struct {
mu sync.RWMutex
dn *DataNode
@ -50,9 +57,9 @@ type ChannelManager struct {
closeWaiter sync.WaitGroup
}
func NewChannelManager(dn *DataNode) *ChannelManager {
func NewChannelManager(dn *DataNode) *ChannelManagerImpl {
fm := newFlowgraphManager()
cm := ChannelManager{
cm := ChannelManagerImpl{
dn: dn,
fgManager: fm,
@ -68,13 +75,13 @@ func NewChannelManager(dn *DataNode) *ChannelManager {
return &cm
}
func (m *ChannelManager) Submit(info *datapb.ChannelWatchInfo) error {
func (m *ChannelManagerImpl) Submit(info *datapb.ChannelWatchInfo) error {
channel := info.GetVchan().GetChannelName()
runner := m.getOrCreateRunner(channel)
return runner.Enqueue(info)
}
func (m *ChannelManager) GetProgress(info *datapb.ChannelWatchInfo) *datapb.ChannelOperationProgressResponse {
func (m *ChannelManagerImpl) GetProgress(info *datapb.ChannelWatchInfo) *datapb.ChannelOperationProgressResponse {
m.mu.RLock()
defer m.mu.RUnlock()
resp := &datapb.ChannelOperationProgressResponse{
@ -85,8 +92,10 @@ func (m *ChannelManager) GetProgress(info *datapb.ChannelWatchInfo) *datapb.Chan
channel := info.GetVchan().GetChannelName()
switch info.GetState() {
case datapb.ChannelWatchState_ToWatch:
// running flowgraph means watch success
if m.fgManager.HasFlowgraphWithOpID(channel, info.GetOpID()) {
resp.State = datapb.ChannelWatchState_WatchSuccess
resp.Progress = 100
return resp
}
@ -121,7 +130,7 @@ func (m *ChannelManager) GetProgress(info *datapb.ChannelWatchInfo) *datapb.Chan
}
}
func (m *ChannelManager) Close() {
func (m *ChannelManagerImpl) Close() {
m.closeOnce.Do(func() {
m.opRunners.Range(func(channel string, runner *opRunner) bool {
runner.Close()
@ -132,7 +141,7 @@ func (m *ChannelManager) Close() {
})
}
func (m *ChannelManager) Start() {
func (m *ChannelManagerImpl) Start() {
m.closeWaiter.Add(1)
go func() {
defer m.closeWaiter.Done()
@ -149,7 +158,7 @@ func (m *ChannelManager) Start() {
}()
}
func (m *ChannelManager) handleOpState(opState *opState) {
func (m *ChannelManagerImpl) handleOpState(opState *opState) {
m.mu.Lock()
defer m.mu.Unlock()
log := log.With(
@ -180,7 +189,7 @@ func (m *ChannelManager) handleOpState(opState *opState) {
}
}
func (m *ChannelManager) getOrCreateRunner(channel string) *opRunner {
func (m *ChannelManagerImpl) getOrCreateRunner(channel string) *opRunner {
runner, loaded := m.opRunners.GetOrInsert(channel, NewOpRunner(channel, m.dn, m.releaseFunc, m.communicateCh))
if !loaded {
runner.Start()
@ -188,13 +197,13 @@ func (m *ChannelManager) getOrCreateRunner(channel string) *opRunner {
return runner
}
func (m *ChannelManager) destoryRunner(channel string) {
func (m *ChannelManagerImpl) destoryRunner(channel string) {
if runner, loaded := m.opRunners.GetAndRemove(channel); loaded {
runner.Close()
}
}
func (m *ChannelManager) finishOp(opID int64, channel string) {
func (m *ChannelManagerImpl) finishOp(opID int64, channel string) {
if runner, loaded := m.opRunners.Get(channel); loaded {
runner.FinishOp(opID)
}
@ -328,22 +337,24 @@ func (r *opRunner) watchWithTimer(info *datapb.ChannelWatchInfo) *opState {
timer := time.NewTimer(watchTimeout)
defer timer.Stop()
log.Info("Start timer for ToWatch operation", zap.Duration("timeout", watchTimeout))
log := log.With(zap.Duration("timeout", watchTimeout))
log.Info("Start timer for ToWatch operation")
for {
select {
case <-timer.C:
// watch timeout
tickler.close()
cancel()
log.Info("Stop timer for ToWatch operation timeout", zap.Duration("timeout", watchTimeout))
log.Info("Stop timer for ToWatch operation timeout")
return
case <-tickler.progressSig:
log.Info("Reset timer for tickler updated")
timer.Reset(watchTimeout)
case <-successSig:
// watch success
log.Info("Stop timer for ToWatch operation succeeded", zap.Duration("timeout", watchTimeout))
log.Info("Stop timer for ToWatch operation succeeded")
return
}
}

View File

@ -35,7 +35,7 @@ type ChannelManagerSuite struct {
suite.Suite
node *DataNode
manager *ChannelManager
manager *ChannelManagerImpl
}
func (s *ChannelManagerSuite) SetupTest() {

View File

@ -88,6 +88,7 @@ type DataNode struct {
stateCode atomic.Value // commonpb.StateCode_Initializing
flowgraphManager FlowgraphManager
eventManagerMap *typeutil.ConcurrentMap[string, *channelEventManager]
channelManager ChannelManager
syncMgr syncmgr.SyncManager
writeBufferManager writebuffer.BufferManager

View File

@ -0,0 +1,185 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package datanode
import (
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
mock "github.com/stretchr/testify/mock"
)
// MockChannelManager is an autogenerated mock type for the ChannelManager type
type MockChannelManager struct {
mock.Mock
}
type MockChannelManager_Expecter struct {
mock *mock.Mock
}
func (_m *MockChannelManager) EXPECT() *MockChannelManager_Expecter {
return &MockChannelManager_Expecter{mock: &_m.Mock}
}
// Close provides a mock function with given fields:
func (_m *MockChannelManager) Close() {
_m.Called()
}
// MockChannelManager_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockChannelManager_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
func (_e *MockChannelManager_Expecter) Close() *MockChannelManager_Close_Call {
return &MockChannelManager_Close_Call{Call: _e.mock.On("Close")}
}
func (_c *MockChannelManager_Close_Call) Run(run func()) *MockChannelManager_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockChannelManager_Close_Call) Return() *MockChannelManager_Close_Call {
_c.Call.Return()
return _c
}
func (_c *MockChannelManager_Close_Call) RunAndReturn(run func()) *MockChannelManager_Close_Call {
_c.Call.Return(run)
return _c
}
// GetProgress provides a mock function with given fields: info
func (_m *MockChannelManager) GetProgress(info *datapb.ChannelWatchInfo) *datapb.ChannelOperationProgressResponse {
ret := _m.Called(info)
var r0 *datapb.ChannelOperationProgressResponse
if rf, ok := ret.Get(0).(func(*datapb.ChannelWatchInfo) *datapb.ChannelOperationProgressResponse); ok {
r0 = rf(info)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*datapb.ChannelOperationProgressResponse)
}
}
return r0
}
// MockChannelManager_GetProgress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetProgress'
type MockChannelManager_GetProgress_Call struct {
*mock.Call
}
// GetProgress is a helper method to define mock.On call
// - info *datapb.ChannelWatchInfo
func (_e *MockChannelManager_Expecter) GetProgress(info interface{}) *MockChannelManager_GetProgress_Call {
return &MockChannelManager_GetProgress_Call{Call: _e.mock.On("GetProgress", info)}
}
func (_c *MockChannelManager_GetProgress_Call) Run(run func(info *datapb.ChannelWatchInfo)) *MockChannelManager_GetProgress_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*datapb.ChannelWatchInfo))
})
return _c
}
func (_c *MockChannelManager_GetProgress_Call) Return(_a0 *datapb.ChannelOperationProgressResponse) *MockChannelManager_GetProgress_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockChannelManager_GetProgress_Call) RunAndReturn(run func(*datapb.ChannelWatchInfo) *datapb.ChannelOperationProgressResponse) *MockChannelManager_GetProgress_Call {
_c.Call.Return(run)
return _c
}
// Start provides a mock function with given fields:
func (_m *MockChannelManager) Start() {
_m.Called()
}
// MockChannelManager_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start'
type MockChannelManager_Start_Call struct {
*mock.Call
}
// Start is a helper method to define mock.On call
func (_e *MockChannelManager_Expecter) Start() *MockChannelManager_Start_Call {
return &MockChannelManager_Start_Call{Call: _e.mock.On("Start")}
}
func (_c *MockChannelManager_Start_Call) Run(run func()) *MockChannelManager_Start_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockChannelManager_Start_Call) Return() *MockChannelManager_Start_Call {
_c.Call.Return()
return _c
}
func (_c *MockChannelManager_Start_Call) RunAndReturn(run func()) *MockChannelManager_Start_Call {
_c.Call.Return(run)
return _c
}
// Submit provides a mock function with given fields: info
func (_m *MockChannelManager) Submit(info *datapb.ChannelWatchInfo) error {
ret := _m.Called(info)
var r0 error
if rf, ok := ret.Get(0).(func(*datapb.ChannelWatchInfo) error); ok {
r0 = rf(info)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockChannelManager_Submit_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Submit'
type MockChannelManager_Submit_Call struct {
*mock.Call
}
// Submit is a helper method to define mock.On call
// - info *datapb.ChannelWatchInfo
func (_e *MockChannelManager_Expecter) Submit(info interface{}) *MockChannelManager_Submit_Call {
return &MockChannelManager_Submit_Call{Call: _e.mock.On("Submit", info)}
}
func (_c *MockChannelManager_Submit_Call) Run(run func(info *datapb.ChannelWatchInfo)) *MockChannelManager_Submit_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*datapb.ChannelWatchInfo))
})
return _c
}
func (_c *MockChannelManager_Submit_Call) Return(_a0 error) *MockChannelManager_Submit_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockChannelManager_Submit_Call) RunAndReturn(run func(*datapb.ChannelWatchInfo) error) *MockChannelManager_Submit_Call {
_c.Call.Return(run)
return _c
}
// NewMockChannelManager creates a new instance of MockChannelManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockChannelManager(t interface {
mock.TestingT
Cleanup(func())
}) *MockChannelManager {
mock := &MockChannelManager{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -360,15 +360,39 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
}
func (node *DataNode) NotifyChannelOperation(ctx context.Context, req *datapb.ChannelOperationsRequest) (*commonpb.Status, error) {
log.Warn("DataNode NotifyChannelOperation is unimplemented")
return merr.Status(merr.ErrServiceUnavailable), nil
log.Ctx(ctx).Info("DataNode receives NotifyChannelOperation",
zap.Int("operation count", len(req.GetInfos())))
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
log.Warn("DataNode.NotifyChannelOperation failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err))
return merr.Status(err), nil
}
for _, info := range req.GetInfos() {
err := node.channelManager.Submit(info)
if err != nil {
log.Warn("Submit error", zap.Error(err))
return merr.Status(err), nil
}
}
return merr.Status(nil), nil
}
func (node *DataNode) CheckChannelOperationProgress(ctx context.Context, req *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error) {
log.Warn("DataNode CheckChannelOperationProgress is unimplemented")
return &datapb.ChannelOperationProgressResponse{
Status: merr.Status(merr.ErrServiceUnavailable),
}, nil
log := log.Ctx(ctx).With(
zap.String("channel", req.GetVchan().GetChannelName()),
zap.String("operation", req.GetState().String()),
)
log.Info("DataNode receives CheckChannelOperationProgress")
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
log.Warn("DataNode.CheckChannelOperationProgress failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err))
return &datapb.ChannelOperationProgressResponse{
Status: merr.Status(err),
}, nil
}
return node.channelManager.GetProgress(req), nil
}
// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments

View File

@ -930,12 +930,38 @@ func (s *DataNodeServicesSuite) TestFlushChannels() {
}*/
func (s *DataNodeServicesSuite) TestRPCWatch() {
ctx := context.Background()
status, err := s.node.NotifyChannelOperation(ctx, nil)
s.NoError(err)
s.NotNil(status)
s.Run("node not healthy", func() {
s.SetupTest()
s.node.UpdateStateCode(commonpb.StateCode_Abnormal)
resp, err := s.node.CheckChannelOperationProgress(ctx, nil)
s.NoError(err)
s.NotNil(resp)
ctx := context.Background()
status, err := s.node.NotifyChannelOperation(ctx, nil)
s.NoError(err)
s.False(merr.Ok(status))
s.ErrorIs(merr.Error(status), merr.ErrServiceNotReady)
resp, err := s.node.CheckChannelOperationProgress(ctx, nil)
s.NoError(err)
s.False(merr.Ok(resp.GetStatus()))
s.ErrorIs(merr.Error(status), merr.ErrServiceNotReady)
})
s.Run("node healthy", func() {
s.SetupTest()
mockChManager := NewMockChannelManager(s.T())
s.node.channelManager = mockChManager
mockChManager.EXPECT().Submit(mock.Anything).Return(nil).Once()
ctx := context.Background()
status, err := s.node.NotifyChannelOperation(ctx, &datapb.ChannelOperationsRequest{Infos: []*datapb.ChannelWatchInfo{{OpID: 19530}}})
s.NoError(err)
s.True(merr.Ok(status))
mockChManager.EXPECT().GetProgress(mock.Anything).Return(
&datapb.ChannelOperationProgressResponse{Status: merr.Status(nil)},
).Once()
resp, err := s.node.CheckChannelOperationProgress(ctx, nil)
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
})
}