Add option for DataNode creation (#8240)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2021-09-18 19:10:07 +08:00 committed by GitHub
parent 56b3b2ed92
commit 21f15d66bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 8 deletions

View File

@ -17,6 +17,7 @@ import (
"time" "time"
"github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/internal/util/typeutil"
@ -114,6 +115,10 @@ func newMockDataNodeClient(id int64, ch chan interface{}) (*mockDataNodeClient,
}, nil }, nil
} }
var mockDataNodeCreator DataNodeCreatorFunc = func(_ context.Context, addr string) (types.DataNode, error) {
return newMockDataNodeClient(0, nil)
}
func (c *mockDataNodeClient) Init() error { func (c *mockDataNodeClient) Init() error {
return nil return nil
} }

View File

@ -70,8 +70,11 @@ const (
ServerStateHealthy ServerState = 2 ServerStateHealthy ServerState = 2
) )
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error) // DataNodeCreatorFunc creator function for datanode
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) type DataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
// RootCoordCreatorFunc creator function for rootcoord
type RootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)
// Server implements `types.Datacoord` // Server implements `types.Datacoord`
// handles Data Cooridinator related jobs // handles Data Cooridinator related jobs
@ -99,8 +102,8 @@ type Server struct {
activeCh <-chan bool activeCh <-chan bool
eventCh <-chan *sessionutil.SessionEvent eventCh <-chan *sessionutil.SessionEvent
dataClientCreator dataNodeCreatorFunc dataNodeCreator DataNodeCreatorFunc
rootCoordClientCreator rootCoordCreatorFunc rootCoordClientCreator RootCoordCreatorFunc
} }
// ServerHelper datacoord server injection helper // ServerHelper datacoord server injection helper
@ -118,7 +121,7 @@ func defaultServerHelper() ServerHelper {
type Option func(svr *Server) type Option func(svr *Server)
// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter // SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option { func SetRootCoordCreator(creator RootCoordCreatorFunc) Option {
return func(svr *Server) { return func(svr *Server) {
svr.rootCoordClientCreator = creator svr.rootCoordClientCreator = creator
} }
@ -138,6 +141,13 @@ func SetCluster(cluster *Cluster) Option {
} }
} }
// SetDataNodeCreator returns an `Option` setting DataNode create function
func SetDataNodeCreator(creator DataNodeCreatorFunc) Option {
return func(svr *Server) {
svr.dataNodeCreator = creator
}
}
// CreateServer create `Server` instance // CreateServer create `Server` instance
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) { func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
@ -145,7 +155,7 @@ func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option
ctx: ctx, ctx: ctx,
msFactory: factory, msFactory: factory,
flushCh: make(chan UniqueID, 1024), flushCh: make(chan UniqueID, 1024),
dataClientCreator: defaultDataNodeCreatorFunc, dataNodeCreator: defaultDataNodeCreatorFunc,
rootCoordClientCreator: defaultRootCoordCreatorFunc, rootCoordClientCreator: defaultRootCoordCreatorFunc,
helper: defaultServerHelper(), helper: defaultServerHelper(),
@ -158,6 +168,7 @@ func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option
return s, nil return s, nil
} }
// defaultDataNodeCreatorFunc defines the default behavior to get a DataNode
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) { func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
return datanodeclient.NewClient(ctx, addr) return datanodeclient.NewClient(ctx, addr)
} }

View File

@ -1252,7 +1252,7 @@ func TestOptions(t *testing.T) {
t.Run("SetRootCoordCreator", func(t *testing.T) { t.Run("SetRootCoordCreator", func(t *testing.T) {
svr := newTestServer(t, nil) svr := newTestServer(t, nil)
defer closeTestServer(t, svr) defer closeTestServer(t, svr)
var crt rootCoordCreatorFunc = func(ctx context.Context, metaRoot string, endpoints []string) (types.RootCoord, error) { var crt RootCoordCreatorFunc = func(ctx context.Context, metaRoot string, endpoints []string) (types.RootCoord, error) {
return nil, errors.New("dummy") return nil, errors.New("dummy")
} }
opt := SetRootCoordCreator(crt) opt := SetRootCoordCreator(crt)
@ -1282,6 +1282,24 @@ func TestOptions(t *testing.T) {
assert.Equal(t, cluster, svr.cluster) assert.Equal(t, cluster, svr.cluster)
}) })
t.Run("SetDataNodeCreator", func(t *testing.T) {
var target int64
var val int64 = rand.Int63()
opt := SetDataNodeCreator(func(context.Context, string) (types.DataNode, error) {
target = val
return nil, nil
})
assert.NotNil(t, opt)
factory := msgstream.NewPmsFactory()
svr, err := CreateServer(context.TODO(), factory, opt)
assert.Nil(t, err)
dn, err := svr.dataNodeCreator(context.Background(), "")
assert.Nil(t, dn)
assert.Nil(t, err)
assert.Equal(t, target, val)
})
} }
func TestHandleSessionEvent(t *testing.T) { func TestHandleSessionEvent(t *testing.T) {
@ -1433,7 +1451,7 @@ func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Se
svr, err := CreateServer(context.TODO(), factory, opts...) svr, err := CreateServer(context.TODO(), factory, opts...)
assert.Nil(t, err) assert.Nil(t, err)
svr.dataClientCreator = func(ctx context.Context, addr string) (types.DataNode, error) { svr.dataNodeCreator = func(ctx context.Context, addr string) (types.DataNode, error) {
return newMockDataNodeClient(0, receiveCh) return newMockDataNodeClient(0, receiveCh)
} }
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) { svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {