diff --git a/internal/distributed/indexcoord/client/client.go b/internal/distributed/indexcoord/client/client.go index 49386138d2..37a64f8f88 100644 --- a/internal/distributed/indexcoord/client/client.go +++ b/internal/distributed/indexcoord/client/client.go @@ -23,6 +23,7 @@ import ( grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/trace" @@ -35,6 +36,15 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" ) +type Base interface { + types.IndexCoord + + Init() error + Start() error + Stop() error + Register() error +} + // Client is the grpc client of IndexCoord. type Client struct { ctx context.Context diff --git a/internal/distributed/querynode/mock.go b/internal/distributed/querynode/mock.go deleted file mode 100644 index e19e087cf0..0000000000 --- a/internal/distributed/querynode/mock.go +++ /dev/null @@ -1,193 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package grpcquerynode - -import ( - "path" - "strconv" - - "errors" - - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/indexpb" - internalPb "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/proto/milvuspb" -) - -const ( - collectionID = 1 - - binlogPathPrefix = "distributed-query-test-binlog" - indexPathPrefix = "distributed-query-test-index" - - uidFieldID = 0 - timestampFieldID = 1 - vecFieldID = 100 - ageFieldID = 101 - vecParamsID = "indexParams" - vecDataID = "IVF" -) - -var fieldIDs = []int64{uidFieldID, timestampFieldID, vecFieldID, ageFieldID} - -/* - masterMock receive segmentID ,return indexID, segmentID = IndexID - dataMock return binlogPath, path = distributed-query-test-binlog/collectionID/segmentID/fieldID - indexMock return indexPath and IndexParam, indexPath = distributed-query-test-index/collectionID/segmentID/indexID, - indexParam use default: - -indexID: 1 - -schema: - collectionID: 1 - partitionID: 1 - segmentID: [1, 10] - 0: int64: uid - 1: int64: timestamp - 100: float32: vec: 16 - 101: int32: age - -indexParams: - indexParams := make(map[string]string) - indexParams["index_type"] = "IVF_PQ" - indexParams["index_mode"] = "cpu" - indexParams["dim"] = "16" - indexParams["k"] = "10" - indexParams["nlist"] = "100" - indexParams["nprobe"] = "10" - indexParams["m"] = "4" - indexParams["nbits"] = "8" - indexParams["metric_type"] = "L2" - indexParams["SLICE_SIZE"] = "4" -*/ - -type RootCoordMock struct { - Count int -} - -func (m *RootCoordMock) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { - if m.Count < 20 { - m.Count++ - return nil, errors.New("index not exit") - } - indexParams := make(map[string]string) - indexParams["index_type"] = "IVF_PQ" - indexParams["index_mode"] = "cpu" - indexParams["dim"] = "16" - indexParams["k"] = "10" - indexParams["nlist"] = "100" - indexParams["nprobe"] = "10" - indexParams["m"] = "4" - indexParams["nbits"] = "8" - indexParams["metric_type"] = "L2" - indexParams["SLICE_SIZE"] = "4" - - params := make([]*commonpb.KeyValuePair, 0) - for k, v := range indexParams { - params = append(params, &commonpb.KeyValuePair{ - Key: k, - Value: v, - }) - } - rsp := &milvuspb.DescribeSegmentResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - IndexID: in.SegmentID, // use index id as segment id - BuildID: in.SegmentID, - } - return rsp, nil -} - -type DataCoordMock struct { - Count int -} - -func (data *DataCoordMock) GetInsertBinlogPaths(req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) { - if data.Count < 10 { - data.Count++ - return nil, errors.New("binlog not exist") - } - paths := make([]*internalPb.StringList, len(fieldIDs)) - for i := range paths { - pathKey := path.Join(binlogPathPrefix, - strconv.FormatInt(collectionID, 10), - strconv.FormatInt(req.SegmentID, 10), - strconv.FormatInt(fieldIDs[i], 10)) - paths[i] = &internalPb.StringList{ - Values: []string{pathKey}, - } - } - rsp := &datapb.GetInsertBinlogPathsResponse{ - FieldIDs: fieldIDs, - Paths: paths, - } - return rsp, nil -} - -func (data *DataCoordMock) GetSegmentStates(req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { - segmentGrowingInfo := &datapb.SegmentStateInfo{ - State: commonpb.SegmentState_Growing, - } - segmentFlushedInfo := &datapb.SegmentStateInfo{ - State: commonpb.SegmentState_Flushed, - } - - if data.Count < 10 { - data.Count++ - return &datapb.GetSegmentStatesResponse{ - States: []*datapb.SegmentStateInfo{segmentGrowingInfo}, - }, nil - } - - return &datapb.GetSegmentStatesResponse{ - States: []*datapb.SegmentStateInfo{segmentFlushedInfo}, - }, nil -} - -type IndexCoordMock struct { - Count int -} - -func (index *IndexCoordMock) GetIndexFilePaths(req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) { - if index.Count < 30 { - index.Count++ - return nil, errors.New("index path not exist") - } - if len(req.IndexBuildIDs) != 1 { - panic("illegal index ids") - } - segmentID := req.IndexBuildIDs[0] // use index id as segment id - indexPaths1 := path.Join(indexPathPrefix, - strconv.FormatInt(collectionID, 10), - strconv.FormatInt(segmentID, 10), - vecDataID) - indexPaths2 := path.Join(indexPathPrefix, - strconv.FormatInt(collectionID, 10), - strconv.FormatInt(segmentID, 10), - vecParamsID) - indexPathInfo := make([]*indexpb.IndexFilePathInfo, 1) - indexPathInfo[0] = &indexpb.IndexFilePathInfo{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - IndexFilePaths: []string{indexPaths1, indexPaths2}, - } - rsp := &indexpb.GetIndexFilePathsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - FilePaths: indexPathInfo, - } - return rsp, nil -} diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index e4717661ef..d2a0989934 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -28,7 +28,6 @@ import ( "google.golang.org/grpc" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" - dsc "github.com/milvus-io/milvus/internal/distributed/datacoord/client" isc "github.com/milvus-io/milvus/internal/distributed/indexcoord/client" rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client" "github.com/milvus-io/milvus/internal/log" @@ -46,7 +45,7 @@ import ( type UniqueID = typeutil.UniqueID type Server struct { - querynode *qn.QueryNode + querynode qn.Base wg sync.WaitGroup ctx context.Context cancel context.CancelFunc @@ -54,9 +53,8 @@ type Server struct { grpcServer *grpc.Server - dataCoord *dsc.Client - rootCoord *rcc.GrpcClient - indexCoord *isc.Client + rootCoord rcc.Base + indexCoord isc.Base closer io.Closer } @@ -98,61 +96,64 @@ func (s *Server) init() error { addr := Params.RootCoordAddress log.Debug("QueryNode start to new RootCoordClient", zap.Any("QueryCoordAddress", addr)) - rootCoord, err := rcc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints) - if err != nil { - log.Debug("QueryNode new RootCoordClient failed", zap.Error(err)) - panic(err) + if s.rootCoord == nil { + s.rootCoord, err = rcc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints) + if err != nil { + log.Debug("QueryNode new RootCoordClient failed", zap.Error(err)) + panic(err) + } } - if err = rootCoord.Init(); err != nil { + if err = s.rootCoord.Init(); err != nil { log.Debug("QueryNode RootCoordClient Init failed", zap.Error(err)) panic(err) } - if err = rootCoord.Start(); err != nil { + if err = s.rootCoord.Start(); err != nil { log.Debug("QueryNode RootCoordClient Start failed", zap.Error(err)) panic(err) } log.Debug("QueryNode start to wait for RootCoord ready") - err = funcutil.WaitForComponentHealthy(s.ctx, rootCoord, "RootCoord", 1000000, time.Millisecond*200) + err = funcutil.WaitForComponentHealthy(s.ctx, s.rootCoord, "RootCoord", 1000000, time.Millisecond*200) if err != nil { log.Debug("QueryNode wait for RootCoord ready failed", zap.Error(err)) panic(err) } log.Debug("QueryNode report RootCoord is ready") - if err := s.SetRootCoord(rootCoord); err != nil { + if err := s.SetRootCoord(s.rootCoord); err != nil { panic(err) } // --- IndexCoord --- log.Debug("Index coord", zap.String("address", Params.IndexCoordAddress)) - indexCoord, err := isc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints) - - if err != nil { - log.Debug("QueryNode new IndexCoordClient failed", zap.Error(err)) - panic(err) + if s.indexCoord == nil { + s.indexCoord, err = isc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints) + if err != nil { + log.Debug("QueryNode new IndexCoordClient failed", zap.Error(err)) + panic(err) + } } - if err := indexCoord.Init(); err != nil { + if err := s.indexCoord.Init(); err != nil { log.Debug("QueryNode IndexCoordClient Init failed", zap.Error(err)) panic(err) } - if err := indexCoord.Start(); err != nil { + if err := s.indexCoord.Start(); err != nil { log.Debug("QueryNode IndexCoordClient Start failed", zap.Error(err)) panic(err) } // wait IndexCoord healthy log.Debug("QueryNode start to wait for IndexCoord ready") - err = funcutil.WaitForComponentHealthy(s.ctx, indexCoord, "IndexCoord", 1000000, time.Millisecond*200) + err = funcutil.WaitForComponentHealthy(s.ctx, s.indexCoord, "IndexCoord", 1000000, time.Millisecond*200) if err != nil { log.Debug("QueryNode wait for IndexCoord ready failed", zap.Error(err)) panic(err) } log.Debug("QueryNode report IndexCoord is ready") - if err := s.SetIndexCoord(indexCoord); err != nil { + if err := s.SetIndexCoord(s.indexCoord); err != nil { panic(err) } @@ -258,6 +259,12 @@ func (s *Server) SetIndexCoord(indexCoord types.IndexCoord) error { return s.querynode.SetIndexCoord(indexCoord) } +// SetClient sets the IndexNode's instance. +func (s *Server) SetClient(queryNodeClient qn.Base) error { + s.querynode = queryNodeClient + return nil +} + func (s *Server) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) { return s.querynode.GetTimeTickChannel(ctx) } diff --git a/internal/distributed/querynode/service_test.go b/internal/distributed/querynode/service_test.go new file mode 100644 index 0000000000..0ba687d858 --- /dev/null +++ b/internal/distributed/querynode/service_test.go @@ -0,0 +1,319 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcquerynode + +import ( + "context" + "errors" + "testing" + + isc "github.com/milvus-io/milvus/internal/distributed/indexcoord/client" + rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client" + + "github.com/milvus-io/milvus/internal/types" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/stretchr/testify/assert" +) + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +type MockQueryNode struct { + states *internalpb.ComponentStates + status *commonpb.Status + err error + strResp *milvuspb.StringResponse + infoResp *querypb.GetSegmentInfoResponse + metricResp *milvuspb.GetMetricsResponse +} + +func (m *MockQueryNode) Init() error { + return m.err +} + +func (m *MockQueryNode) Start() error { + return m.err +} + +func (m *MockQueryNode) Stop() error { + return m.err +} + +func (m *MockQueryNode) Register() error { + return m.err +} + +func (m *MockQueryNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { + return m.states, m.err +} + +func (m *MockQueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + return m.strResp, m.err +} + +func (m *MockQueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { + return m.strResp, m.err +} + +func (m *MockQueryNode) AddQueryChannel(ctx context.Context, req *querypb.AddQueryChannelRequest) (*commonpb.Status, error) { + return m.status, m.err +} + +func (m *MockQueryNode) RemoveQueryChannel(ctx context.Context, req *querypb.RemoveQueryChannelRequest) (*commonpb.Status, error) { + return m.status, m.err +} + +func (m *MockQueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) { + return m.status, m.err +} + +func (m *MockQueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) { + return m.status, m.err +} + +func (m *MockQueryNode) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) { + return m.status, m.err +} + +func (m *MockQueryNode) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) { + return m.status, m.err +} + +func (m *MockQueryNode) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) { + return m.status, m.err +} + +func (m *MockQueryNode) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) { + return m.infoResp, m.err +} + +func (m *MockQueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + return m.metricResp, m.err +} + +func (m *MockQueryNode) UpdateStateCode(code internalpb.StateCode) { +} + +func (m *MockQueryNode) SetRootCoord(rc types.RootCoord) error { + return m.err +} + +func (m *MockQueryNode) SetIndexCoord(index types.IndexCoord) error { + return m.err +} + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +type MockRootCoord struct { + rcc.Base + initErr error + startErr error + regErr error + stopErr error + stateErr commonpb.ErrorCode +} + +func (m *MockRootCoord) Init() error { + return m.initErr +} + +func (m *MockRootCoord) Start() error { + return m.startErr +} + +func (m *MockRootCoord) Stop() error { + return m.stopErr +} + +func (m *MockRootCoord) Register() error { + return m.regErr +} + +func (m *MockRootCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { + return &internalpb.ComponentStates{ + State: &internalpb.ComponentInfo{StateCode: internalpb.StateCode_Healthy}, + Status: &commonpb.Status{ErrorCode: m.stateErr}, + }, nil +} + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +type MockIndexCoord struct { + isc.Base + initErr error + startErr error + regErr error + stopErr error + stateErr commonpb.ErrorCode +} + +func (m *MockIndexCoord) Init() error { + return m.initErr +} + +func (m *MockIndexCoord) Start() error { + return m.startErr +} + +func (m *MockIndexCoord) Stop() error { + return m.stopErr +} + +func (m *MockIndexCoord) Register() error { + return m.regErr +} + +func (m *MockIndexCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { + return &internalpb.ComponentStates{ + State: &internalpb.ComponentInfo{StateCode: internalpb.StateCode_Healthy}, + Status: &commonpb.Status{ErrorCode: m.stateErr}, + }, nil +} + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +func Test_NewServer(t *testing.T) { + ctx := context.Background() + qns, err := NewServer(ctx, nil) + assert.Nil(t, err) + assert.NotNil(t, qns) + + mqn := &MockQueryNode{ + states: &internalpb.ComponentStates{State: &internalpb.ComponentInfo{StateCode: internalpb.StateCode_Healthy}}, + status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + err: nil, + strResp: &milvuspb.StringResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}}, + infoResp: &querypb.GetSegmentInfoResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}}, + metricResp: &milvuspb.GetMetricsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}}, + } + err = qns.SetClient(mqn) + assert.Nil(t, err) + + t.Run("Run", func(t *testing.T) { + qns.rootCoord = &MockRootCoord{} + qns.indexCoord = &MockIndexCoord{} + + err = qns.Run() + assert.Nil(t, err) + }) + + t.Run("GetComponentStates", func(t *testing.T) { + req := &internalpb.GetComponentStatesRequest{} + states, err := qns.GetComponentStates(ctx, req) + assert.Nil(t, err) + assert.Equal(t, internalpb.StateCode_Healthy, states.State.StateCode) + }) + + t.Run("GetStatisticsChannel", func(t *testing.T) { + req := &internalpb.GetStatisticsChannelRequest{} + resp, err := qns.GetStatisticsChannel(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("GetTimeTickChannel", func(t *testing.T) { + req := &internalpb.GetTimeTickChannelRequest{} + resp, err := qns.GetTimeTickChannel(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("AddQueryChannel", func(t *testing.T) { + req := &querypb.AddQueryChannelRequest{} + resp, err := qns.AddQueryChannel(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("RemoveQueryChannel", func(t *testing.T) { + req := &querypb.RemoveQueryChannelRequest{} + resp, err := qns.RemoveQueryChannel(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("WatchDmChannels", func(t *testing.T) { + req := &querypb.WatchDmChannelsRequest{} + resp, err := qns.WatchDmChannels(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("LoadSegments", func(t *testing.T) { + req := &querypb.LoadSegmentsRequest{} + resp, err := qns.LoadSegments(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("ReleaseCollection", func(t *testing.T) { + req := &querypb.ReleaseCollectionRequest{} + resp, err := qns.ReleaseCollection(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("ReleasePartitions", func(t *testing.T) { + req := &querypb.ReleasePartitionsRequest{} + resp, err := qns.ReleasePartitions(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("ReleaseSegments", func(t *testing.T) { + req := &querypb.ReleaseSegmentsRequest{} + resp, err := qns.ReleaseSegments(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) + }) + + t.Run("GetSegmentInfo", func(t *testing.T) { + req := &querypb.GetSegmentInfoRequest{} + resp, err := qns.GetSegmentInfo(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + t.Run("GetMetrics", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{ + Request: "", + } + resp, err := qns.GetMetrics(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + }) + + err = qns.Stop() + assert.Nil(t, err) +} + +func Test_Run(t *testing.T) { + ctx := context.Background() + qns, err := NewServer(ctx, nil) + assert.Nil(t, err) + assert.NotNil(t, qns) + + qns.rootCoord = &MockRootCoord{initErr: errors.New("Failed")} + qns.indexCoord = &MockIndexCoord{} + assert.Panics(t, func() { err = qns.Run() }) + + qns.rootCoord = &MockRootCoord{startErr: errors.New("Failed")} + qns.indexCoord = &MockIndexCoord{} + assert.Panics(t, func() { err = qns.Run() }) + + qns.rootCoord = &MockRootCoord{} + qns.indexCoord = &MockIndexCoord{initErr: errors.New("Failed")} + assert.Panics(t, func() { err = qns.Run() }) + + qns.rootCoord = &MockRootCoord{} + qns.indexCoord = &MockIndexCoord{startErr: errors.New("Failed")} + assert.Panics(t, func() { err = qns.Run() }) +} diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index 4478a6b7cd..677e807dbf 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/trace" @@ -36,6 +37,15 @@ import ( "google.golang.org/grpc/codes" ) +type Base interface { + types.RootCoord + + Init() error + Start() error + Stop() error + Register() error +} + // GrpcClient grpc client type GrpcClient struct { ctx context.Context diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 705884be30..7f030ae9cf 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -45,6 +45,14 @@ import ( "github.com/milvus-io/milvus/internal/util/typeutil" ) +type Base interface { + types.QueryNode + + UpdateStateCode(code internalpb.StateCode) + SetRootCoord(rc types.RootCoord) error + SetIndexCoord(index types.IndexCoord) error +} + // QueryNode communicates with outside services and union all // services in querynode package. //