mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 20:09:57 +08:00
Add unittest for distributed/querynode (#8834)
Signed-off-by: yhmo <yihua.mo@zilliz.com>
This commit is contained in:
parent
73150982e2
commit
b17d253db1
@ -23,6 +23,7 @@ import (
|
|||||||
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
|
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
|
||||||
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
|
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
|
||||||
"github.com/milvus-io/milvus/internal/log"
|
"github.com/milvus-io/milvus/internal/log"
|
||||||
|
"github.com/milvus-io/milvus/internal/types"
|
||||||
"github.com/milvus-io/milvus/internal/util/retry"
|
"github.com/milvus-io/milvus/internal/util/retry"
|
||||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||||
"github.com/milvus-io/milvus/internal/util/trace"
|
"github.com/milvus-io/milvus/internal/util/trace"
|
||||||
@ -35,6 +36,15 @@ import (
|
|||||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Base interface {
|
||||||
|
types.IndexCoord
|
||||||
|
|
||||||
|
Init() error
|
||||||
|
Start() error
|
||||||
|
Stop() error
|
||||||
|
Register() error
|
||||||
|
}
|
||||||
|
|
||||||
// Client is the grpc client of IndexCoord.
|
// Client is the grpc client of IndexCoord.
|
||||||
type Client struct {
|
type Client struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
@ -1,193 +0,0 @@
|
|||||||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
// with the License. You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
|
||||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
|
||||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
||||||
|
|
||||||
package grpcquerynode
|
|
||||||
|
|
||||||
import (
|
|
||||||
"path"
|
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"errors"
|
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
|
||||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
|
||||||
internalPb "github.com/milvus-io/milvus/internal/proto/internalpb"
|
|
||||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
collectionID = 1
|
|
||||||
|
|
||||||
binlogPathPrefix = "distributed-query-test-binlog"
|
|
||||||
indexPathPrefix = "distributed-query-test-index"
|
|
||||||
|
|
||||||
uidFieldID = 0
|
|
||||||
timestampFieldID = 1
|
|
||||||
vecFieldID = 100
|
|
||||||
ageFieldID = 101
|
|
||||||
vecParamsID = "indexParams"
|
|
||||||
vecDataID = "IVF"
|
|
||||||
)
|
|
||||||
|
|
||||||
var fieldIDs = []int64{uidFieldID, timestampFieldID, vecFieldID, ageFieldID}
|
|
||||||
|
|
||||||
/*
|
|
||||||
masterMock receive segmentID ,return indexID, segmentID = IndexID
|
|
||||||
dataMock return binlogPath, path = distributed-query-test-binlog/collectionID/segmentID/fieldID
|
|
||||||
indexMock return indexPath and IndexParam, indexPath = distributed-query-test-index/collectionID/segmentID/indexID,
|
|
||||||
indexParam use default:
|
|
||||||
|
|
||||||
indexID: 1
|
|
||||||
|
|
||||||
schema:
|
|
||||||
collectionID: 1
|
|
||||||
partitionID: 1
|
|
||||||
segmentID: [1, 10]
|
|
||||||
0: int64: uid
|
|
||||||
1: int64: timestamp
|
|
||||||
100: float32: vec: 16
|
|
||||||
101: int32: age
|
|
||||||
|
|
||||||
indexParams:
|
|
||||||
indexParams := make(map[string]string)
|
|
||||||
indexParams["index_type"] = "IVF_PQ"
|
|
||||||
indexParams["index_mode"] = "cpu"
|
|
||||||
indexParams["dim"] = "16"
|
|
||||||
indexParams["k"] = "10"
|
|
||||||
indexParams["nlist"] = "100"
|
|
||||||
indexParams["nprobe"] = "10"
|
|
||||||
indexParams["m"] = "4"
|
|
||||||
indexParams["nbits"] = "8"
|
|
||||||
indexParams["metric_type"] = "L2"
|
|
||||||
indexParams["SLICE_SIZE"] = "4"
|
|
||||||
*/
|
|
||||||
|
|
||||||
type RootCoordMock struct {
|
|
||||||
Count int
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *RootCoordMock) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
|
|
||||||
if m.Count < 20 {
|
|
||||||
m.Count++
|
|
||||||
return nil, errors.New("index not exit")
|
|
||||||
}
|
|
||||||
indexParams := make(map[string]string)
|
|
||||||
indexParams["index_type"] = "IVF_PQ"
|
|
||||||
indexParams["index_mode"] = "cpu"
|
|
||||||
indexParams["dim"] = "16"
|
|
||||||
indexParams["k"] = "10"
|
|
||||||
indexParams["nlist"] = "100"
|
|
||||||
indexParams["nprobe"] = "10"
|
|
||||||
indexParams["m"] = "4"
|
|
||||||
indexParams["nbits"] = "8"
|
|
||||||
indexParams["metric_type"] = "L2"
|
|
||||||
indexParams["SLICE_SIZE"] = "4"
|
|
||||||
|
|
||||||
params := make([]*commonpb.KeyValuePair, 0)
|
|
||||||
for k, v := range indexParams {
|
|
||||||
params = append(params, &commonpb.KeyValuePair{
|
|
||||||
Key: k,
|
|
||||||
Value: v,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
rsp := &milvuspb.DescribeSegmentResponse{
|
|
||||||
Status: &commonpb.Status{
|
|
||||||
ErrorCode: commonpb.ErrorCode_Success,
|
|
||||||
},
|
|
||||||
IndexID: in.SegmentID, // use index id as segment id
|
|
||||||
BuildID: in.SegmentID,
|
|
||||||
}
|
|
||||||
return rsp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type DataCoordMock struct {
|
|
||||||
Count int
|
|
||||||
}
|
|
||||||
|
|
||||||
func (data *DataCoordMock) GetInsertBinlogPaths(req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
|
|
||||||
if data.Count < 10 {
|
|
||||||
data.Count++
|
|
||||||
return nil, errors.New("binlog not exist")
|
|
||||||
}
|
|
||||||
paths := make([]*internalPb.StringList, len(fieldIDs))
|
|
||||||
for i := range paths {
|
|
||||||
pathKey := path.Join(binlogPathPrefix,
|
|
||||||
strconv.FormatInt(collectionID, 10),
|
|
||||||
strconv.FormatInt(req.SegmentID, 10),
|
|
||||||
strconv.FormatInt(fieldIDs[i], 10))
|
|
||||||
paths[i] = &internalPb.StringList{
|
|
||||||
Values: []string{pathKey},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
rsp := &datapb.GetInsertBinlogPathsResponse{
|
|
||||||
FieldIDs: fieldIDs,
|
|
||||||
Paths: paths,
|
|
||||||
}
|
|
||||||
return rsp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (data *DataCoordMock) GetSegmentStates(req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
|
|
||||||
segmentGrowingInfo := &datapb.SegmentStateInfo{
|
|
||||||
State: commonpb.SegmentState_Growing,
|
|
||||||
}
|
|
||||||
segmentFlushedInfo := &datapb.SegmentStateInfo{
|
|
||||||
State: commonpb.SegmentState_Flushed,
|
|
||||||
}
|
|
||||||
|
|
||||||
if data.Count < 10 {
|
|
||||||
data.Count++
|
|
||||||
return &datapb.GetSegmentStatesResponse{
|
|
||||||
States: []*datapb.SegmentStateInfo{segmentGrowingInfo},
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return &datapb.GetSegmentStatesResponse{
|
|
||||||
States: []*datapb.SegmentStateInfo{segmentFlushedInfo},
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type IndexCoordMock struct {
|
|
||||||
Count int
|
|
||||||
}
|
|
||||||
|
|
||||||
func (index *IndexCoordMock) GetIndexFilePaths(req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) {
|
|
||||||
if index.Count < 30 {
|
|
||||||
index.Count++
|
|
||||||
return nil, errors.New("index path not exist")
|
|
||||||
}
|
|
||||||
if len(req.IndexBuildIDs) != 1 {
|
|
||||||
panic("illegal index ids")
|
|
||||||
}
|
|
||||||
segmentID := req.IndexBuildIDs[0] // use index id as segment id
|
|
||||||
indexPaths1 := path.Join(indexPathPrefix,
|
|
||||||
strconv.FormatInt(collectionID, 10),
|
|
||||||
strconv.FormatInt(segmentID, 10),
|
|
||||||
vecDataID)
|
|
||||||
indexPaths2 := path.Join(indexPathPrefix,
|
|
||||||
strconv.FormatInt(collectionID, 10),
|
|
||||||
strconv.FormatInt(segmentID, 10),
|
|
||||||
vecParamsID)
|
|
||||||
indexPathInfo := make([]*indexpb.IndexFilePathInfo, 1)
|
|
||||||
indexPathInfo[0] = &indexpb.IndexFilePathInfo{
|
|
||||||
Status: &commonpb.Status{
|
|
||||||
ErrorCode: commonpb.ErrorCode_Success,
|
|
||||||
},
|
|
||||||
IndexFilePaths: []string{indexPaths1, indexPaths2},
|
|
||||||
}
|
|
||||||
rsp := &indexpb.GetIndexFilePathsResponse{
|
|
||||||
Status: &commonpb.Status{
|
|
||||||
ErrorCode: commonpb.ErrorCode_Success,
|
|
||||||
},
|
|
||||||
FilePaths: indexPathInfo,
|
|
||||||
}
|
|
||||||
return rsp, nil
|
|
||||||
}
|
|
@ -28,7 +28,6 @@ import (
|
|||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
|
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
|
||||||
dsc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
|
|
||||||
isc "github.com/milvus-io/milvus/internal/distributed/indexcoord/client"
|
isc "github.com/milvus-io/milvus/internal/distributed/indexcoord/client"
|
||||||
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
|
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
|
||||||
"github.com/milvus-io/milvus/internal/log"
|
"github.com/milvus-io/milvus/internal/log"
|
||||||
@ -46,7 +45,7 @@ import (
|
|||||||
type UniqueID = typeutil.UniqueID
|
type UniqueID = typeutil.UniqueID
|
||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
querynode *qn.QueryNode
|
querynode qn.Base
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
@ -54,9 +53,8 @@ type Server struct {
|
|||||||
|
|
||||||
grpcServer *grpc.Server
|
grpcServer *grpc.Server
|
||||||
|
|
||||||
dataCoord *dsc.Client
|
rootCoord rcc.Base
|
||||||
rootCoord *rcc.GrpcClient
|
indexCoord isc.Base
|
||||||
indexCoord *isc.Client
|
|
||||||
|
|
||||||
closer io.Closer
|
closer io.Closer
|
||||||
}
|
}
|
||||||
@ -98,61 +96,64 @@ func (s *Server) init() error {
|
|||||||
addr := Params.RootCoordAddress
|
addr := Params.RootCoordAddress
|
||||||
|
|
||||||
log.Debug("QueryNode start to new RootCoordClient", zap.Any("QueryCoordAddress", addr))
|
log.Debug("QueryNode start to new RootCoordClient", zap.Any("QueryCoordAddress", addr))
|
||||||
rootCoord, err := rcc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints)
|
if s.rootCoord == nil {
|
||||||
if err != nil {
|
s.rootCoord, err = rcc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints)
|
||||||
log.Debug("QueryNode new RootCoordClient failed", zap.Error(err))
|
if err != nil {
|
||||||
panic(err)
|
log.Debug("QueryNode new RootCoordClient failed", zap.Error(err))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = rootCoord.Init(); err != nil {
|
if err = s.rootCoord.Init(); err != nil {
|
||||||
log.Debug("QueryNode RootCoordClient Init failed", zap.Error(err))
|
log.Debug("QueryNode RootCoordClient Init failed", zap.Error(err))
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = rootCoord.Start(); err != nil {
|
if err = s.rootCoord.Start(); err != nil {
|
||||||
log.Debug("QueryNode RootCoordClient Start failed", zap.Error(err))
|
log.Debug("QueryNode RootCoordClient Start failed", zap.Error(err))
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
log.Debug("QueryNode start to wait for RootCoord ready")
|
log.Debug("QueryNode start to wait for RootCoord ready")
|
||||||
err = funcutil.WaitForComponentHealthy(s.ctx, rootCoord, "RootCoord", 1000000, time.Millisecond*200)
|
err = funcutil.WaitForComponentHealthy(s.ctx, s.rootCoord, "RootCoord", 1000000, time.Millisecond*200)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("QueryNode wait for RootCoord ready failed", zap.Error(err))
|
log.Debug("QueryNode wait for RootCoord ready failed", zap.Error(err))
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
log.Debug("QueryNode report RootCoord is ready")
|
log.Debug("QueryNode report RootCoord is ready")
|
||||||
|
|
||||||
if err := s.SetRootCoord(rootCoord); err != nil {
|
if err := s.SetRootCoord(s.rootCoord); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- IndexCoord ---
|
// --- IndexCoord ---
|
||||||
log.Debug("Index coord", zap.String("address", Params.IndexCoordAddress))
|
log.Debug("Index coord", zap.String("address", Params.IndexCoordAddress))
|
||||||
indexCoord, err := isc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints)
|
if s.indexCoord == nil {
|
||||||
|
s.indexCoord, err = isc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("QueryNode new IndexCoordClient failed", zap.Error(err))
|
log.Debug("QueryNode new IndexCoordClient failed", zap.Error(err))
|
||||||
panic(err)
|
panic(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := indexCoord.Init(); err != nil {
|
if err := s.indexCoord.Init(); err != nil {
|
||||||
log.Debug("QueryNode IndexCoordClient Init failed", zap.Error(err))
|
log.Debug("QueryNode IndexCoordClient Init failed", zap.Error(err))
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := indexCoord.Start(); err != nil {
|
if err := s.indexCoord.Start(); err != nil {
|
||||||
log.Debug("QueryNode IndexCoordClient Start failed", zap.Error(err))
|
log.Debug("QueryNode IndexCoordClient Start failed", zap.Error(err))
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
// wait IndexCoord healthy
|
// wait IndexCoord healthy
|
||||||
log.Debug("QueryNode start to wait for IndexCoord ready")
|
log.Debug("QueryNode start to wait for IndexCoord ready")
|
||||||
err = funcutil.WaitForComponentHealthy(s.ctx, indexCoord, "IndexCoord", 1000000, time.Millisecond*200)
|
err = funcutil.WaitForComponentHealthy(s.ctx, s.indexCoord, "IndexCoord", 1000000, time.Millisecond*200)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("QueryNode wait for IndexCoord ready failed", zap.Error(err))
|
log.Debug("QueryNode wait for IndexCoord ready failed", zap.Error(err))
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
log.Debug("QueryNode report IndexCoord is ready")
|
log.Debug("QueryNode report IndexCoord is ready")
|
||||||
|
|
||||||
if err := s.SetIndexCoord(indexCoord); err != nil {
|
if err := s.SetIndexCoord(s.indexCoord); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -258,6 +259,12 @@ func (s *Server) SetIndexCoord(indexCoord types.IndexCoord) error {
|
|||||||
return s.querynode.SetIndexCoord(indexCoord)
|
return s.querynode.SetIndexCoord(indexCoord)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetClient sets the IndexNode's instance.
|
||||||
|
func (s *Server) SetClient(queryNodeClient qn.Base) error {
|
||||||
|
s.querynode = queryNodeClient
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) {
|
func (s *Server) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) {
|
||||||
return s.querynode.GetTimeTickChannel(ctx)
|
return s.querynode.GetTimeTickChannel(ctx)
|
||||||
}
|
}
|
||||||
|
319
internal/distributed/querynode/service_test.go
Normal file
319
internal/distributed/querynode/service_test.go
Normal file
@ -0,0 +1,319 @@
|
|||||||
|
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
// with the License. You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||||
|
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||||
|
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
package grpcquerynode
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
isc "github.com/milvus-io/milvus/internal/distributed/indexcoord/client"
|
||||||
|
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus/internal/types"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||||
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||||
|
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||||
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
type MockQueryNode struct {
|
||||||
|
states *internalpb.ComponentStates
|
||||||
|
status *commonpb.Status
|
||||||
|
err error
|
||||||
|
strResp *milvuspb.StringResponse
|
||||||
|
infoResp *querypb.GetSegmentInfoResponse
|
||||||
|
metricResp *milvuspb.GetMetricsResponse
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) Init() error {
|
||||||
|
return m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) Start() error {
|
||||||
|
return m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) Stop() error {
|
||||||
|
return m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) Register() error {
|
||||||
|
return m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
||||||
|
return m.states, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||||
|
return m.strResp, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||||
|
return m.strResp, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) AddQueryChannel(ctx context.Context, req *querypb.AddQueryChannelRequest) (*commonpb.Status, error) {
|
||||||
|
return m.status, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) RemoveQueryChannel(ctx context.Context, req *querypb.RemoveQueryChannelRequest) (*commonpb.Status, error) {
|
||||||
|
return m.status, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
|
||||||
|
return m.status, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) {
|
||||||
|
return m.status, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
|
||||||
|
return m.status, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
|
||||||
|
return m.status, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
|
||||||
|
return m.status, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
|
||||||
|
return m.infoResp, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
||||||
|
return m.metricResp, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) UpdateStateCode(code internalpb.StateCode) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) SetRootCoord(rc types.RootCoord) error {
|
||||||
|
return m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockQueryNode) SetIndexCoord(index types.IndexCoord) error {
|
||||||
|
return m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
type MockRootCoord struct {
|
||||||
|
rcc.Base
|
||||||
|
initErr error
|
||||||
|
startErr error
|
||||||
|
regErr error
|
||||||
|
stopErr error
|
||||||
|
stateErr commonpb.ErrorCode
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockRootCoord) Init() error {
|
||||||
|
return m.initErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockRootCoord) Start() error {
|
||||||
|
return m.startErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockRootCoord) Stop() error {
|
||||||
|
return m.stopErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockRootCoord) Register() error {
|
||||||
|
return m.regErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockRootCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
||||||
|
return &internalpb.ComponentStates{
|
||||||
|
State: &internalpb.ComponentInfo{StateCode: internalpb.StateCode_Healthy},
|
||||||
|
Status: &commonpb.Status{ErrorCode: m.stateErr},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
type MockIndexCoord struct {
|
||||||
|
isc.Base
|
||||||
|
initErr error
|
||||||
|
startErr error
|
||||||
|
regErr error
|
||||||
|
stopErr error
|
||||||
|
stateErr commonpb.ErrorCode
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockIndexCoord) Init() error {
|
||||||
|
return m.initErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockIndexCoord) Start() error {
|
||||||
|
return m.startErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockIndexCoord) Stop() error {
|
||||||
|
return m.stopErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockIndexCoord) Register() error {
|
||||||
|
return m.regErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockIndexCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
||||||
|
return &internalpb.ComponentStates{
|
||||||
|
State: &internalpb.ComponentInfo{StateCode: internalpb.StateCode_Healthy},
|
||||||
|
Status: &commonpb.Status{ErrorCode: m.stateErr},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
func Test_NewServer(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
qns, err := NewServer(ctx, nil)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.NotNil(t, qns)
|
||||||
|
|
||||||
|
mqn := &MockQueryNode{
|
||||||
|
states: &internalpb.ComponentStates{State: &internalpb.ComponentInfo{StateCode: internalpb.StateCode_Healthy}},
|
||||||
|
status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
|
||||||
|
err: nil,
|
||||||
|
strResp: &milvuspb.StringResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}},
|
||||||
|
infoResp: &querypb.GetSegmentInfoResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}},
|
||||||
|
metricResp: &milvuspb.GetMetricsResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}},
|
||||||
|
}
|
||||||
|
err = qns.SetClient(mqn)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
t.Run("Run", func(t *testing.T) {
|
||||||
|
qns.rootCoord = &MockRootCoord{}
|
||||||
|
qns.indexCoord = &MockIndexCoord{}
|
||||||
|
|
||||||
|
err = qns.Run()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("GetComponentStates", func(t *testing.T) {
|
||||||
|
req := &internalpb.GetComponentStatesRequest{}
|
||||||
|
states, err := qns.GetComponentStates(ctx, req)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, internalpb.StateCode_Healthy, states.State.StateCode)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("GetStatisticsChannel", func(t *testing.T) {
|
||||||
|
req := &internalpb.GetStatisticsChannelRequest{}
|
||||||
|
resp, err := qns.GetStatisticsChannel(ctx, req)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("GetTimeTickChannel", func(t *testing.T) {
|
||||||
|
req := &internalpb.GetTimeTickChannelRequest{}
|
||||||
|
resp, err := qns.GetTimeTickChannel(ctx, req)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("AddQueryChannel", func(t *testing.T) {
|
||||||
|
req := &querypb.AddQueryChannelRequest{}
|
||||||
|
resp, err := qns.AddQueryChannel(ctx, req)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("RemoveQueryChannel", func(t *testing.T) {
|
||||||
|
req := &querypb.RemoveQueryChannelRequest{}
|
||||||
|
resp, err := qns.RemoveQueryChannel(ctx, req)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("WatchDmChannels", func(t *testing.T) {
|
||||||
|
req := &querypb.WatchDmChannelsRequest{}
|
||||||
|
resp, err := qns.WatchDmChannels(ctx, req)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("LoadSegments", func(t *testing.T) {
|
||||||
|
req := &querypb.LoadSegmentsRequest{}
|
||||||
|
resp, err := qns.LoadSegments(ctx, req)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ReleaseCollection", func(t *testing.T) {
|
||||||
|
req := &querypb.ReleaseCollectionRequest{}
|
||||||
|
resp, err := qns.ReleaseCollection(ctx, req)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ReleasePartitions", func(t *testing.T) {
|
||||||
|
req := &querypb.ReleasePartitionsRequest{}
|
||||||
|
resp, err := qns.ReleasePartitions(ctx, req)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ReleaseSegments", func(t *testing.T) {
|
||||||
|
req := &querypb.ReleaseSegmentsRequest{}
|
||||||
|
resp, err := qns.ReleaseSegments(ctx, req)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("GetSegmentInfo", func(t *testing.T) {
|
||||||
|
req := &querypb.GetSegmentInfoRequest{}
|
||||||
|
resp, err := qns.GetSegmentInfo(ctx, req)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("GetMetrics", func(t *testing.T) {
|
||||||
|
req := &milvuspb.GetMetricsRequest{
|
||||||
|
Request: "",
|
||||||
|
}
|
||||||
|
resp, err := qns.GetMetrics(ctx, req)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||||
|
})
|
||||||
|
|
||||||
|
err = qns.Stop()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_Run(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
qns, err := NewServer(ctx, nil)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.NotNil(t, qns)
|
||||||
|
|
||||||
|
qns.rootCoord = &MockRootCoord{initErr: errors.New("Failed")}
|
||||||
|
qns.indexCoord = &MockIndexCoord{}
|
||||||
|
assert.Panics(t, func() { err = qns.Run() })
|
||||||
|
|
||||||
|
qns.rootCoord = &MockRootCoord{startErr: errors.New("Failed")}
|
||||||
|
qns.indexCoord = &MockIndexCoord{}
|
||||||
|
assert.Panics(t, func() { err = qns.Run() })
|
||||||
|
|
||||||
|
qns.rootCoord = &MockRootCoord{}
|
||||||
|
qns.indexCoord = &MockIndexCoord{initErr: errors.New("Failed")}
|
||||||
|
assert.Panics(t, func() { err = qns.Run() })
|
||||||
|
|
||||||
|
qns.rootCoord = &MockRootCoord{}
|
||||||
|
qns.indexCoord = &MockIndexCoord{startErr: errors.New("Failed")}
|
||||||
|
assert.Panics(t, func() { err = qns.Run() })
|
||||||
|
}
|
@ -27,6 +27,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||||
|
"github.com/milvus-io/milvus/internal/types"
|
||||||
"github.com/milvus-io/milvus/internal/util/retry"
|
"github.com/milvus-io/milvus/internal/util/retry"
|
||||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||||
"github.com/milvus-io/milvus/internal/util/trace"
|
"github.com/milvus-io/milvus/internal/util/trace"
|
||||||
@ -36,6 +37,15 @@ import (
|
|||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Base interface {
|
||||||
|
types.RootCoord
|
||||||
|
|
||||||
|
Init() error
|
||||||
|
Start() error
|
||||||
|
Stop() error
|
||||||
|
Register() error
|
||||||
|
}
|
||||||
|
|
||||||
// GrpcClient grpc client
|
// GrpcClient grpc client
|
||||||
type GrpcClient struct {
|
type GrpcClient struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
@ -45,6 +45,14 @@ import (
|
|||||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Base interface {
|
||||||
|
types.QueryNode
|
||||||
|
|
||||||
|
UpdateStateCode(code internalpb.StateCode)
|
||||||
|
SetRootCoord(rc types.RootCoord) error
|
||||||
|
SetIndexCoord(index types.IndexCoord) error
|
||||||
|
}
|
||||||
|
|
||||||
// QueryNode communicates with outside services and union all
|
// QueryNode communicates with outside services and union all
|
||||||
// services in querynode package.
|
// services in querynode package.
|
||||||
//
|
//
|
||||||
|
Loading…
Reference in New Issue
Block a user