From 70fe5233cf50125f9d0894c9752bee8632994d10 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Tue, 9 Feb 2021 17:09:26 +0800 Subject: [PATCH] Watch queryChannel on query node Signed-off-by: xige-16 --- cmd/distributed/components/query_node.go | 193 ++++---- cmd/distributed/components/query_service.go | 147 +++--- internal/distributed/querynode/mock.go | 175 +++++++ .../distributed/querynode/service_test.go | 88 ++++ .../queryservice/client/binlog_test.go | 345 ++++++++++++++ .../queryservice/client/client_test.go | 261 ++++++++++ .../msgstream/pulsarms/pulsar_msgstream.go | 85 ++-- internal/msgstream/util/repack_func.go | 3 + internal/querynode/query_node.go | 26 +- internal/querynode/query_node_test.go | 22 +- internal/querynode/search_service.go | 19 +- internal/queryservice/load_test.go | 443 ----------------- internal/queryservice/mock.go | 152 ++++++ internal/queryservice/param_table.go | 450 +----------------- internal/queryservice/querynode.go | 4 + internal/queryservice/queryservice.go | 20 +- internal/queryservice/queryservice_test.go | 158 +----- 17 files changed, 1333 insertions(+), 1258 deletions(-) create mode 100644 internal/distributed/querynode/mock.go create mode 100644 internal/distributed/querynode/service_test.go create mode 100644 internal/distributed/queryservice/client/binlog_test.go create mode 100644 internal/distributed/queryservice/client/client_test.go delete mode 100644 internal/queryservice/load_test.go create mode 100644 internal/queryservice/mock.go diff --git a/cmd/distributed/components/query_node.go b/cmd/distributed/components/query_node.go index 621c3a5f0b..06c7ebdc9a 100644 --- a/cmd/distributed/components/query_node.go +++ b/cmd/distributed/components/query_node.go @@ -2,6 +2,7 @@ package components import ( "context" + "errors" "fmt" "log" "time" @@ -18,7 +19,6 @@ import ( ms "github.com/zilliztech/milvus-distributed/internal/masterservice" qs "github.com/zilliztech/milvus-distributed/internal/queryservice" - "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) @@ -91,119 +91,134 @@ func NewQueryNode(ctx context.Context, factory msgstream.Factory) (*QueryNode, e addr := fmt.Sprintf("%s:%d", ms.Params.Address, ms.Params.Port) log.Println("Master service address:", addr) log.Println("Init master service client ...") - masterService, err := msc.NewGrpcClient(addr, 20*time.Second) - if err != nil { - panic(err) - } + var masterService *msc.GrpcClient = nil + if QueryMock { + svr.SetMasterService(&qns.MasterServiceMock{Count: 0}) + } else { + masterService, err = msc.NewGrpcClient(addr, 20*time.Second) + if err != nil { + panic(err) + } - if err = masterService.Init(); err != nil { - panic(err) - } + if err = masterService.Init(); err != nil { + panic(err) + } - if err = masterService.Start(); err != nil { - panic(err) - } + if err = masterService.Start(); err != nil { + panic(err) + } - ticker := time.NewTicker(interval * time.Millisecond) - tctx, tcancel := context.WithTimeout(ctx, 10*interval*time.Millisecond) - defer func() { - ticker.Stop() - tcancel() - }() + ticker := time.NewTicker(interval * time.Millisecond) + tctx, tcancel := context.WithTimeout(ctx, 10*interval*time.Millisecond) + defer func() { + ticker.Stop() + tcancel() + }() - for { - var states *internalpb2.ComponentStates - select { - case <-ticker.C: - states, err = masterService.GetComponentStates() - if err != nil { - continue + for { + var states *internalpb2.ComponentStates + select { + case <-ticker.C: + states, err = masterService.GetComponentStates() + if err != nil { + continue + } + case <-tctx.Done(): + return nil, errors.New("master client connect timeout") + } + if states.State.StateCode == internalpb2.StateCode_HEALTHY { + break } - case <-tctx.Done(): - return nil, errors.New("master client connect timeout") } - if states.State.StateCode == internalpb2.StateCode_HEALTHY { - break - } - } - if err := svr.SetMasterService(masterService); err != nil { - panic(err) + if err := svr.SetMasterService(masterService); err != nil { + panic(err) + } } // --- IndexService --- is.Params.Init() log.Println("Index service address:", is.Params.Address) - indexService := isc.NewClient(is.Params.Address) + var indexService *isc.Client = nil + if QueryMock { + svr.SetIndexService(&qns.IndexServiceMock{Count: 0}) + } else { + indexService = isc.NewClient(is.Params.Address) - if err := indexService.Init(); err != nil { - panic(err) - } + if err := indexService.Init(); err != nil { + panic(err) + } - if err := indexService.Start(); err != nil { - panic(err) - } + if err := indexService.Start(); err != nil { + panic(err) + } - ticker = time.NewTicker(interval * time.Millisecond) - tctx, tcancel = context.WithTimeout(ctx, 10*interval*time.Millisecond) - defer func() { - ticker.Stop() - tcancel() - }() + ticker := time.NewTicker(interval * time.Millisecond) + tctx, tcancel := context.WithTimeout(ctx, 10*interval*time.Millisecond) + defer func() { + ticker.Stop() + tcancel() + }() - for { - var states *internalpb2.ComponentStates - select { - case <-ticker.C: - states, err = indexService.GetComponentStates() - if err != nil { - continue + for { + var states *internalpb2.ComponentStates + select { + case <-ticker.C: + states, err = indexService.GetComponentStates() + if err != nil { + continue + } + case <-tctx.Done(): + return nil, errors.New("Index service client connect timeout") + } + if states.State.StateCode == internalpb2.StateCode_HEALTHY { + break } - case <-tctx.Done(): - return nil, errors.New("Index service client connect timeout") } - if states.State.StateCode == internalpb2.StateCode_HEALTHY { - break - } - } - if err := svr.SetIndexService(indexService); err != nil { - panic(err) + if err := svr.SetIndexService(indexService); err != nil { + panic(err) + } } // --- DataService --- ds.Params.Init() log.Printf("Data service address: %s:%d", ds.Params.Address, ds.Params.Port) log.Println("Init data service client ...") - dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port)) - if err = dataService.Init(); err != nil { - panic(err) - } - if err = dataService.Start(); err != nil { - panic(err) - } + var dataService *dsc.Client = nil + if QueryMock { + svr.SetDataService(&qns.DataServiceMock{Count: 0}) + } else { + dataService = dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port)) + if err = dataService.Init(); err != nil { + panic(err) + } + if err = dataService.Start(); err != nil { + panic(err) + } - for cnt = 0; cnt < retry; cnt++ { - dsStates, err := dataService.GetComponentStates() - if err != nil { - log.Printf("retry cout = %d, error = %s", cnt, err.Error()) - continue + for cnt = 0; cnt < retry; cnt++ { + dsStates, err := dataService.GetComponentStates() + if err != nil { + log.Printf("retry cout = %d, error = %s", cnt, err.Error()) + continue + } + if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + log.Printf("retry cout = %d, error = %s", cnt, err.Error()) + continue + } + if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { + continue + } + break } - if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - log.Printf("retry cout = %d, error = %s", cnt, err.Error()) - continue + if cnt >= retry { + panic("Data service isn't ready") } - if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { - continue - } - break - } - if cnt >= retry { - panic("Data service isn't ready") - } - if err := svr.SetDataService(dataService); err != nil { - panic(err) + if err := svr.SetDataService(dataService); err != nil { + panic(err) + } } return &QueryNode{ @@ -231,9 +246,11 @@ func (q *QueryNode) Run() error { } func (q *QueryNode) Stop() error { - _ = q.dataService.Stop() - _ = q.masterService.Stop() + if !QueryMock { + _ = q.dataService.Stop() + _ = q.masterService.Stop() + _ = q.indexService.Stop() + } _ = q.queryService.Stop() - _ = q.indexService.Stop() return q.svr.Stop() } diff --git a/cmd/distributed/components/query_service.go b/cmd/distributed/components/query_service.go index ca288ffb77..4d6c937dec 100644 --- a/cmd/distributed/components/query_service.go +++ b/cmd/distributed/components/query_service.go @@ -6,12 +6,12 @@ import ( "log" "time" + ds "github.com/zilliztech/milvus-distributed/internal/dataservice" dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" qs "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice" - - ds "github.com/zilliztech/milvus-distributed/internal/dataservice" ms "github.com/zilliztech/milvus-distributed/internal/masterservice" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -26,6 +26,10 @@ type QueryService struct { masterService *msc.GrpcClient } +const ( + QueryMock = false +) + func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryService, error) { const retry = 10 const interval = 200 @@ -41,79 +45,92 @@ func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryServ ms.Params.Init() log.Printf("Master service address: %s:%d", ms.Params.Address, ms.Params.Port) log.Println("Init master service client ...") - masterService, err := msc.NewGrpcClient(fmt.Sprintf("%s:%d", ms.Params.Address, ms.Params.Port), 20*time.Second) - if err != nil { - panic(err) - } - - if err = masterService.Init(); err != nil { - panic(err) - } - - if err = masterService.Start(); err != nil { - panic(err) - } - - var cnt int - for cnt = 0; cnt < retry; cnt++ { - time.Sleep(time.Duration(cnt*interval) * time.Millisecond) - if cnt != 0 { - log.Println("Master service isn't ready ...") - log.Printf("Retrying getting master service's states in ... %v ms", interval) - } - - msStates, err := masterService.GetComponentStates() - + var masterService *msc.GrpcClient = nil + if QueryMock { + masterMock := queryservice.NewMasterMock() + svr.SetMasterService(masterMock) + } else { + masterService, err = msc.NewGrpcClient(fmt.Sprintf("%s:%d", ms.Params.Address, ms.Params.Port), 20*time.Second) if err != nil { - continue + panic(err) } - if msStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - continue - } - if msStates.State.StateCode != internalpb2.StateCode_HEALTHY && msStates.State.StateCode != internalpb2.StateCode_INITIALIZING { - continue - } - break - } - if cnt >= retry { - panic("Master service isn't ready") - } - if err := svr.SetMasterService(masterService); err != nil { - panic(err) + if err = masterService.Init(); err != nil { + panic(err) + } + + if err = masterService.Start(); err != nil { + panic(err) + } + + var cnt int + for cnt = 0; cnt < retry; cnt++ { + time.Sleep(time.Duration(cnt*interval) * time.Millisecond) + if cnt != 0 { + log.Println("Master service isn't ready ...") + log.Printf("Retrying getting master service's states in ... %v ms", interval) + } + + msStates, err := masterService.GetComponentStates() + + if err != nil { + continue + } + if msStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + continue + } + if msStates.State.StateCode != internalpb2.StateCode_HEALTHY && msStates.State.StateCode != internalpb2.StateCode_INITIALIZING { + continue + } + break + } + if cnt >= retry { + panic("Master service isn't ready") + } + + if err := svr.SetMasterService(masterService); err != nil { + panic(err) + } } // --- Data service client --- ds.Params.Init() log.Printf("Data service address: %s:%d", ds.Params.Address, ds.Params.Port) log.Println("Init data service client ...") - dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port)) - if err = dataService.Init(); err != nil { - panic(err) - } - if err = dataService.Start(); err != nil { - panic(err) - } + var dataService *dsc.Client = nil + if QueryMock { + dataMock := queryservice.NewDataMock() + svr.SetDataService(dataMock) + } else { + dataService = dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port)) + if err = dataService.Init(); err != nil { + panic(err) + } + if err = dataService.Start(); err != nil { + panic(err) + } - for cnt = 0; cnt < retry; cnt++ { - dsStates, err := dataService.GetComponentStates() - if err != nil { - continue + var cnt int + for cnt = 0; cnt < retry; cnt++ { + dsStates, err := dataService.GetComponentStates() + if err != nil { + continue + } + if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + continue + } + if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { + continue + } + break } - if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - continue + if cnt >= retry { + panic("Data service isn't ready") } - if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { - continue - } - break - } - if cnt >= retry { - panic("Data service isn't ready") - } - if err := svr.SetDataService(dataService); err != nil { - panic(err) + if err := svr.SetDataService(dataService); err != nil { + panic(err) + } } return &QueryService{ @@ -137,7 +154,9 @@ func (qs *QueryService) Run() error { } func (qs *QueryService) Stop() error { - _ = qs.dataService.Stop() - _ = qs.masterService.Stop() + if !QueryMock { + _ = qs.dataService.Stop() + _ = qs.masterService.Stop() + } return qs.svr.Stop() } diff --git a/internal/distributed/querynode/mock.go b/internal/distributed/querynode/mock.go new file mode 100644 index 0000000000..28ebe80168 --- /dev/null +++ b/internal/distributed/querynode/mock.go @@ -0,0 +1,175 @@ +package grpcquerynode + +import ( + "path" + "strconv" + + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" + "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" + internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "github.com/zilliztech/milvus-distributed/internal/proto/querypb" +) + +const ( + collectionID = 1 + + binlogPathPrefix = "distributed-query-test-binlog" + indexPathPrefix = "distributed-query-test-index" + + uidFieldID = 0 + timestampFieldID = 1 + vecFieldID = 100 + ageFieldID = 101 + vecParamsID = "indexParams" + vecDataID = "IVF" +) + +var fieldIDs = []int64{uidFieldID, timestampFieldID, vecFieldID, ageFieldID} + +/* + masterMock receive segmentID ,return indexID, segmentID = IndexID + dataMock return binlogPath, path = distributed-query-test-binlog/collectionID/segmentID/fieldID + indexMock return indexPath and IndexParam, indexPath = distributed-query-test-index/collectionID/segmentID/indexID, + indexParam use default: + +indexID: 1 + +schema: + collectionID: 1 + partitionID: 1 + segmentID: [1, 10] + 0: int64: uid + 1: int64: timestamp + 100: float32: vec: 16 + 101: int32: age + +indexParams: + indexParams := make(map[string]string) + indexParams["index_type"] = "IVF_PQ" + indexParams["index_mode"] = "cpu" + indexParams["dim"] = "16" + indexParams["k"] = "10" + indexParams["nlist"] = "100" + indexParams["nprobe"] = "10" + indexParams["m"] = "4" + indexParams["nbits"] = "8" + indexParams["metric_type"] = "L2" + indexParams["SLICE_SIZE"] = "4" +*/ + +type MasterServiceMock struct { + Count int +} + +func (m *MasterServiceMock) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { + if m.Count < 20 { + m.Count++ + return nil, errors.New("index not exit") + } + indexParams := make(map[string]string) + indexParams["index_type"] = "IVF_PQ" + indexParams["index_mode"] = "cpu" + indexParams["dim"] = "16" + indexParams["k"] = "10" + indexParams["nlist"] = "100" + indexParams["nprobe"] = "10" + indexParams["m"] = "4" + indexParams["nbits"] = "8" + indexParams["metric_type"] = "L2" + indexParams["SLICE_SIZE"] = "4" + + params := make([]*commonpb.KeyValuePair, 0) + for k, v := range indexParams { + params = append(params, &commonpb.KeyValuePair{ + Key: k, + Value: v, + }) + } + rsp := &milvuspb.DescribeSegmentResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + IndexID: in.SegmentID, // use index id as segment id + BuildID: in.SegmentID, + } + return rsp, nil +} + +type DataServiceMock struct { + Count int +} + +func (data *DataServiceMock) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) { + if data.Count < 10 { + data.Count++ + return nil, errors.New("binlog not exist") + } + paths := make([]*internalPb.StringList, len(fieldIDs)) + for i := range paths { + pathKey := path.Join(binlogPathPrefix, + strconv.FormatInt(collectionID, 10), + strconv.FormatInt(req.SegmentID, 10), + strconv.FormatInt(fieldIDs[i], 10)) + paths[i] = &internalPb.StringList{ + Values: []string{pathKey}, + } + } + rsp := &datapb.InsertBinlogPathsResponse{ + FieldIDs: fieldIDs, + Paths: paths, + } + return rsp, nil +} + +type IndexServiceMock struct { + Count int +} + +func (index *IndexServiceMock) GetIndexFilePaths(req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) { + if index.Count < 30 { + index.Count++ + return nil, errors.New("index path not exist") + } + if len(req.IndexBuildIDs) != 1 { + panic("illegal index ids") + } + segmentID := req.IndexBuildIDs[0] // use index id as segment id + indexPaths1 := path.Join(indexPathPrefix, + strconv.FormatInt(collectionID, 10), + strconv.FormatInt(segmentID, 10), + vecDataID) + indexPaths2 := path.Join(indexPathPrefix, + strconv.FormatInt(collectionID, 10), + strconv.FormatInt(segmentID, 10), + vecParamsID) + indexPathInfo := make([]*indexpb.IndexFilePathInfo, 1) + indexPathInfo[0] = &indexpb.IndexFilePathInfo{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + IndexFilePaths: []string{indexPaths1, indexPaths2}, + } + rsp := &indexpb.IndexFilePathsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + FilePaths: indexPathInfo, + } + return rsp, nil +} + +type queryServiceMock struct{} + +func (q *queryServiceMock) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { + return &querypb.RegisterNodeResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + InitParams: &internalPb.InitParams{ + NodeID: int64(0), + }, + }, nil +} diff --git a/internal/distributed/querynode/service_test.go b/internal/distributed/querynode/service_test.go new file mode 100644 index 0000000000..b7daa509bb --- /dev/null +++ b/internal/distributed/querynode/service_test.go @@ -0,0 +1,88 @@ +package grpcquerynode + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + "testing" + "time" + + "go.uber.org/zap" + + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" + "github.com/zilliztech/milvus-distributed/internal/querynode" +) + +const ( + debug = true + ctxTimeInMillisecond = 2000 +) + +func TestQueryNodeDistributed_Service(t *testing.T) { + // Creates server. + var ctx context.Context + var cancel context.CancelFunc + if debug { + ctx, cancel = context.WithCancel(context.Background()) + } else { + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel = context.WithDeadline(context.Background(), d) + } + go mockMain(ctx) + <-ctx.Done() + cancel() +} + +func mockMain(ctx context.Context) { + svr := newServerMock(ctx) + if err := svr.Init(); err != nil { + panic(err) + } + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + var sig os.Signal + if err := svr.Start(); err != nil { + panic(err) + } + defer svr.Stop() + + <-ctx.Done() + log.Print("Got signal to exit", zap.String("signal", sig.String())) + + switch sig { + case syscall.SIGTERM: + os.Exit(0) + default: + os.Exit(1) + } +} + +func newServerMock(ctx context.Context) *Server { + factory := pulsarms.NewFactory() + server := &Server{ + node: querynode.NewQueryNodeWithoutID(ctx, factory), + } + + if err := server.node.SetQueryService(&queryServiceMock{}); err != nil { + panic(err) + } + if err := server.node.SetMasterService(&MasterServiceMock{}); err != nil { + panic(err) + } + if err := server.node.SetIndexService(&IndexServiceMock{}); err != nil { + panic(err) + } + if err := server.node.SetDataService(&DataServiceMock{}); err != nil { + panic(err) + } + + return server +} diff --git a/internal/distributed/queryservice/client/binlog_test.go b/internal/distributed/queryservice/client/binlog_test.go new file mode 100644 index 0000000000..0e532d9daa --- /dev/null +++ b/internal/distributed/queryservice/client/binlog_test.go @@ -0,0 +1,345 @@ +package grpcqueryserviceclient + +//import ( +// "context" +// "encoding/binary" +// "math" +// "path" +// "strconv" +// "testing" +// "time" +// +// "github.com/stretchr/testify/assert" +// +// "github.com/zilliztech/milvus-distributed/internal/indexnode" +// minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio" +// "github.com/zilliztech/milvus-distributed/internal/msgstream" +// "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" +// "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" +// "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" +// "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" +// "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" +// "github.com/zilliztech/milvus-distributed/internal/storage" +// "github.com/zilliztech/milvus-distributed/internal/util/typeutil" +//) +// +////generate insert data +//const msgLength = 100 +//const receiveBufSize = 1024 +//const pulsarBufSize = 1024 +//const DIM = 16 +// +//type UniqueID = typeutil.UniqueID +// +//func genInsert(collectionID int64, +// partitionID int64, +// timeStart int, +// numDmChannels int, +// binlog bool) (*msgstream.MsgPack, *msgstream.MsgPack) { +// msgs := make([]msgstream.TsMsg, 0) +// for n := timeStart; n < timeStart+msgLength; n++ { +// rowData := make([]byte, 0) +// if binlog { +// id := make([]byte, 8) +// binary.BigEndian.PutUint64(id, uint64(n)) +// rowData = append(rowData, id...) +// time := make([]byte, 8) +// binary.BigEndian.PutUint64(time, uint64(n)) +// rowData = append(rowData, time...) +// } +// for i := 0; i < DIM; i++ { +// vec := make([]byte, 4) +// binary.BigEndian.PutUint32(vec, math.Float32bits(float32(n*i))) +// rowData = append(rowData, vec...) +// } +// age := make([]byte, 4) +// binary.BigEndian.PutUint32(age, 1) +// rowData = append(rowData, age...) +// blob := &commonpb.Blob{ +// Value: rowData, +// } +// +// var insertMsg msgstream.TsMsg = &msgstream.InsertMsg{ +// BaseMsg: msgstream.BaseMsg{ +// HashValues: []uint32{uint32((n - 1) % numDmChannels)}, +// }, +// InsertRequest: internalpb2.InsertRequest{ +// Base: &commonpb.MsgBase{ +// MsgType: commonpb.MsgType_kInsert, +// MsgID: 0, +// Timestamp: uint64(n), +// SourceID: 0, +// }, +// CollectionID: collectionID, +// PartitionID: partitionID, +// SegmentID: UniqueID(((n - 1) % numDmChannels) + ((n-1)/(numDmChannels*msgLength))*numDmChannels), +// ChannelID: "0", +// Timestamps: []uint64{uint64(n)}, +// RowIDs: []int64{int64(n)}, +// RowData: []*commonpb.Blob{blob}, +// }, +// } +// //fmt.Println("hash value = ", insertMsg.(*msgstream.InsertMsg).HashValues, "segmentID = ", insertMsg.(*msgstream.InsertMsg).SegmentID) +// msgs = append(msgs, insertMsg) +// } +// +// insertMsgPack := &msgstream.MsgPack{ +// BeginTs: uint64(timeStart), +// EndTs: uint64(timeStart + msgLength), +// Msgs: msgs, +// } +// +// // generate timeTick +// timeTickMsg := &msgstream.TimeTickMsg{ +// BaseMsg: msgstream.BaseMsg{ +// BeginTimestamp: 0, +// EndTimestamp: 0, +// HashValues: []uint32{0}, +// }, +// TimeTickMsg: internalpb2.TimeTickMsg{ +// Base: &commonpb.MsgBase{ +// MsgType: commonpb.MsgType_kTimeTick, +// MsgID: 0, +// Timestamp: uint64(timeStart + msgLength), +// SourceID: 0, +// }, +// }, +// } +// timeTickMsgPack := &msgstream.MsgPack{ +// Msgs: []msgstream.TsMsg{timeTickMsg}, +// } +// return insertMsgPack, timeTickMsgPack +//} +// +//func genSchema(collectionID int64) *schemapb.CollectionSchema { +// fieldID := schemapb.FieldSchema{ +// FieldID: UniqueID(0), +// Name: "RowID", +// IsPrimaryKey: false, +// DataType: schemapb.DataType_INT64, +// } +// +// fieldTime := schemapb.FieldSchema{ +// FieldID: UniqueID(1), +// Name: "Timestamp", +// IsPrimaryKey: false, +// DataType: schemapb.DataType_INT64, +// } +// +// fieldVec := schemapb.FieldSchema{ +// FieldID: UniqueID(100), +// Name: "vec", +// IsPrimaryKey: false, +// DataType: schemapb.DataType_VECTOR_FLOAT, +// TypeParams: []*commonpb.KeyValuePair{ +// { +// Key: "dim", +// Value: "16", +// }, +// }, +// IndexParams: []*commonpb.KeyValuePair{ +// { +// Key: "metric_type", +// Value: "L2", +// }, +// }, +// } +// +// fieldInt := schemapb.FieldSchema{ +// FieldID: UniqueID(101), +// Name: "age", +// IsPrimaryKey: false, +// DataType: schemapb.DataType_INT32, +// } +// +// return &schemapb.CollectionSchema{ +// Name: "collection-" + strconv.FormatInt(collectionID, 10), +// AutoID: true, +// Fields: []*schemapb.FieldSchema{ +// &fieldID, &fieldTime, &fieldVec, &fieldInt, +// }, +// } +//} +// +//func getMinioKV(ctx context.Context) (*minioKV.MinIOKV, error) { +// minioAddress := "localhost:9000" +// accessKeyID := "minioadmin" +// secretAccessKey := "minioadmin" +// useSSL := false +// bucketName := "a-bucket" +// +// option := &minioKV.Option{ +// Address: minioAddress, +// AccessKeyID: accessKeyID, +// SecretAccessKeyID: secretAccessKey, +// UseSSL: useSSL, +// BucketName: bucketName, +// CreateBucket: true, +// } +// +// return minioKV.NewMinIOKV(ctx, option) +//} +// +//func TestWriteBinLog(t *testing.T) { +// const ( +// debug = true +// consumeSubName = "test-load-collection-sub-name" +// ) +// var ctx context.Context +// if debug { +// ctx = context.Background() +// } else { +// var cancel context.CancelFunc +// ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) +// defer cancel() +// } +// +// // produce msg +// insertChannels := []string{"insert-0", "insert-1", "insert-2", "insert-3"} +// pulsarAddress := "pulsar://127.0.0.1:6650" +// +// factory := pulsarms.NewFactory(pulsarAddress, receiveBufSize, pulsarBufSize) +// +// insertStream, _ := factory.NewTtMsgStream(ctx) +// insertStream.AsProducer(insertChannels) +// insertStream.AsConsumer(insertChannels, consumeSubName) +// insertStream.Start() +// +// for i := 0; i < 12; i++ { +// insertMsgPack, timeTickMsgPack := genInsert(1, 1, i*msgLength+1, 4, true) +// err := insertStream.Produce(insertMsgPack) +// assert.NoError(t, err) +// err = insertStream.Broadcast(timeTickMsgPack) +// assert.NoError(t, err) +// } +// +// //consume msg +// segmentData := make([]*storage.InsertData, 12) +// idData := make([][]int64, 12) +// timestamps := make([][]int64, 12) +// fieldAgeData := make([][]int32, 12) +// fieldVecData := make([][]float32, 12) +// for i := 0; i < 12; i++ { +// idData[i] = make([]int64, 0) +// timestamps[i] = make([]int64, 0) +// fieldAgeData[i] = make([]int32, 0) +// fieldVecData[i] = make([]float32, 0) +// } +// for i := 0; i < 12; i++ { +// msgPack := insertStream.Consume() +// +// for n := 0; n < msgLength; n++ { +// segmentID := msgPack.Msgs[n].(*msgstream.InsertMsg).SegmentID +// blob := msgPack.Msgs[n].(*msgstream.InsertMsg).RowData[0].Value +// id := binary.BigEndian.Uint64(blob[0:8]) +// idData[segmentID] = append(idData[segmentID], int64(id)) +// t := binary.BigEndian.Uint64(blob[8:16]) +// timestamps[segmentID] = append(timestamps[segmentID], int64(t)) +// for i := 0; i < DIM; i++ { +// bits := binary.BigEndian.Uint32(blob[16+4*i : 16+4*(i+1)]) +// floatVec := math.Float32frombits(bits) +// fieldVecData[segmentID] = append(fieldVecData[segmentID], floatVec) +// } +// ageValue := binary.BigEndian.Uint32(blob[80:84]) +// fieldAgeData[segmentID] = append(fieldAgeData[segmentID], int32(ageValue)) +// } +// } +// for i := 0; i < 12; i++ { +// insertData := &storage.InsertData{ +// Data: map[int64]storage.FieldData{ +// 0: &storage.Int64FieldData{ +// NumRows: msgLength, +// Data: idData[i], +// }, +// 1: &storage.Int64FieldData{ +// NumRows: msgLength, +// Data: timestamps[i], +// }, +// 100: &storage.FloatVectorFieldData{ +// NumRows: msgLength, +// Data: fieldVecData[i], +// Dim: DIM, +// }, +// 101: &storage.Int32FieldData{ +// NumRows: msgLength, +// Data: fieldAgeData[i], +// }, +// }, +// } +// segmentData[i] = insertData +// } +// +// //gen inCodec +// collectionMeta := &etcdpb.CollectionMeta{ +// ID: 1, +// Schema: genSchema(1), +// CreateTime: 0, +// PartitionIDs: []int64{1}, +// SegmentIDs: []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, +// } +// inCodec := storage.NewInsertCodec(collectionMeta) +// indexCodec := storage.NewIndexCodec() +// +// // get minio client +// kv, err := getMinioKV(context.Background()) +// assert.Nil(t, err) +// +// // write binlog minio +// collectionStr := strconv.FormatInt(1, 10) +// for i := 0; i < 12; i++ { +// binLogs, err := inCodec.Serialize(1, storage.UniqueID(i), segmentData[i]) +// assert.Nil(t, err) +// assert.Equal(t, len(binLogs), 4) +// keyPrefix := "distributed-query-test-binlog" +// segmentStr := strconv.FormatInt(int64(i), 10) +// +// for _, blob := range binLogs { +// key := path.Join(keyPrefix, collectionStr, segmentStr, blob.Key) +// err = kv.Save(key, string(blob.Value[:])) +// assert.Nil(t, err) +// } +// } +// +// // gen index build's indexParams +// indexParams := make(map[string]string) +// indexParams["index_type"] = "IVF_PQ" +// indexParams["index_mode"] = "cpu" +// indexParams["dim"] = "16" +// indexParams["k"] = "10" +// indexParams["nlist"] = "100" +// indexParams["nprobe"] = "10" +// indexParams["m"] = "4" +// indexParams["nbits"] = "8" +// indexParams["metric_type"] = "L2" +// indexParams["SLICE_SIZE"] = "400" +// +// var indexParamsKV []*commonpb.KeyValuePair +// for key, value := range indexParams { +// indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{ +// Key: key, +// Value: value, +// }) +// } +// +// // generator index and write index to minio +// for i := 0; i < 12; i++ { +// typeParams := make(map[string]string) +// typeParams["dim"] = "16" +// index, err := indexnode.NewCIndex(typeParams, indexParams) +// assert.Nil(t, err) +// err = index.BuildFloatVecIndexWithoutIds(fieldVecData[i]) +// assert.Equal(t, err, nil) +// binarySet, err := index.Serialize() +// assert.Equal(t, len(binarySet), 1) +// assert.Nil(t, err) +// codecIndex, err := indexCodec.Serialize(binarySet, indexParams, "test_index", UniqueID(i)) +// assert.Equal(t, len(codecIndex), 2) +// assert.Nil(t, err) +// keyPrefix := "distributed-query-test-index" +// segmentStr := strconv.FormatInt(int64(i), 10) +// key1 := path.Join(keyPrefix, collectionStr, segmentStr, "IVF") +// key2 := path.Join(keyPrefix, collectionStr, segmentStr, "indexParams") +// kv.Save(key1, string(codecIndex[0].Value)) +// kv.Save(key2, string(codecIndex[1].Value)) +// } +//} diff --git a/internal/distributed/queryservice/client/client_test.go b/internal/distributed/queryservice/client/client_test.go new file mode 100644 index 0000000000..15dfa00a90 --- /dev/null +++ b/internal/distributed/queryservice/client/client_test.go @@ -0,0 +1,261 @@ +package grpcqueryserviceclient + +//import ( +// "context" +// "encoding/binary" +// "fmt" +// "log" +// "math" +// "testing" +// "time" +// +// "github.com/golang/protobuf/proto" +// "github.com/stretchr/testify/assert" +// +// "github.com/zilliztech/milvus-distributed/internal/msgstream" +// "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" +// "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" +// "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" +// "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" +// "github.com/zilliztech/milvus-distributed/internal/proto/querypb" +// qs "github.com/zilliztech/milvus-distributed/internal/queryservice" +//) +// +//const ( +// debug = false +// pulsarAddress = "pulsar://127.0.0.1:6650" +//) +// +//func TestClient_LoadCollection(t *testing.T) { +// var ctx context.Context +// if debug { +// ctx = context.Background() +// } else { +// var cancel context.CancelFunc +// ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) +// defer cancel() +// } +// +// //create queryService client +// qs.Params.Init() +// log.Println("QueryService address:", qs.Params.Address) +// log.Println("Init Query service client ...") +// client, err := NewClient(qs.Params.Address, 20*time.Second) +// assert.Nil(t, err) +// err = client.Init() +// assert.Nil(t, err) +// err = client.Start() +// assert.Nil(t, err) +// +// insertChannels := []string{"insert-0", "insert-1", "insert-2", "insert-3"} +// ddChannels := []string{"data-definition"} +// +// factory := pulsarms.NewFactory(pulsarAddress, receiveBufSize, pulsarBufSize) +// insertStream, _ := factory.NewTtMsgStream(ctx) +// insertStream.AsProducer(insertChannels) +// insertStream.Start() +// +// ddStream, err := factory.NewTtMsgStream(ctx) +// assert.NoError(t, err) +// ddStream.AsProducer(ddChannels) +// ddStream.Start() +// +// // showCollection +// showCollectionRequest := &querypb.ShowCollectionRequest{ +// DbID: 0, +// } +// showCollectionRes, err := client.ShowCollections(showCollectionRequest) +// fmt.Println("showCollectionRes: ", showCollectionRes) +// assert.Nil(t, err) +// +// //load collection +// loadCollectionRequest := &querypb.LoadCollectionRequest{ +// CollectionID: 1, +// Schema: genSchema(1), +// } +// loadCollectionRes, err := client.LoadCollection(loadCollectionRequest) +// fmt.Println("loadCollectionRes: ", loadCollectionRes) +// assert.Nil(t, err) +// +// // showCollection +// showCollectionRes, err = client.ShowCollections(showCollectionRequest) +// fmt.Println("showCollectionRes: ", showCollectionRes) +// assert.Nil(t, err) +// +// //showSegmentInfo +// getSegmentInfoRequest := &querypb.SegmentInfoRequest{ +// SegmentIDs: []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, +// } +// getSegmentInfoRes, err := client.GetSegmentInfo(getSegmentInfoRequest) +// fmt.Println("segment info : ", getSegmentInfoRes) +// assert.Nil(t, err) +// +// // insert msg +// for i := 0; i < 12; i++ { +// insertMsgPack, timeTickMsgPack := genInsert(1, 1, i*msgLength+1, 4, false) +// err := insertStream.Produce(insertMsgPack) +// assert.NoError(t, err) +// err = insertStream.Broadcast(timeTickMsgPack) +// assert.NoError(t, err) +// err = ddStream.Broadcast(timeTickMsgPack) +// assert.NoError(t, err) +// } +// +// getSegmentInfoRes, err = client.GetSegmentInfo(getSegmentInfoRequest) +// assert.Nil(t, err) +// fmt.Println("segment info : ", getSegmentInfoRes) +// +//} +// +//func TestClient_GetSegmentInfo(t *testing.T) { +// if !debug { +// _, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) +// defer cancel() +// } +// +// //create queryService client +// qs.Params.Init() +// log.Println("QueryService address:", qs.Params.Address) +// log.Println("Init Query service client ...") +// client, err := NewClient(qs.Params.Address, 20*time.Second) +// assert.Nil(t, err) +// err = client.Init() +// assert.Nil(t, err) +// err = client.Start() +// assert.Nil(t, err) +// +// //showSegmentInfo +// getSegmentInfoRequest := &querypb.SegmentInfoRequest{ +// SegmentIDs: []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, +// } +// getSegmentInfoRes, err := client.GetSegmentInfo(getSegmentInfoRequest) +// assert.Nil(t, err) +// fmt.Println("segment info : ", getSegmentInfoRes) +//} +// +//func TestClient_LoadPartitions(t *testing.T) { +// if !debug { +// _, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) +// defer cancel() +// } +// +// //create queryService client +// qs.Params.Init() +// log.Println("QueryService address:", qs.Params.Address) +// log.Println("Init Query service client ...") +// client, err := NewClient(qs.Params.Address, 20*time.Second) +// assert.Nil(t, err) +// err = client.Init() +// assert.Nil(t, err) +// err = client.Start() +// assert.Nil(t, err) +// +// loadPartitionRequest := &querypb.LoadPartitionRequest{ +// CollectionID: 1, +// Schema: genSchema(1), +// } +// loadPartitionRes, err := client.LoadPartitions(loadPartitionRequest) +// fmt.Println("loadCollectionRes: ", loadPartitionRes) +// assert.Nil(t, err) +//} +// +//func TestClient_GetChannels(t *testing.T) { +// if !debug { +// _, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) +// defer cancel() +// } +// +// //create queryService client +// qs.Params.Init() +// log.Println("QueryService address:", qs.Params.Address) +// log.Println("Init Query service client ...") +// client, err := NewClient(qs.Params.Address, 20*time.Second) +// assert.Nil(t, err) +// err = client.Init() +// assert.Nil(t, err) +// err = client.Start() +// assert.Nil(t, err) +// +// getTimeTickChannelRes, err := client.GetTimeTickChannel() +// fmt.Println("loadCollectionRes: ", getTimeTickChannelRes) +// assert.Nil(t, err) +//} +// +//func sendSearchRequest(ctx context.Context, searchChannels []string) { +// // test data generate +// const msgLength = 10 +// const receiveBufSize = 1024 +// const DIM = 16 +// +// var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} +// // start search service +// dslString := "{\"bool\": { \n\"vector\": {\n \"vec\": {\n \"metric_type\": \"L2\", \n \"params\": {\n \"nprobe\": 10 \n},\n \"query\": \"$0\",\"topk\": 10 \n } \n } \n } \n }" +// var searchRawData1 []byte +// var searchRawData2 []byte +// for i, ele := range vec { +// buf := make([]byte, 4) +// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele+float32(i*2))) +// searchRawData1 = append(searchRawData1, buf...) +// } +// for i, ele := range vec { +// buf := make([]byte, 4) +// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele+float32(i*4))) +// searchRawData2 = append(searchRawData2, buf...) +// } +// placeholderValue := milvuspb.PlaceholderValue{ +// Tag: "$0", +// Type: milvuspb.PlaceholderType_VECTOR_FLOAT, +// Values: [][]byte{searchRawData1, searchRawData2}, +// } +// +// placeholderGroup := milvuspb.PlaceholderGroup{ +// Placeholders: []*milvuspb.PlaceholderValue{&placeholderValue}, +// } +// +// placeGroupByte, err := proto.Marshal(&placeholderGroup) +// if err != nil { +// log.Print("marshal placeholderGroup failed") +// } +// +// query := milvuspb.SearchRequest{ +// Dsl: dslString, +// PlaceholderGroup: placeGroupByte, +// } +// +// queryByte, err := proto.Marshal(&query) +// if err != nil { +// log.Print("marshal query failed") +// } +// +// blob := commonpb.Blob{ +// Value: queryByte, +// } +// +// searchMsg := &msgstream.SearchMsg{ +// BaseMsg: msgstream.BaseMsg{ +// HashValues: []uint32{0}, +// }, +// SearchRequest: internalpb2.SearchRequest{ +// Base: &commonpb.MsgBase{ +// MsgType: commonpb.MsgType_kSearch, +// MsgID: 1, +// Timestamp: uint64(10 + 1000), +// SourceID: 1, +// }, +// ResultChannelID: "0", +// Query: &blob, +// }, +// } +// +// msgPackSearch := msgstream.MsgPack{} +// msgPackSearch.Msgs = append(msgPackSearch.Msgs, searchMsg) +// +// factory := pulsarms.NewFactory(pulsarAddress, receiveBufSize, 1024) +// searchStream, _ := factory.NewMsgStream(ctx) +// searchStream.AsProducer(searchChannels) +// searchStream.Start() +// err = searchStream.Produce(&msgPackSearch) +// if err != nil { +// panic(err) +// } +//} diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go index f8dbe5dced..03d0b2a22a 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream.go @@ -325,23 +325,65 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() { default: tsMsgList := make([]TsMsg, 0) - for { - ms.consumerLock.Lock() - chosen, value, ok := reflect.Select(ms.consumerReflects) - ms.consumerLock.Unlock() - if !ok { - log.Printf("channel closed") - return + //for { + // ms.consumerLock.Lock() + // chosen, value, ok := reflect.Select(ms.consumerReflects) + // ms.consumerLock.Unlock() + // if !ok { + // log.Printf("channel closed") + // return + // } + // + // pulsarMsg, ok := value.Interface().(pulsar.ConsumerMessage) + // + // if !ok { + // log.Printf("type assertion failed, not consumer message type") + // continue + // } + // ms.consumers[chosen].AckID(pulsarMsg.ID()) + // + // headerMsg := commonpb.MsgHeader{} + // err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) + // if err != nil { + // log.Printf("Failed to unmarshal message header, error = %v", err) + // continue + // } + // tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType) + // if err != nil { + // log.Printf("Failed to unmarshal tsMsg, error = %v", err) + // continue + // } + // + // tsMsg.SetPosition(&msgstream.MsgPosition{ + // ChannelName: filepath.Base(pulsarMsg.Topic()), + // MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()), + // }) + // tsMsgList = append(tsMsgList, tsMsg) + // + // noMoreMessage := true + // for i := 0; i < len(ms.consumers); i++ { + // if len(ms.consumers[i].Chan()) > 0 { + // noMoreMessage = false + // } + // } + // + // if noMoreMessage { + // break + // } + //} + + pulsarMsgBuffer := make([]pulsar.ConsumerMessage, 0) + ms.consumerLock.Lock() + consumers := ms.consumers + ms.consumerLock.Unlock() + for _, consumer := range consumers { + msgLen := len(consumer.Chan()) + for i := 0; i < msgLen; i++ { + msg := <-consumer.Chan() + pulsarMsgBuffer = append(pulsarMsgBuffer, msg) } - - pulsarMsg, ok := value.Interface().(pulsar.ConsumerMessage) - - if !ok { - log.Printf("type assertion failed, not consumer message type") - continue - } - ms.consumers[chosen].AckID(pulsarMsg.ID()) - + } + for _, pulsarMsg := range pulsarMsgBuffer { headerMsg := commonpb.MsgHeader{} err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) if err != nil { @@ -359,17 +401,6 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() { MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()), }) tsMsgList = append(tsMsgList, tsMsg) - - noMoreMessage := true - for i := 0; i < len(ms.consumers); i++ { - if len(ms.consumers[i].Chan()) > 0 { - noMoreMessage = false - } - } - - if noMoreMessage { - break - } } if len(tsMsgList) > 0 { diff --git a/internal/msgstream/util/repack_func.go b/internal/msgstream/util/repack_func.go index 45e58f70c8..0727b73382 100644 --- a/internal/msgstream/util/repack_func.go +++ b/internal/msgstream/util/repack_func.go @@ -43,6 +43,9 @@ func InsertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e Timestamp: insertRequest.Timestamps[index], SourceID: insertRequest.Base.SourceID, }, + DbID: insertRequest.DbID, + CollectionID: insertRequest.CollectionID, + PartitionID: insertRequest.PartitionID, CollectionName: insertRequest.CollectionName, PartitionName: insertRequest.PartitionName, SegmentID: insertRequest.SegmentID, diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 408a910465..0741193b16 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -274,36 +274,14 @@ func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*co return status, errors.New(errMsg) } - searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream) - if !ok { - errMsg := "type assertion failed for search message stream" - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: errMsg, - } - - return status, errors.New(errMsg) - } - - resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream) - if !ok { - errMsg := "type assertion failed for search result message stream" - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: errMsg, - } - - return status, errors.New(errMsg) - } - // add request channel consumeChannels := []string{in.RequestChannelID} consumeSubName := Params.MsgChannelSubName - searchStream.AsConsumer(consumeChannels, consumeSubName) + node.searchService.searchMsgStream.AsConsumer(consumeChannels, consumeSubName) // add result channel producerChannels := []string{in.ResultChannelID} - resultStream.AsProducer(producerChannels) + node.searchService.searchResultMsgStream.AsProducer(producerChannels) status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 02cb89f5fd..ac49de01ad 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -118,6 +118,27 @@ func initTestMeta(t *testing.T, node *QueryNode, collectionID UniqueID, segmentI assert.NoError(t, err) } +func initDmChannel(insertChannels []string, node *QueryNode) { + watchReq := &querypb.WatchDmChannelsRequest{ + ChannelIDs: insertChannels, + } + _, err := node.WatchDmChannels(watchReq) + if err != nil { + panic(err) + } +} + +func initSearchChannel(searchChan string, resultChan string, node *QueryNode) { + searchReq := &querypb.AddQueryChannelsRequest{ + RequestChannelID: searchChan, + ResultChannelID: resultChan, + } + _, err := node.AddQueryChannel(searchReq) + if err != nil { + panic(err) + } +} + func newQueryNodeMock() *QueryNode { var ctx context.Context @@ -142,7 +163,6 @@ func newQueryNodeMock() *QueryNode { } return svr - } func makeNewChannelNames(names []string, suffix string) []string { diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index f6e55192ce..83a9a779bf 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -40,16 +40,15 @@ type ResultEntityIds []UniqueID func newSearchService(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *searchService { receiveBufSize := Params.SearchReceiveBufSize - consumeChannels := Params.SearchChannelNames - consumeSubName := Params.MsgChannelSubName searchStream, _ := factory.NewMsgStream(ctx) - searchStream.AsConsumer(consumeChannels, consumeSubName) - var inputStream msgstream.MsgStream = searchStream - - producerChannels := Params.SearchResultChannelNames searchResultStream, _ := factory.NewMsgStream(ctx) - searchResultStream.AsProducer(producerChannels) - var outputStream msgstream.MsgStream = searchResultStream + + // query node doesn't need to consumer any search or search result channel actively. + //consumeChannels := Params.SearchChannelNames + //consumeSubName := Params.MsgChannelSubName + //searchStream.AsConsumer(consumeChannels, consumeSubName) + //producerChannels := Params.SearchResultChannelNames + //searchResultStream.AsProducer(producerChannels) searchServiceCtx, searchServiceCancel := context.WithCancel(ctx) msgBuffer := make(chan msgstream.TsMsg, receiveBufSize) @@ -64,8 +63,8 @@ func newSearchService(ctx context.Context, replica collectionReplica, factory ms replica: replica, tSafeWatcher: newTSafeWatcher(), - searchMsgStream: inputStream, - searchResultMsgStream: outputStream, + searchMsgStream: searchStream, + searchResultMsgStream: searchResultStream, queryNodeID: Params.QueryNodeID, } } diff --git a/internal/queryservice/load_test.go b/internal/queryservice/load_test.go deleted file mode 100644 index 8af9a6289e..0000000000 --- a/internal/queryservice/load_test.go +++ /dev/null @@ -1,443 +0,0 @@ -package queryservice - -import ( - "context" - "encoding/binary" - "math" - "strconv" - "testing" - - "github.com/golang/protobuf/proto" - - minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio" - "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" -) - -//generate insert data -const msgLength = 100 -const receiveBufSize = 1024 -const pulsarBufSize = 1024 -const DIM = 16 - -func genInsert(collectionID int64, partitionID int64, segmentID int64, timeStart int) (*msgstream.MsgPack, *msgstream.MsgPack) { - msgs := make([]msgstream.TsMsg, 0) - for n := timeStart; n < timeStart+msgLength; n++ { - rowData := make([]byte, 0) - id := make([]byte, 8) - binary.BigEndian.PutUint64(id, uint64(n)) - rowData = append(rowData, id...) - time := make([]byte, 8) - binary.BigEndian.PutUint64(time, uint64(n)) - rowData = append(rowData, time...) - for i := 0; i < DIM; i++ { - vec := make([]byte, 4) - binary.BigEndian.PutUint32(vec, math.Float32bits(float32(n*i))) - rowData = append(rowData, vec...) - } - age := make([]byte, 4) - binary.BigEndian.PutUint32(age, 1) - rowData = append(rowData, age...) - blob := &commonpb.Blob{ - Value: rowData, - } - - var insertMsg msgstream.TsMsg = &msgstream.InsertMsg{ - BaseMsg: msgstream.BaseMsg{ - HashValues: []uint32{uint32(n)}, - }, - InsertRequest: internalpb2.InsertRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kInsert, - MsgID: 0, - Timestamp: uint64(n), - SourceID: 0, - }, - CollectionID: collectionID, - PartitionID: partitionID, - SegmentID: segmentID, - ChannelID: "0", - Timestamps: []uint64{uint64(n)}, - RowIDs: []int64{int64(n)}, - RowData: []*commonpb.Blob{blob}, - }, - } - msgs = append(msgs, insertMsg) - } - - insertMsgPack := &msgstream.MsgPack{ - BeginTs: uint64(timeStart), - EndTs: uint64(timeStart + msgLength), - Msgs: msgs, - } - - // generate timeTick - timeTickMsg := &msgstream.TimeTickMsg{ - BaseMsg: msgstream.BaseMsg{ - BeginTimestamp: 0, - EndTimestamp: 0, - HashValues: []uint32{0}, - }, - TimeTickMsg: internalpb2.TimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kTimeTick, - MsgID: 0, - Timestamp: uint64(timeStart + msgLength), - SourceID: 0, - }, - }, - } - timeTickMsgPack := &msgstream.MsgPack{ - Msgs: []msgstream.TsMsg{timeTickMsg}, - } - return insertMsgPack, timeTickMsgPack -} - -func genSchema(collectionID int64) *schemapb.CollectionSchema { - fieldID := schemapb.FieldSchema{ - FieldID: UniqueID(0), - Name: "RowID", - IsPrimaryKey: false, - DataType: schemapb.DataType_INT64, - } - - fieldTime := schemapb.FieldSchema{ - FieldID: UniqueID(1), - Name: "Timestamp", - IsPrimaryKey: false, - DataType: schemapb.DataType_INT64, - } - - fieldVec := schemapb.FieldSchema{ - FieldID: UniqueID(100), - Name: "vec", - IsPrimaryKey: false, - DataType: schemapb.DataType_VECTOR_FLOAT, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "dim", - Value: "16", - }, - }, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: "metric_type", - Value: "L2", - }, - }, - } - - fieldInt := schemapb.FieldSchema{ - FieldID: UniqueID(101), - Name: "age", - IsPrimaryKey: false, - DataType: schemapb.DataType_INT32, - } - - return &schemapb.CollectionSchema{ - Name: "collection-" + strconv.FormatInt(collectionID, 10), - AutoID: true, - Fields: []*schemapb.FieldSchema{ - &fieldID, &fieldTime, &fieldVec, &fieldInt, - }, - } -} - -func genCreateCollection(collectionID int64) *msgstream.MsgPack { - schema := genSchema(collectionID) - - byteSchema, err := proto.Marshal(schema) - if err != nil { - panic(err) - } - - request := internalpb2.CreateCollectionRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kCreateCollection, - Timestamp: uint64(10), - }, - DbID: 0, - CollectionID: collectionID, - Schema: byteSchema, - } - - msg := &msgstream.CreateCollectionMsg{ - BaseMsg: msgstream.BaseMsg{ - BeginTimestamp: 0, - EndTimestamp: 0, - HashValues: []uint32{0}, - }, - CreateCollectionRequest: request, - } - - return &msgstream.MsgPack{ - Msgs: []msgstream.TsMsg{msg}, - } -} - -func genCreatePartition(collectionID int64, partitionID int64) *msgstream.MsgPack { - request := internalpb2.CreatePartitionRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kCreatePartition, - Timestamp: uint64(20), - }, - DbID: 0, - CollectionID: collectionID, - PartitionID: partitionID, - } - - msg := &msgstream.CreatePartitionMsg{ - BaseMsg: msgstream.BaseMsg{ - BeginTimestamp: 0, - EndTimestamp: 0, - HashValues: []uint32{0}, - }, - CreatePartitionRequest: request, - } - return &msgstream.MsgPack{ - Msgs: []msgstream.TsMsg{msg}, - } -} - -func getMinioKV(ctx context.Context) (*minioKV.MinIOKV, error) { - minioAddress := "localhost:9000" - accessKeyID := "minioadmin" - secretAccessKey := "minioadmin" - useSSL := false - bucketName := "a-bucket" - - option := &minioKV.Option{ - Address: minioAddress, - AccessKeyID: accessKeyID, - SecretAccessKeyID: secretAccessKey, - UseSSL: useSSL, - BucketName: bucketName, - CreateBucket: true, - } - - return minioKV.NewMinIOKV(ctx, option) -} - -func TestLoadCollection(t *testing.T) { - //// produce msg - //insertChannels := []string{"insert-0"} - //ddChannels := []string{"data-definition-0"} - //pulsarAddress := "pulsar://127.0.0.1:6650" - // - //insertStream := pulsarms.NewPulsarMsgStream(context.Background(), receiveBufSize) - //insertStream.SetPulsarClient(pulsarAddress) - //insertStream.AsProducer(insertChannels) - //ddStream := pulsarms.NewPulsarMsgStream(context.Background(), receiveBufSize) - //ddStream.SetPulsarClient(pulsarAddress) - //ddStream.AsProducer(ddChannels) - // - //var insertMsgStream msgstream.MsgStream = insertStream - //insertMsgStream.Start() - //var ddMsgStream msgstream.MsgStream = ddStream - //ddMsgStream.Start() - // - //createCollectionMsgPack := genCreateCollection(1) - //createPartitionMsgPack := genCreatePartition(1, 1) - //ddMsgStream.Produce(createCollectionMsgPack) - //ddMsgStream.Produce(createPartitionMsgPack) - // - //consumeStream := pulsarms.NewPulsarTtMsgStream(context.Background(), receiveBufSize) - //consumeStream.SetPulsarClient(pulsarAddress) - //unmarshalDispatcher := util.NewUnmarshalDispatcher() - //consumeStream.AsConsumer(insertChannels, "test", unmarshalDispatcher, pulsarBufSize) - //consumeStream.Start() - // - //for i := 0; i < 10; i++ { - // insertMsgPack, timeTickMsgPack := genInsert(1, 1, int64(i), i*msgLength+1) - // err := insertMsgStream.Produce(insertMsgPack) - // assert.NoError(t, err) - // err = insertMsgStream.Broadcast(timeTickMsgPack) - // assert.NoError(t, err) - // err = ddMsgStream.Broadcast(timeTickMsgPack) - // assert.NoError(t, err) - //} - // - ////consume msg - //segPosition := make(map[int64][]*internalpb2.MsgPosition) - //segmentData := make([]*storage.InsertData, 0) - //indexRowDatas := make([][]float32, 0) - //for i := 0; i < 10; i++ { - // msgPack := consumeStream.Consume() - // idData := make([]int64, 0) - // timestamps := make([]int64, 0) - // fieldAgeData := make([]int32, 0) - // fieldVecData := make([]float32, 0) - // for n := 0; n < msgLength; n++ { - // blob := msgPack.Msgs[n].(*msgstream.InsertMsg).RowData[0].Value - // id := binary.BigEndian.Uint64(blob[0:8]) - // idData = append(idData, int64(id)) - // time := binary.BigEndian.Uint64(blob[8:16]) - // timestamps = append(timestamps, int64(time)) - // for i := 0; i < DIM; i++ { - // bits := binary.BigEndian.Uint32(blob[16+4*i : 16+4*(i+1)]) - // floatVec := math.Float32frombits(bits) - // fieldVecData = append(fieldVecData, floatVec) - // } - // ageValue := binary.BigEndian.Uint32(blob[80:84]) - // fieldAgeData = append(fieldAgeData, int32(ageValue)) - // } - // - // insertData := &storage.InsertData{ - // Data: map[int64]storage.FieldData{ - // 0: &storage.Int64FieldData{ - // NumRows: msgLength, - // Data: idData, - // }, - // 1: &storage.Int64FieldData{ - // NumRows: msgLength, - // Data: timestamps, - // }, - // 100: &storage.FloatVectorFieldData{ - // NumRows: msgLength, - // Data: fieldVecData, - // Dim: DIM, - // }, - // 101: &storage.Int32FieldData{ - // NumRows: msgLength, - // Data: fieldAgeData, - // }, - // }, - // } - // segPosition[int64(i)] = msgPack.StartPositions - // segmentData = append(segmentData, insertData) - // indexRowDatas = append(indexRowDatas, fieldVecData) - //} - // - ////gen inCodec - //collectionMeta := &etcdpb.CollectionMeta{ - // ID: 1, - // Schema: genSchema(1), - // CreateTime: 0, - // PartitionIDs: []int64{1}, - // SegmentIDs: []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, - //} - //inCodec := storage.NewInsertCodec(collectionMeta) - // - //// get minio client - //minioKV, err := getMinioKV(context.Background()) - //assert.Nil(t, err) - // - //// write binlog minio - //collectionStr := strconv.FormatInt(1, 10) - //for i := 0; i < 9; i++ { - // binLogs, err := inCodec.Serialize(1, storage.UniqueID(i), segmentData[i]) - // assert.Nil(t, err) - // assert.Equal(t, len(binLogs), 4) - // keyPrefix := "distributed-query-test-binlog" - // segmentStr := strconv.FormatInt(int64(i), 10) - // - // for _, blob := range binLogs { - // key := path.Join(keyPrefix, collectionStr, segmentStr, blob.Key) - // err = minioKV.Save(key, string(blob.Value[:])) - // assert.Nil(t, err) - // } - //} - // - //// gen index build's indexParams - //indexParams := make(map[string]string) - //indexParams["index_type"] = "IVF_PQ" - //indexParams["index_mode"] = "cpu" - //indexParams["dim"] = "16" - //indexParams["k"] = "10" - //indexParams["nlist"] = "100" - //indexParams["nprobe"] = "10" - //indexParams["m"] = "4" - //indexParams["nbits"] = "8" - //indexParams["metric_type"] = "L2" - //indexParams["SLICE_SIZE"] = "400" - // - //var indexParamsKV []*commonpb.KeyValuePair - //for key, value := range indexParams { - // indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{ - // Key: key, - // Value: value, - // }) - //} - // - //// generator index and write index to minio - //for i := 0; i < 9; i++ { - // typeParams := make(map[string]string) - // typeParams["dim"] = "16" - // index, err := indexnode.NewCIndex(typeParams, indexParams) - // assert.Nil(t, err) - // err = index.BuildFloatVecIndexWithoutIds(indexRowDatas[i]) - // assert.Equal(t, err, nil) - // binarySet, err := index.Serialize() - // assert.Equal(t, len(binarySet), 1) - // assert.Nil(t, err) - // keyPrefix := "distributed-query-test-index" - // segmentStr := strconv.FormatInt(int64(i), 10) - // indexStr := strconv.FormatInt(int64(i), 10) - // key := path.Join(keyPrefix, collectionStr, segmentStr, indexStr) - // minioKV.Save(key, string(binarySet[0].Value)) - //} - // - ////generate query service - //service, err := NewQueryService(context.Background()) - //assert.Nil(t, err) - //collectionID := UniqueID(1) - //partitions := []UniqueID{1} - //col2partition := make(map[UniqueID][]UniqueID) - //col2partition[collectionID] = partitions - //segments := []UniqueID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} - //partition2segment := make(map[UniqueID][]UniqueID) - //partition2segment[UniqueID(1)] = segments - //masterMock := &masterMock{ - // collectionIDs: []UniqueID{1}, - // col2partition: col2partition, - // partition2segment: partition2segment, - //} - //service.SetMasterService(masterMock) - //segStates := make(map[UniqueID]*datapb.SegmentStatesResponse) - //for i := 0; i < 10; i++ { - // if i != 9 { - // state := &datapb.SegmentStatesResponse{ - // State: datapb.SegmentState_SegmentFlushed, - // StartPositions: segPosition[int64(i)], - // } - // segStates[UniqueID(i)] = state - // } else { - // state := &datapb.SegmentStatesResponse{ - // State: datapb.SegmentState_SegmentGrowing, - // StartPositions: segPosition[int64(i)], - // } - // segStates[UniqueID(i)] = state - // } - //} - //dataMock := &dataMock{ - // segmentIDs: []UniqueID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, - // segmentStates: segStates, - //} - // - //service.SetDataService(dataMock) - //service.SetEnableGrpc(true) - // - //loadCollectionRequest := &querypb.LoadCollectionRequest{ - // Base: &commonpb.MsgBase{ - // MsgType: commonpb.MsgType_kCreateCollection, - // }, - // DbID: UniqueID(0), - // CollectionID: collectionID, - //} - // - //registerRequest := &querypb.RegisterNodeRequest{ - // Address: &commonpb.Address{ - // Ip: "localhost", - // Port: 20010, - // }, - //} - //response, err := service.RegisterNode(registerRequest) - //assert.Nil(t, err) - //assert.Equal(t, response.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) - // - //status, err := service.LoadCollection(loadCollectionRequest) - //assert.Nil(t, err) - //assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_SUCCESS) -} diff --git a/internal/queryservice/mock.go b/internal/queryservice/mock.go new file mode 100644 index 0000000000..afe0f4ec5e --- /dev/null +++ b/internal/queryservice/mock.go @@ -0,0 +1,152 @@ +package queryservice + +import ( + "errors" + "strconv" + + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" +) + +const ( + numSegment = 12 +) + +type MasterMock struct { + CollectionIDs []UniqueID + Col2partition map[UniqueID][]UniqueID + Partition2segment map[UniqueID][]UniqueID +} + +func NewMasterMock() *MasterMock { + collectionIDs := make([]UniqueID, 0) + collectionIDs = append(collectionIDs, 1) + + col2partition := make(map[UniqueID][]UniqueID) + partitionIDs := make([]UniqueID, 0) + partitionIDs = append(partitionIDs, 1) + col2partition[1] = partitionIDs + + partition2segment := make(map[UniqueID][]UniqueID) + segmentIDs := make([]UniqueID, 0) + for i := 0; i < numSegment; i++ { + segmentIDs = append(segmentIDs, UniqueID(i)) + } + partition2segment[1] = segmentIDs + + return &MasterMock{ + CollectionIDs: collectionIDs, + Col2partition: col2partition, + Partition2segment: partition2segment, + } +} + +func (master *MasterMock) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { + collectionID := in.CollectionID + partitionIDs := make([]UniqueID, 0) + for _, id := range master.CollectionIDs { + if id == collectionID { + partitions := master.Col2partition[collectionID] + partitionIDs = append(partitionIDs, partitions...) + } + } + response := &milvuspb.ShowPartitionResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + PartitionIDs: partitionIDs, + } + + return response, nil +} + +func (master *MasterMock) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { + collectionID := in.CollectionID + partitionID := in.PartitionID + + for _, id := range master.CollectionIDs { + if id == collectionID { + partitions := master.Col2partition[collectionID] + for _, partition := range partitions { + if partition == partitionID { + return &milvuspb.ShowSegmentResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + //SegmentIDs: master.Partition2segment[partition], + }, nil + } + } + } + } + + return nil, errors.New("collection id or partition id not found") +} + +type DataMock struct { + SegmentIDs []UniqueID + SegmentStates map[UniqueID]*datapb.SegmentStateInfo +} + +func NewDataMock() *DataMock { + positions := make([]*internalpb2.MsgPosition, 0) + positions = append(positions, &internalpb2.MsgPosition{ChannelName: "insert-" + strconv.FormatInt(0, 10)}) + positions = append(positions, &internalpb2.MsgPosition{ChannelName: "insert-" + strconv.FormatInt(1, 10)}) + positions = append(positions, &internalpb2.MsgPosition{ChannelName: "insert-" + strconv.FormatInt(2, 10)}) + positions = append(positions, &internalpb2.MsgPosition{ChannelName: "insert-" + strconv.FormatInt(3, 10)}) + + fillStates := func(segmentID UniqueID, time uint64, position *internalpb2.MsgPosition) *datapb.SegmentStateInfo { + return &datapb.SegmentStateInfo{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + SegmentID: segmentID, + State: commonpb.SegmentState_SegmentFlushed, + CreateTime: time, + StartPosition: position, + } + } + segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo) + segmentIDs := make([]UniqueID, 0) + for i := 0; i < numSegment; i++ { + segmentIDs = append(segmentIDs, UniqueID(i)) + pick := i % 4 + segmentStates[UniqueID(i)] = fillStates(UniqueID(i), uint64(i), positions[pick]) + } + + return &DataMock{ + SegmentIDs: segmentIDs, + SegmentStates: segmentStates, + } +} + +func (data *DataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) { + ret := &datapb.SegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + } + for _, segID := range req.SegmentIDs { + for _, segmentID := range data.SegmentIDs { + if segmentID == segID { + ret.States = append(ret.States, data.SegmentStates[segmentID]) + } + } + } + + if ret.States == nil { + return ret, nil + } + + return ret, nil +} +func (data *DataMock) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) { + return &internalpb2.StringList{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + Values: []string{"insert-0", "insert-1", "insert-2", "insert-3"}, + }, nil +} diff --git a/internal/queryservice/param_table.go b/internal/queryservice/param_table.go index f646679629..e2144a421e 100644 --- a/internal/queryservice/param_table.go +++ b/internal/queryservice/param_table.go @@ -1,9 +1,6 @@ package queryservice import ( - "log" - "os" - "strconv" "sync" "github.com/zilliztech/milvus-distributed/internal/util/paramtable" @@ -15,67 +12,15 @@ type UniqueID = typeutil.UniqueID type ParamTable struct { paramtable.BaseTable - Address string - Port int - - MasterServiceAddress string - DataServiceAddress string - - PulsarAddress string - ETCDAddress string - MetaRootPath string - + Address string + Port int QueryServiceID UniqueID - QueryNodeID UniqueID - QueryNodeNum uint64 - - FlowGraphMaxQueueLength int32 - FlowGraphMaxParallelism int32 - - // minio - MinioEndPoint string - MinioAccessKeyID string - MinioSecretAccessKey string - MinioUseSSLStr bool - MinioBucketName string - - // dm - InsertChannelNames []string - InsertChannelRange []int - InsertReceiveBufSize int64 - InsertPulsarBufSize int64 - - // dd - DDChannelNames []string - DDReceiveBufSize int64 - DDPulsarBufSize int64 - - // search - SearchChannelNames []string - SearchResultChannelNames []string - SearchReceiveBufSize int64 - SearchPulsarBufSize int64 - SearchResultReceiveBufSize int64 - // stats - StatsPublishInterval int - StatsChannelName string - StatsReceiveBufSize int64 - - // load index - LoadIndexChannelNames []string - LoadIndexReceiveBufSize int64 - LoadIndexPulsarBufSize int64 + StatsChannelName string // timetick - TimeTickChannelName string - TimeTickReceiveBufferSize int64 - - GracefulTime int64 - MsgChannelSubName string - DefaultPartitionTag string - SliceIndex int + TimeTickChannelName string } var Params ParamTable @@ -94,333 +39,13 @@ func (p *ParamTable) Init() { panic(err) } - queryNodeIDStr := os.Getenv("QUERY_NODE_ID") - if queryNodeIDStr == "" { - queryNodeIDList := p.QueryNodeIDList() - if len(queryNodeIDList) <= 0 { - queryNodeIDStr = "0" - } else { - queryNodeIDStr = strconv.Itoa(int(queryNodeIDList[0])) - } - } - - err = p.LoadYaml("advanced/common.yaml") - if err != nil { - panic(err) - } - err = p.Save("_queryNodeID", queryNodeIDStr) - if err != nil { - panic(err) - } - - p.initMinioEndPoint() - p.initMinioAccessKeyID() - p.initMinioSecretAccessKey() - p.initMinioUseSSLStr() - p.initMinioBucketName() - - p.initPulsarAddress() - p.initETCDAddress() - p.initMetaRootPath() - - p.initQueryNodeID() - p.initQueryNodeNum() - - p.initGracefulTime() - p.initMsgChannelSubName() - p.initDefaultPartitionTag() - p.initSliceIndex() - - p.initFlowGraphMaxQueueLength() - p.initFlowGraphMaxParallelism() - - p.initInsertChannelNames() - p.initInsertChannelRange() - p.initInsertReceiveBufSize() - p.initInsertPulsarBufSize() - - p.initDDChannelNames() - p.initDDReceiveBufSize() - p.initDDPulsarBufSize() - - p.initSearchChannelNames() - p.initSearchResultChannelNames() - p.initSearchReceiveBufSize() - p.initSearchPulsarBufSize() - p.initSearchResultReceiveBufSize() - - p.initStatsPublishInterval() p.initStatsChannelName() - p.initStatsReceiveBufSize() - - p.initLoadIndexChannelNames() - p.initLoadIndexReceiveBufSize() - p.initLoadIndexPulsarBufSize() - p.initTimeTickChannelName() - p.initTimeTickReceiveBufSize() - - p.initAddress() + p.initQueryServiceAddress() p.initPort() - p.initMasterServiceAddress() }) } -func (p *ParamTable) initMinioEndPoint() { - url, err := p.Load("_MinioAddress") - if err != nil { - panic(err) - } - p.MinioEndPoint = url -} - -func (p *ParamTable) initMinioAccessKeyID() { - id, err := p.Load("minio.accessKeyID") - if err != nil { - panic(err) - } - p.MinioAccessKeyID = id -} - -func (p *ParamTable) initMinioSecretAccessKey() { - key, err := p.Load("minio.secretAccessKey") - if err != nil { - panic(err) - } - p.MinioSecretAccessKey = key -} - -func (p *ParamTable) initMinioUseSSLStr() { - ssl, err := p.Load("minio.useSSL") - if err != nil { - panic(err) - } - sslBoolean, err := strconv.ParseBool(ssl) - if err != nil { - panic(err) - } - p.MinioUseSSLStr = sslBoolean -} - -func (p *ParamTable) initMinioBucketName() { - bucketName, err := p.Load("minio.bucketName") - if err != nil { - panic(err) - } - p.MinioBucketName = bucketName -} - -func (p *ParamTable) initPulsarAddress() { - url, err := p.Load("_PulsarAddress") - if err != nil { - panic(err) - } - p.PulsarAddress = url -} - -func (p *ParamTable) initMasterServiceAddress() { - url, err := p.Load("_MasterAddress") - if err != nil { - panic(err) - } - p.MasterServiceAddress = url -} - -func (p *ParamTable) initDataServiceAddress() { - url, err := p.Load("_DataServiceAddress") - if err != nil { - panic(err) - } - p.DataServiceAddress = url -} - -func (p *ParamTable) initQueryNodeID() { - queryNodeID, err := p.Load("_queryNodeID") - if err != nil { - panic(err) - } - id, err := strconv.Atoi(queryNodeID) - if err != nil { - panic(err) - } - p.QueryNodeID = UniqueID(id) -} - -func (p *ParamTable) initInsertChannelRange() { - insertChannelRange, err := p.Load("msgChannel.channelRange.insert") - if err != nil { - panic(err) - } - p.InsertChannelRange = paramtable.ConvertRangeToIntRange(insertChannelRange, ",") -} - -// advanced params -// stats -func (p *ParamTable) initStatsPublishInterval() { - p.StatsPublishInterval = p.ParseInt("queryNode.stats.publishInterval") -} - -// dataSync: -func (p *ParamTable) initFlowGraphMaxQueueLength() { - p.FlowGraphMaxQueueLength = p.ParseInt32("queryNode.dataSync.flowGraph.maxQueueLength") -} - -func (p *ParamTable) initFlowGraphMaxParallelism() { - p.FlowGraphMaxParallelism = p.ParseInt32("queryNode.dataSync.flowGraph.maxParallelism") -} - -// msgStream -func (p *ParamTable) initInsertReceiveBufSize() { - p.InsertReceiveBufSize = p.ParseInt64("queryNode.msgStream.insert.recvBufSize") -} - -func (p *ParamTable) initInsertPulsarBufSize() { - p.InsertPulsarBufSize = p.ParseInt64("queryNode.msgStream.insert.pulsarBufSize") -} - -func (p *ParamTable) initDDReceiveBufSize() { - revBufSize, err := p.Load("queryNode.msgStream.dataDefinition.recvBufSize") - if err != nil { - panic(err) - } - bufSize, err := strconv.Atoi(revBufSize) - if err != nil { - panic(err) - } - p.DDReceiveBufSize = int64(bufSize) -} - -func (p *ParamTable) initDDPulsarBufSize() { - pulsarBufSize, err := p.Load("queryNode.msgStream.dataDefinition.pulsarBufSize") - if err != nil { - panic(err) - } - bufSize, err := strconv.Atoi(pulsarBufSize) - if err != nil { - panic(err) - } - p.DDPulsarBufSize = int64(bufSize) -} - -func (p *ParamTable) initSearchReceiveBufSize() { - p.SearchReceiveBufSize = p.ParseInt64("queryNode.msgStream.search.recvBufSize") -} - -func (p *ParamTable) initSearchPulsarBufSize() { - p.SearchPulsarBufSize = p.ParseInt64("queryNode.msgStream.search.pulsarBufSize") -} - -func (p *ParamTable) initSearchResultReceiveBufSize() { - p.SearchResultReceiveBufSize = p.ParseInt64("queryNode.msgStream.searchResult.recvBufSize") -} - -func (p *ParamTable) initStatsReceiveBufSize() { - p.StatsReceiveBufSize = p.ParseInt64("queryNode.msgStream.stats.recvBufSize") -} - -func (p *ParamTable) initETCDAddress() { - ETCDAddress, err := p.Load("_EtcdAddress") - if err != nil { - panic(err) - } - p.ETCDAddress = ETCDAddress -} - -func (p *ParamTable) initMetaRootPath() { - rootPath, err := p.Load("etcd.rootPath") - if err != nil { - panic(err) - } - subPath, err := p.Load("etcd.metaSubPath") - if err != nil { - panic(err) - } - p.MetaRootPath = rootPath + "/" + subPath -} - -func (p *ParamTable) initGracefulTime() { - p.GracefulTime = p.ParseInt64("queryNode.gracefulTime") -} - -func (p *ParamTable) initInsertChannelNames() { - - prefix, err := p.Load("msgChannel.chanNamePrefix.insert") - if err != nil { - log.Fatal(err) - } - prefix += "-" - channelRange, err := p.Load("msgChannel.channelRange.insert") - if err != nil { - panic(err) - } - channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",") - - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - sep := len(channelIDs) / int(p.QueryNodeNum) - index := p.SliceIndex - if index == -1 { - panic("queryNodeID not Match with Config") - } - start := index * sep - p.InsertChannelNames = ret[start : start+sep] -} - -func (p *ParamTable) initSearchChannelNames() { - prefix, err := p.Load("msgChannel.chanNamePrefix.search") - if err != nil { - log.Fatal(err) - } - prefix += "-" - channelRange, err := p.Load("msgChannel.channelRange.search") - if err != nil { - panic(err) - } - - channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",") - - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - p.SearchChannelNames = ret -} - -func (p *ParamTable) initSearchResultChannelNames() { - prefix, err := p.Load("msgChannel.chanNamePrefix.searchResult") - if err != nil { - log.Fatal(err) - } - prefix += "-" - channelRange, err := p.Load("msgChannel.channelRange.searchResult") - if err != nil { - panic(err) - } - - channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",") - - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - p.SearchResultChannelNames = ret -} - -func (p *ParamTable) initMsgChannelSubName() { - // TODO: subName = namePrefix + "-" + queryNodeID, queryNodeID is assigned by master - name, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix") - if err != nil { - log.Panic(err) - } - queryNodeIDStr, err := p.Load("_QueryNodeID") - if err != nil { - panic(err) - } - p.MsgChannelSubName = name + "-" + queryNodeIDStr -} - func (p *ParamTable) initStatsChannelName() { channels, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats") if err != nil { @@ -429,65 +54,6 @@ func (p *ParamTable) initStatsChannelName() { p.StatsChannelName = channels } -func (p *ParamTable) initDDChannelNames() { - prefix, err := p.Load("msgChannel.chanNamePrefix.dataDefinition") - if err != nil { - panic(err) - } - prefix += "-" - iRangeStr, err := p.Load("msgChannel.channelRange.dataDefinition") - if err != nil { - panic(err) - } - channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",") - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - p.DDChannelNames = ret -} - -func (p *ParamTable) initDefaultPartitionTag() { - defaultTag, err := p.Load("common.defaultPartitionTag") - if err != nil { - panic(err) - } - - p.DefaultPartitionTag = defaultTag -} - -func (p *ParamTable) initSliceIndex() { - queryNodeID := p.QueryNodeID - queryNodeIDList := p.QueryNodeIDList() - for i := 0; i < len(queryNodeIDList); i++ { - if queryNodeID == queryNodeIDList[i] { - p.SliceIndex = i - return - } - } - p.SliceIndex = -1 -} - -func (p *ParamTable) initQueryNodeNum() { - p.QueryNodeNum = uint64(len(p.QueryNodeIDList())) -} - -func (p *ParamTable) initLoadIndexChannelNames() { - loadIndexChannelName, err := p.Load("msgChannel.chanNamePrefix.cmd") - if err != nil { - panic(err) - } - p.LoadIndexChannelNames = []string{loadIndexChannelName} -} - -func (p *ParamTable) initLoadIndexReceiveBufSize() { - p.LoadIndexReceiveBufSize = p.ParseInt64("queryNode.msgStream.loadIndex.recvBufSize") -} - -func (p *ParamTable) initLoadIndexPulsarBufSize() { - p.LoadIndexPulsarBufSize = p.ParseInt64("queryNode.msgStream.loadIndex.pulsarBufSize") -} - func (p *ParamTable) initTimeTickChannelName() { timeTickChannelName, err := p.Load("msgChannel.chanNamePrefix.queryTimeTick") if err != nil { @@ -497,11 +63,7 @@ func (p *ParamTable) initTimeTickChannelName() { } -func (p *ParamTable) initTimeTickReceiveBufSize() { - p.TimeTickReceiveBufferSize = p.ParseInt64("queryNode.msgStream.timeTick.recvBufSize") -} - -func (p *ParamTable) initAddress() { +func (p *ParamTable) initQueryServiceAddress() { url, err := p.Load("_QueryServiceAddress") if err != nil { panic(err) diff --git a/internal/queryservice/querynode.go b/internal/queryservice/querynode.go index 46b03fa414..fe44bab4fc 100644 --- a/internal/queryservice/querynode.go +++ b/internal/queryservice/querynode.go @@ -32,6 +32,10 @@ func (qn *queryNodeInfo) AddDmChannels(channels []string) { qn.dmChannelNames = append(qn.dmChannelNames, channels...) } +func (qn *queryNodeInfo) AddQueryChannel(in *querypb.AddQueryChannelsRequest) (*commonpb.Status, error) { + return qn.client.AddQueryChannel(in) +} + func newQueryNodeInfo(client QueryNodeInterface) *queryNodeInfo { segments := make([]UniqueID, 0) dmChannelNames := make([]string, 0) diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 25bf6febad..949c6d80a0 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -161,6 +161,7 @@ func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb func (qs *QueryService) ShowCollections(req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error) { dbID := req.DbID + fmt.Println("show collection start, dbID = ", dbID) collections, err := qs.replica.getCollections(dbID) collectionIDs := make([]UniqueID, 0) for _, collection := range collections { @@ -174,6 +175,7 @@ func (qs *QueryService) ShowCollections(req *querypb.ShowCollectionRequest) (*qu }, }, err } + fmt.Println("show collection end") return &querypb.ShowCollectionResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -392,6 +394,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm return fn(err), err } for _, state := range resp.States { + fmt.Println("segment ", state.SegmentID, " 's state is ", state.StartPosition) segmentID := state.SegmentID segmentStates[segmentID] = state channelName := state.StartPosition.ChannelName @@ -501,7 +504,22 @@ func (qs *QueryService) CreateQueryChannel() (*querypb.CreateQueryChannelRespons allocatedQueryChannel := "query-" + strconv.FormatInt(int64(channelID), 10) allocatedQueryResultChannel := "queryResult-" + strconv.FormatInt(int64(channelID), 10) - //TODO:: query node watch query channels + addQueryChannelsRequest := &querypb.AddQueryChannelsRequest{ + RequestChannelID: allocatedQueryChannel, + ResultChannelID: allocatedQueryResultChannel, + } + for _, node := range qs.queryNodes { + _, err := node.AddQueryChannel(addQueryChannelsRequest) + if err != nil { + return &querypb.CreateQueryChannelResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, + }, err + } + } + return &querypb.CreateQueryChannelResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, diff --git a/internal/queryservice/queryservice_test.go b/internal/queryservice/queryservice_test.go index 2316d25cbb..a11ecc8bbf 100644 --- a/internal/queryservice/queryservice_test.go +++ b/internal/queryservice/queryservice_test.go @@ -2,169 +2,15 @@ package queryservice import ( "context" - "strconv" "testing" "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/datapb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" ) -type masterMock struct { - collectionIDs []UniqueID - col2partition map[UniqueID][]UniqueID - partition2segment map[UniqueID][]UniqueID -} - -func newMasterMock() *masterMock { - collectionIDs := make([]UniqueID, 0) - collectionIDs = append(collectionIDs, 1) - - col2partition := make(map[UniqueID][]UniqueID) - partitionIDs := make([]UniqueID, 0) - partitionIDs = append(partitionIDs, 1) - col2partition[1] = partitionIDs - - partition2segment := make(map[UniqueID][]UniqueID) - segmentIDs := make([]UniqueID, 0) - segmentIDs = append(segmentIDs, 1) - segmentIDs = append(segmentIDs, 2) - segmentIDs = append(segmentIDs, 3) - segmentIDs = append(segmentIDs, 4) - segmentIDs = append(segmentIDs, 5) - segmentIDs = append(segmentIDs, 6) - partition2segment[1] = segmentIDs - - return &masterMock{ - collectionIDs: collectionIDs, - col2partition: col2partition, - partition2segment: partition2segment, - } -} - -func (master *masterMock) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { - collectionID := in.CollectionID - partitionIDs := make([]UniqueID, 0) - for _, id := range master.collectionIDs { - if id == collectionID { - partitions := master.col2partition[collectionID] - partitionIDs = append(partitionIDs, partitions...) - } - } - response := &milvuspb.ShowPartitionResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - PartitionIDs: partitionIDs, - } - - return response, nil -} - -func (master *masterMock) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { - collectionID := in.CollectionID - partitionID := in.PartitionID - - for _, id := range master.collectionIDs { - if id == collectionID { - partitions := master.col2partition[collectionID] - for _, partition := range partitions { - if partition == partitionID { - return &milvuspb.ShowSegmentResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - SegmentIDs: master.partition2segment[partition], - }, nil - } - } - } - } - - return nil, errors.New("collection id or partition id not found") -} - -type dataMock struct { - segmentIDs []UniqueID - segmentStates map[UniqueID]*datapb.SegmentStateInfo -} - -func newDataMock() *dataMock { - positions1 := make([]*internalpb2.MsgPosition, 0) - positions2 := make([]*internalpb2.MsgPosition, 0) - positions1 = append(positions1, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(1, 10)}) - positions1 = append(positions1, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(2, 10)}) - positions2 = append(positions2, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(3, 10)}) - positions2 = append(positions2, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(4, 10)}) - - segmentIDs := make([]UniqueID, 0) - segmentIDs = append(segmentIDs, 1) - segmentIDs = append(segmentIDs, 2) - segmentIDs = append(segmentIDs, 3) - segmentIDs = append(segmentIDs, 4) - segmentIDs = append(segmentIDs, 5) - segmentIDs = append(segmentIDs, 6) - - fillStates := func(segmentID UniqueID, time uint64, position []*internalpb2.MsgPosition, state commonpb.SegmentState) *datapb.SegmentStateInfo { - return &datapb.SegmentStateInfo{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - SegmentID: segmentID, - State: state, - CreateTime: time, - // StartPositions: position, - } - } - segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo) - segmentStates[1] = fillStates(1, 1, positions1, commonpb.SegmentState_SegmentFlushed) - segmentStates[2] = fillStates(2, 2, positions2, commonpb.SegmentState_SegmentFlushed) - segmentStates[3] = fillStates(3, 3, positions1, commonpb.SegmentState_SegmentFlushed) - segmentStates[4] = fillStates(4, 4, positions2, commonpb.SegmentState_SegmentFlushed) - segmentStates[5] = fillStates(5, 5, positions1, commonpb.SegmentState_SegmentGrowing) - segmentStates[6] = fillStates(6, 6, positions2, commonpb.SegmentState_SegmentGrowing) - - return &dataMock{ - segmentIDs: segmentIDs, - segmentStates: segmentStates, - } -} - -func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) { - ret := &datapb.SegmentStatesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - } - for _, segID := range req.SegmentIDs { - for _, segmentID := range data.segmentIDs { - if segmentID == segID { - ret.States = append(ret.States, data.segmentStates[segmentID]) - } - } - } - - if ret.States == nil { - return nil, errors.New("segment id not found") - } - - return ret, nil -} -func (data *dataMock) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) { - return &internalpb2.StringList{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - Values: []string{"test-insert"}, - }, nil -} - func TestQueryService_Init(t *testing.T) { msFactory := pulsarms.NewFactory() service, err := NewQueryService(context.Background(), msFactory) @@ -200,8 +46,8 @@ func TestQueryService_load(t *testing.T) { assert.Nil(t, err) service.Init() service.Start() - service.SetMasterService(newMasterMock()) - service.SetDataService(newDataMock()) + service.SetMasterService(NewMasterMock()) + service.SetDataService(NewDataMock()) registerNodeRequest := &querypb.RegisterNodeRequest{ Address: &commonpb.Address{}, }