Watch queryChannel on query node

Signed-off-by: xige-16 <xi.ge@zilliz.com>
xige-16 2021-02-09 17:09:26 +08:00 committed by yefu.chen
parent 833f1d49ab
commit 70fe5233cf
17 changed files with 1333 additions and 1258 deletions

View File

@ -2,6 +2,7 @@ package components
import (
"context"
"errors"
"fmt"
"log"
"time"
@ -18,7 +19,6 @@ import (
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
qs "github.com/zilliztech/milvus-distributed/internal/queryservice"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
@ -91,119 +91,134 @@ func NewQueryNode(ctx context.Context, factory msgstream.Factory) (*QueryNode, e
addr := fmt.Sprintf("%s:%d", ms.Params.Address, ms.Params.Port)
log.Println("Master service address:", addr)
log.Println("Init master service client ...")
var masterService *msc.GrpcClient = nil
if QueryMock {
	svr.SetMasterService(&qns.MasterServiceMock{Count: 0})
} else {
	masterService, err = msc.NewGrpcClient(addr, 20*time.Second)
	if err != nil {
		panic(err)
	}
	if err = masterService.Init(); err != nil {
		panic(err)
	}
	if err = masterService.Start(); err != nil {
		panic(err)
	}
	ticker := time.NewTicker(interval * time.Millisecond)
	tctx, tcancel := context.WithTimeout(ctx, 10*interval*time.Millisecond)
	defer func() {
		ticker.Stop()
		tcancel()
	}()
	for {
		var states *internalpb2.ComponentStates
		select {
		case <-ticker.C:
			states, err = masterService.GetComponentStates()
			if err != nil {
				continue
			}
		case <-tctx.Done():
			return nil, errors.New("master client connect timeout")
		}
		if states.State.StateCode == internalpb2.StateCode_HEALTHY {
			break
		}
	}
	if err := svr.SetMasterService(masterService); err != nil {
		panic(err)
	}
}
// --- IndexService ---
is.Params.Init()
log.Println("Index service address:", is.Params.Address)
var indexService *isc.Client = nil
if QueryMock {
	svr.SetIndexService(&qns.IndexServiceMock{Count: 0})
} else {
	indexService = isc.NewClient(is.Params.Address)
	if err := indexService.Init(); err != nil {
		panic(err)
	}
	if err := indexService.Start(); err != nil {
		panic(err)
	}
	ticker := time.NewTicker(interval * time.Millisecond)
	tctx, tcancel := context.WithTimeout(ctx, 10*interval*time.Millisecond)
	defer func() {
		ticker.Stop()
		tcancel()
	}()
	for {
		var states *internalpb2.ComponentStates
		select {
		case <-ticker.C:
			states, err = indexService.GetComponentStates()
			if err != nil {
				continue
			}
		case <-tctx.Done():
			return nil, errors.New("Index service client connect timeout")
		}
		if states.State.StateCode == internalpb2.StateCode_HEALTHY {
			break
		}
	}
	if err := svr.SetIndexService(indexService); err != nil {
		panic(err)
	}
}
// --- DataService ---
ds.Params.Init()
log.Printf("Data service address: %s:%d", ds.Params.Address, ds.Params.Port)
log.Println("Init data service client ...")
var dataService *dsc.Client = nil
if QueryMock {
	svr.SetDataService(&qns.DataServiceMock{Count: 0})
} else {
	dataService = dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port))
	if err = dataService.Init(); err != nil {
		panic(err)
	}
	if err = dataService.Start(); err != nil {
		panic(err)
	}
	for cnt = 0; cnt < retry; cnt++ {
		dsStates, err := dataService.GetComponentStates()
		if err != nil {
			log.Printf("retry count = %d, error = %s", cnt, err.Error())
			continue
		}
		if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
			// err is nil on this path, so log the status reason instead
			log.Printf("retry count = %d, reason = %s", cnt, dsStates.Status.Reason)
			continue
		}
		if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
			continue
		}
		break
	}
	if cnt >= retry {
		panic("Data service isn't ready")
	}
	if err := svr.SetDataService(dataService); err != nil {
		panic(err)
	}
}
return &QueryNode{
@ -231,9 +246,11 @@ func (q *QueryNode) Run() error {
}
func (q *QueryNode) Stop() error {
	if !QueryMock {
		_ = q.dataService.Stop()
		_ = q.masterService.Stop()
		_ = q.indexService.Stop()
	}
	_ = q.queryService.Stop()
return q.svr.Stop()
}
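The three wiring blocks above repeat the same readiness pattern: tick, call GetComponentStates, retry until HEALTHY or an overall timeout. A possible follow-up (not part of this commit) would factor it into a shared helper; a minimal sketch, assuming only the GetComponentStates method the three clients already expose:

// waitForHealthy polls svc until it reports HEALTHY or the overall timeout
// elapses. Hypothetical helper; name and placement are illustrative only.
type stateProvider interface {
	GetComponentStates() (*internalpb2.ComponentStates, error)
}

func waitForHealthy(ctx context.Context, svc stateProvider, interval time.Duration, maxTicks int) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	tctx, tcancel := context.WithTimeout(ctx, time.Duration(maxTicks)*interval)
	defer tcancel()
	for {
		select {
		case <-ticker.C:
			states, err := svc.GetComponentStates()
			if err != nil {
				continue // transient RPC failure, try again on the next tick
			}
			if states.State.StateCode == internalpb2.StateCode_HEALTHY {
				return nil
			}
		case <-tctx.Done():
			return errors.New("component connect timeout")
		}
	}
}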

View File

@ -6,12 +6,12 @@ import (
"log"
"time"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
qs "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@ -26,6 +26,10 @@ type QueryService struct {
masterService *msc.GrpcClient
}
const (
QueryMock = false
)
func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryService, error) {
const retry = 10
const interval = 200
@ -41,79 +45,92 @@ func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryServ
ms.Params.Init()
log.Printf("Master service address: %s:%d", ms.Params.Address, ms.Params.Port)
log.Println("Init master service client ...")
var masterService *msc.GrpcClient = nil
if QueryMock {
	masterMock := queryservice.NewMasterMock()
	svr.SetMasterService(masterMock)
} else {
	masterService, err = msc.NewGrpcClient(fmt.Sprintf("%s:%d", ms.Params.Address, ms.Params.Port), 20*time.Second)
	if err != nil {
		panic(err)
	}
	if err = masterService.Init(); err != nil {
		panic(err)
	}
	if err = masterService.Start(); err != nil {
		panic(err)
	}
	var cnt int
	for cnt = 0; cnt < retry; cnt++ {
		time.Sleep(time.Duration(cnt*interval) * time.Millisecond)
		if cnt != 0 {
			log.Println("Master service isn't ready ...")
			log.Printf("Retrying getting master service's states in ... %v ms", interval)
		}
		msStates, err := masterService.GetComponentStates()
		if err != nil {
			continue
		}
		if msStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
			continue
		}
		if msStates.State.StateCode != internalpb2.StateCode_HEALTHY && msStates.State.StateCode != internalpb2.StateCode_INITIALIZING {
			continue
		}
		break
	}
	if cnt >= retry {
		panic("Master service isn't ready")
	}
	if err := svr.SetMasterService(masterService); err != nil {
		panic(err)
	}
}
// --- Data service client ---
ds.Params.Init()
log.Printf("Data service address: %s:%d", ds.Params.Address, ds.Params.Port)
log.Println("Init data service client ...")
var dataService *dsc.Client = nil
if QueryMock {
	dataMock := queryservice.NewDataMock()
	svr.SetDataService(dataMock)
} else {
	dataService = dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port))
	if err = dataService.Init(); err != nil {
		panic(err)
	}
	if err = dataService.Start(); err != nil {
		panic(err)
	}
	var cnt int
	for cnt = 0; cnt < retry; cnt++ {
		dsStates, err := dataService.GetComponentStates()
		if err != nil {
			continue
		}
		if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
			continue
		}
		if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
			continue
		}
		break
	}
	if cnt >= retry {
		panic("Data service isn't ready")
	}
	if err := svr.SetDataService(dataService); err != nil {
		panic(err)
	}
}
return &QueryService{
@ -137,7 +154,9 @@ func (qs *QueryService) Run() error {
}
func (qs *QueryService) Stop() error {
	if !QueryMock {
		_ = qs.dataService.Stop()
		_ = qs.masterService.Stop()
	}
return qs.svr.Stop()
}

View File

@ -0,0 +1,175 @@
package grpcquerynode
import (
"path"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
)
const (
collectionID = 1
binlogPathPrefix = "distributed-query-test-binlog"
indexPathPrefix = "distributed-query-test-index"
uidFieldID = 0
timestampFieldID = 1
vecFieldID = 100
ageFieldID = 101
vecParamsID = "indexParams"
vecDataID = "IVF"
)
var fieldIDs = []int64{uidFieldID, timestampFieldID, vecFieldID, ageFieldID}
/*
masterMock receives a segmentID and returns an indexID; segmentID == indexID.
dataMock returns a binlogPath; path = distributed-query-test-binlog/collectionID/segmentID/fieldID
indexMock returns an indexPath and indexParams; indexPath = distributed-query-test-index/collectionID/segmentID/indexID,
indexParams use the defaults:
indexID: 1
schema:
collectionID: 1
partitionID: 1
segmentID: [1, 10]
0: int64: uid
1: int64: timestamp
100: float32: vec: 16
101: int32: age
indexParams:
indexParams := make(map[string]string)
indexParams["index_type"] = "IVF_PQ"
indexParams["index_mode"] = "cpu"
indexParams["dim"] = "16"
indexParams["k"] = "10"
indexParams["nlist"] = "100"
indexParams["nprobe"] = "10"
indexParams["m"] = "4"
indexParams["nbits"] = "8"
indexParams["metric_type"] = "L2"
indexParams["SLICE_SIZE"] = "4"
*/
type MasterServiceMock struct {
Count int
}
func (m *MasterServiceMock) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
if m.Count < 20 {
m.Count++
return nil, errors.New("index not exist")
}
indexParams := make(map[string]string)
indexParams["index_type"] = "IVF_PQ"
indexParams["index_mode"] = "cpu"
indexParams["dim"] = "16"
indexParams["k"] = "10"
indexParams["nlist"] = "100"
indexParams["nprobe"] = "10"
indexParams["m"] = "4"
indexParams["nbits"] = "8"
indexParams["metric_type"] = "L2"
indexParams["SLICE_SIZE"] = "4"
params := make([]*commonpb.KeyValuePair, 0)
for k, v := range indexParams {
params = append(params, &commonpb.KeyValuePair{
Key: k,
Value: v,
})
}
rsp := &milvuspb.DescribeSegmentResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
IndexID: in.SegmentID, // use the segment id as the index id
BuildID: in.SegmentID,
}
return rsp, nil
}
type DataServiceMock struct {
Count int
}
func (data *DataServiceMock) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) {
if data.Count < 10 {
data.Count++
return nil, errors.New("binlog not exist")
}
paths := make([]*internalPb.StringList, len(fieldIDs))
for i := range paths {
pathKey := path.Join(binlogPathPrefix,
strconv.FormatInt(collectionID, 10),
strconv.FormatInt(req.SegmentID, 10),
strconv.FormatInt(fieldIDs[i], 10))
paths[i] = &internalPb.StringList{
Values: []string{pathKey},
}
}
rsp := &datapb.InsertBinlogPathsResponse{
FieldIDs: fieldIDs,
Paths: paths,
}
return rsp, nil
}
type IndexServiceMock struct {
Count int
}
func (index *IndexServiceMock) GetIndexFilePaths(req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
if index.Count < 30 {
index.Count++
return nil, errors.New("index path not exist")
}
if len(req.IndexBuildIDs) != 1 {
panic("illegal index ids")
}
segmentID := req.IndexBuildIDs[0] // use index id as segment id
indexPaths1 := path.Join(indexPathPrefix,
strconv.FormatInt(collectionID, 10),
strconv.FormatInt(segmentID, 10),
vecDataID)
indexPaths2 := path.Join(indexPathPrefix,
strconv.FormatInt(collectionID, 10),
strconv.FormatInt(segmentID, 10),
vecParamsID)
indexPathInfo := make([]*indexpb.IndexFilePathInfo, 1)
indexPathInfo[0] = &indexpb.IndexFilePathInfo{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
IndexFilePaths: []string{indexPaths1, indexPaths2},
}
rsp := &indexpb.IndexFilePathsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
FilePaths: indexPathInfo,
}
return rsp, nil
}
type queryServiceMock struct{}
func (q *queryServiceMock) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
return &querypb.RegisterNodeResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
InitParams: &internalPb.InitParams{
NodeID: int64(0),
},
}, nil
}
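Taken together, the mocks emulate the lookup chain documented in the comment block above: the segment ID doubles as the index/build ID, and the index service resolves it to the two fixed object paths. A hedged usage sketch (the function name is invented, and the Count values are preloaded to skip the simulated not-ready phase):

func lookupIndexPaths(segmentID int64) ([]string, error) {
	master := &MasterServiceMock{Count: 20} // >= 20: DescribeSegment succeeds immediately
	index := &IndexServiceMock{Count: 30}   // >= 30: GetIndexFilePaths succeeds immediately
	segDesc, err := master.DescribeSegment(&milvuspb.DescribeSegmentRequest{SegmentID: segmentID})
	if err != nil {
		return nil, err
	}
	// the mock echoes the segment ID back as the build ID
	resp, err := index.GetIndexFilePaths(&indexpb.IndexFilePathsRequest{
		IndexBuildIDs: []int64{segDesc.BuildID},
	})
	if err != nil {
		return nil, err
	}
	// e.g. distributed-query-test-index/1/<segmentID>/IVF and .../indexParams
	return resp.FilePaths[0].IndexFilePaths, nil
}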

View File

@ -0,0 +1,88 @@
package grpcquerynode
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"testing"
"time"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/querynode"
)
const (
debug = true
ctxTimeInMillisecond = 2000
)
func TestQueryNodeDistributed_Service(t *testing.T) {
// Creates server.
var ctx context.Context
var cancel context.CancelFunc
if debug {
ctx, cancel = context.WithCancel(context.Background())
} else {
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel = context.WithDeadline(context.Background(), d)
}
go mockMain(ctx)
<-ctx.Done()
cancel()
}
func mockMain(ctx context.Context) {
svr := newServerMock(ctx)
if err := svr.Init(); err != nil {
panic(err)
}
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
if err := svr.Start(); err != nil {
	panic(err)
}
defer svr.Stop()
var sig os.Signal
// wait for either a real signal or context cancellation; calling
// sig.String() on the ctx.Done path would panic on a nil signal
select {
case sig = <-sc:
case <-ctx.Done():
	log.Print("Context canceled, exiting")
	return
}
log.Print("Got signal to exit", zap.String("signal", sig.String()))
switch sig {
case syscall.SIGTERM:
os.Exit(0)
default:
os.Exit(1)
}
}
func newServerMock(ctx context.Context) *Server {
factory := pulsarms.NewFactory()
server := &Server{
node: querynode.NewQueryNodeWithoutID(ctx, factory),
}
if err := server.node.SetQueryService(&queryServiceMock{}); err != nil {
panic(err)
}
if err := server.node.SetMasterService(&MasterServiceMock{}); err != nil {
panic(err)
}
if err := server.node.SetIndexService(&IndexServiceMock{}); err != nil {
panic(err)
}
if err := server.node.SetDataService(&DataServiceMock{}); err != nil {
panic(err)
}
return server
}

View File

@ -0,0 +1,345 @@
package grpcqueryserviceclient
//import (
// "context"
// "encoding/binary"
// "math"
// "path"
// "strconv"
// "testing"
// "time"
//
// "github.com/stretchr/testify/assert"
//
// "github.com/zilliztech/milvus-distributed/internal/indexnode"
// minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
// "github.com/zilliztech/milvus-distributed/internal/msgstream"
// "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
// "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
// "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
// "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
// "github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
// "github.com/zilliztech/milvus-distributed/internal/storage"
// "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
//)
//
////generate insert data
//const msgLength = 100
//const receiveBufSize = 1024
//const pulsarBufSize = 1024
//const DIM = 16
//
//type UniqueID = typeutil.UniqueID
//
//func genInsert(collectionID int64,
// partitionID int64,
// timeStart int,
// numDmChannels int,
// binlog bool) (*msgstream.MsgPack, *msgstream.MsgPack) {
// msgs := make([]msgstream.TsMsg, 0)
// for n := timeStart; n < timeStart+msgLength; n++ {
// rowData := make([]byte, 0)
// if binlog {
// id := make([]byte, 8)
// binary.BigEndian.PutUint64(id, uint64(n))
// rowData = append(rowData, id...)
// time := make([]byte, 8)
// binary.BigEndian.PutUint64(time, uint64(n))
// rowData = append(rowData, time...)
// }
// for i := 0; i < DIM; i++ {
// vec := make([]byte, 4)
// binary.BigEndian.PutUint32(vec, math.Float32bits(float32(n*i)))
// rowData = append(rowData, vec...)
// }
// age := make([]byte, 4)
// binary.BigEndian.PutUint32(age, 1)
// rowData = append(rowData, age...)
// blob := &commonpb.Blob{
// Value: rowData,
// }
//
// var insertMsg msgstream.TsMsg = &msgstream.InsertMsg{
// BaseMsg: msgstream.BaseMsg{
// HashValues: []uint32{uint32((n - 1) % numDmChannels)},
// },
// InsertRequest: internalpb2.InsertRequest{
// Base: &commonpb.MsgBase{
// MsgType: commonpb.MsgType_kInsert,
// MsgID: 0,
// Timestamp: uint64(n),
// SourceID: 0,
// },
// CollectionID: collectionID,
// PartitionID: partitionID,
// SegmentID: UniqueID(((n - 1) % numDmChannels) + ((n-1)/(numDmChannels*msgLength))*numDmChannels),
// ChannelID: "0",
// Timestamps: []uint64{uint64(n)},
// RowIDs: []int64{int64(n)},
// RowData: []*commonpb.Blob{blob},
// },
// }
// //fmt.Println("hash value = ", insertMsg.(*msgstream.InsertMsg).HashValues, "segmentID = ", insertMsg.(*msgstream.InsertMsg).SegmentID)
// msgs = append(msgs, insertMsg)
// }
//
// insertMsgPack := &msgstream.MsgPack{
// BeginTs: uint64(timeStart),
// EndTs: uint64(timeStart + msgLength),
// Msgs: msgs,
// }
//
// // generate timeTick
// timeTickMsg := &msgstream.TimeTickMsg{
// BaseMsg: msgstream.BaseMsg{
// BeginTimestamp: 0,
// EndTimestamp: 0,
// HashValues: []uint32{0},
// },
// TimeTickMsg: internalpb2.TimeTickMsg{
// Base: &commonpb.MsgBase{
// MsgType: commonpb.MsgType_kTimeTick,
// MsgID: 0,
// Timestamp: uint64(timeStart + msgLength),
// SourceID: 0,
// },
// },
// }
// timeTickMsgPack := &msgstream.MsgPack{
// Msgs: []msgstream.TsMsg{timeTickMsg},
// }
// return insertMsgPack, timeTickMsgPack
//}
//
//func genSchema(collectionID int64) *schemapb.CollectionSchema {
// fieldID := schemapb.FieldSchema{
// FieldID: UniqueID(0),
// Name: "RowID",
// IsPrimaryKey: false,
// DataType: schemapb.DataType_INT64,
// }
//
// fieldTime := schemapb.FieldSchema{
// FieldID: UniqueID(1),
// Name: "Timestamp",
// IsPrimaryKey: false,
// DataType: schemapb.DataType_INT64,
// }
//
// fieldVec := schemapb.FieldSchema{
// FieldID: UniqueID(100),
// Name: "vec",
// IsPrimaryKey: false,
// DataType: schemapb.DataType_VECTOR_FLOAT,
// TypeParams: []*commonpb.KeyValuePair{
// {
// Key: "dim",
// Value: "16",
// },
// },
// IndexParams: []*commonpb.KeyValuePair{
// {
// Key: "metric_type",
// Value: "L2",
// },
// },
// }
//
// fieldInt := schemapb.FieldSchema{
// FieldID: UniqueID(101),
// Name: "age",
// IsPrimaryKey: false,
// DataType: schemapb.DataType_INT32,
// }
//
// return &schemapb.CollectionSchema{
// Name: "collection-" + strconv.FormatInt(collectionID, 10),
// AutoID: true,
// Fields: []*schemapb.FieldSchema{
// &fieldID, &fieldTime, &fieldVec, &fieldInt,
// },
// }
//}
//
//func getMinioKV(ctx context.Context) (*minioKV.MinIOKV, error) {
// minioAddress := "localhost:9000"
// accessKeyID := "minioadmin"
// secretAccessKey := "minioadmin"
// useSSL := false
// bucketName := "a-bucket"
//
// option := &minioKV.Option{
// Address: minioAddress,
// AccessKeyID: accessKeyID,
// SecretAccessKeyID: secretAccessKey,
// UseSSL: useSSL,
// BucketName: bucketName,
// CreateBucket: true,
// }
//
// return minioKV.NewMinIOKV(ctx, option)
//}
//
//func TestWriteBinLog(t *testing.T) {
// const (
// debug = true
// consumeSubName = "test-load-collection-sub-name"
// )
// var ctx context.Context
// if debug {
// ctx = context.Background()
// } else {
// var cancel context.CancelFunc
// ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
// defer cancel()
// }
//
// // produce msg
// insertChannels := []string{"insert-0", "insert-1", "insert-2", "insert-3"}
// pulsarAddress := "pulsar://127.0.0.1:6650"
//
// factory := pulsarms.NewFactory(pulsarAddress, receiveBufSize, pulsarBufSize)
//
// insertStream, _ := factory.NewTtMsgStream(ctx)
// insertStream.AsProducer(insertChannels)
// insertStream.AsConsumer(insertChannels, consumeSubName)
// insertStream.Start()
//
// for i := 0; i < 12; i++ {
// insertMsgPack, timeTickMsgPack := genInsert(1, 1, i*msgLength+1, 4, true)
// err := insertStream.Produce(insertMsgPack)
// assert.NoError(t, err)
// err = insertStream.Broadcast(timeTickMsgPack)
// assert.NoError(t, err)
// }
//
// //consume msg
// segmentData := make([]*storage.InsertData, 12)
// idData := make([][]int64, 12)
// timestamps := make([][]int64, 12)
// fieldAgeData := make([][]int32, 12)
// fieldVecData := make([][]float32, 12)
// for i := 0; i < 12; i++ {
// idData[i] = make([]int64, 0)
// timestamps[i] = make([]int64, 0)
// fieldAgeData[i] = make([]int32, 0)
// fieldVecData[i] = make([]float32, 0)
// }
// for i := 0; i < 12; i++ {
// msgPack := insertStream.Consume()
//
// for n := 0; n < msgLength; n++ {
// segmentID := msgPack.Msgs[n].(*msgstream.InsertMsg).SegmentID
// blob := msgPack.Msgs[n].(*msgstream.InsertMsg).RowData[0].Value
// id := binary.BigEndian.Uint64(blob[0:8])
// idData[segmentID] = append(idData[segmentID], int64(id))
// t := binary.BigEndian.Uint64(blob[8:16])
// timestamps[segmentID] = append(timestamps[segmentID], int64(t))
// for i := 0; i < DIM; i++ {
// bits := binary.BigEndian.Uint32(blob[16+4*i : 16+4*(i+1)])
// floatVec := math.Float32frombits(bits)
// fieldVecData[segmentID] = append(fieldVecData[segmentID], floatVec)
// }
// ageValue := binary.BigEndian.Uint32(blob[80:84])
// fieldAgeData[segmentID] = append(fieldAgeData[segmentID], int32(ageValue))
// }
// }
// for i := 0; i < 12; i++ {
// insertData := &storage.InsertData{
// Data: map[int64]storage.FieldData{
// 0: &storage.Int64FieldData{
// NumRows: msgLength,
// Data: idData[i],
// },
// 1: &storage.Int64FieldData{
// NumRows: msgLength,
// Data: timestamps[i],
// },
// 100: &storage.FloatVectorFieldData{
// NumRows: msgLength,
// Data: fieldVecData[i],
// Dim: DIM,
// },
// 101: &storage.Int32FieldData{
// NumRows: msgLength,
// Data: fieldAgeData[i],
// },
// },
// }
// segmentData[i] = insertData
// }
//
// //gen inCodec
// collectionMeta := &etcdpb.CollectionMeta{
// ID: 1,
// Schema: genSchema(1),
// CreateTime: 0,
// PartitionIDs: []int64{1},
// SegmentIDs: []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
// }
// inCodec := storage.NewInsertCodec(collectionMeta)
// indexCodec := storage.NewIndexCodec()
//
// // get minio client
// kv, err := getMinioKV(context.Background())
// assert.Nil(t, err)
//
// // write binlog minio
// collectionStr := strconv.FormatInt(1, 10)
// for i := 0; i < 12; i++ {
// binLogs, err := inCodec.Serialize(1, storage.UniqueID(i), segmentData[i])
// assert.Nil(t, err)
// assert.Equal(t, len(binLogs), 4)
// keyPrefix := "distributed-query-test-binlog"
// segmentStr := strconv.FormatInt(int64(i), 10)
//
// for _, blob := range binLogs {
// key := path.Join(keyPrefix, collectionStr, segmentStr, blob.Key)
// err = kv.Save(key, string(blob.Value[:]))
// assert.Nil(t, err)
// }
// }
//
// // gen index build's indexParams
// indexParams := make(map[string]string)
// indexParams["index_type"] = "IVF_PQ"
// indexParams["index_mode"] = "cpu"
// indexParams["dim"] = "16"
// indexParams["k"] = "10"
// indexParams["nlist"] = "100"
// indexParams["nprobe"] = "10"
// indexParams["m"] = "4"
// indexParams["nbits"] = "8"
// indexParams["metric_type"] = "L2"
// indexParams["SLICE_SIZE"] = "400"
//
// var indexParamsKV []*commonpb.KeyValuePair
// for key, value := range indexParams {
// indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{
// Key: key,
// Value: value,
// })
// }
//
// generate index and write it to minio
// for i := 0; i < 12; i++ {
// typeParams := make(map[string]string)
// typeParams["dim"] = "16"
// index, err := indexnode.NewCIndex(typeParams, indexParams)
// assert.Nil(t, err)
// err = index.BuildFloatVecIndexWithoutIds(fieldVecData[i])
// assert.Equal(t, err, nil)
// binarySet, err := index.Serialize()
// assert.Equal(t, len(binarySet), 1)
// assert.Nil(t, err)
// codecIndex, err := indexCodec.Serialize(binarySet, indexParams, "test_index", UniqueID(i))
// assert.Equal(t, len(codecIndex), 2)
// assert.Nil(t, err)
// keyPrefix := "distributed-query-test-index"
// segmentStr := strconv.FormatInt(int64(i), 10)
// key1 := path.Join(keyPrefix, collectionStr, segmentStr, "IVF")
// key2 := path.Join(keyPrefix, collectionStr, segmentStr, "indexParams")
// kv.Save(key1, string(codecIndex[0].Value))
// kv.Save(key2, string(codecIndex[1].Value))
// }
//}

View File

@ -0,0 +1,261 @@
package grpcqueryserviceclient
//import (
// "context"
// "encoding/binary"
// "fmt"
// "log"
// "math"
// "testing"
// "time"
//
// "github.com/golang/protobuf/proto"
// "github.com/stretchr/testify/assert"
//
// "github.com/zilliztech/milvus-distributed/internal/msgstream"
// "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
// "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
// "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
// "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
// "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
// qs "github.com/zilliztech/milvus-distributed/internal/queryservice"
//)
//
//const (
// debug = false
// pulsarAddress = "pulsar://127.0.0.1:6650"
//)
//
//func TestClient_LoadCollection(t *testing.T) {
// var ctx context.Context
// if debug {
// ctx = context.Background()
// } else {
// var cancel context.CancelFunc
// ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
// defer cancel()
// }
//
// //create queryService client
// qs.Params.Init()
// log.Println("QueryService address:", qs.Params.Address)
// log.Println("Init Query service client ...")
// client, err := NewClient(qs.Params.Address, 20*time.Second)
// assert.Nil(t, err)
// err = client.Init()
// assert.Nil(t, err)
// err = client.Start()
// assert.Nil(t, err)
//
// insertChannels := []string{"insert-0", "insert-1", "insert-2", "insert-3"}
// ddChannels := []string{"data-definition"}
//
// factory := pulsarms.NewFactory(pulsarAddress, receiveBufSize, pulsarBufSize)
// insertStream, _ := factory.NewTtMsgStream(ctx)
// insertStream.AsProducer(insertChannels)
// insertStream.Start()
//
// ddStream, err := factory.NewTtMsgStream(ctx)
// assert.NoError(t, err)
// ddStream.AsProducer(ddChannels)
// ddStream.Start()
//
// // showCollection
// showCollectionRequest := &querypb.ShowCollectionRequest{
// DbID: 0,
// }
// showCollectionRes, err := client.ShowCollections(showCollectionRequest)
// fmt.Println("showCollectionRes: ", showCollectionRes)
// assert.Nil(t, err)
//
// //load collection
// loadCollectionRequest := &querypb.LoadCollectionRequest{
// CollectionID: 1,
// Schema: genSchema(1),
// }
// loadCollectionRes, err := client.LoadCollection(loadCollectionRequest)
// fmt.Println("loadCollectionRes: ", loadCollectionRes)
// assert.Nil(t, err)
//
// // showCollection
// showCollectionRes, err = client.ShowCollections(showCollectionRequest)
// fmt.Println("showCollectionRes: ", showCollectionRes)
// assert.Nil(t, err)
//
// //showSegmentInfo
// getSegmentInfoRequest := &querypb.SegmentInfoRequest{
// SegmentIDs: []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
// }
// getSegmentInfoRes, err := client.GetSegmentInfo(getSegmentInfoRequest)
// fmt.Println("segment info : ", getSegmentInfoRes)
// assert.Nil(t, err)
//
// // insert msg
// for i := 0; i < 12; i++ {
// insertMsgPack, timeTickMsgPack := genInsert(1, 1, i*msgLength+1, 4, false)
// err := insertStream.Produce(insertMsgPack)
// assert.NoError(t, err)
// err = insertStream.Broadcast(timeTickMsgPack)
// assert.NoError(t, err)
// err = ddStream.Broadcast(timeTickMsgPack)
// assert.NoError(t, err)
// }
//
// getSegmentInfoRes, err = client.GetSegmentInfo(getSegmentInfoRequest)
// assert.Nil(t, err)
// fmt.Println("segment info : ", getSegmentInfoRes)
//
//}
//
//func TestClient_GetSegmentInfo(t *testing.T) {
// if !debug {
// _, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
// defer cancel()
// }
//
// //create queryService client
// qs.Params.Init()
// log.Println("QueryService address:", qs.Params.Address)
// log.Println("Init Query service client ...")
// client, err := NewClient(qs.Params.Address, 20*time.Second)
// assert.Nil(t, err)
// err = client.Init()
// assert.Nil(t, err)
// err = client.Start()
// assert.Nil(t, err)
//
// //showSegmentInfo
// getSegmentInfoRequest := &querypb.SegmentInfoRequest{
// SegmentIDs: []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
// }
// getSegmentInfoRes, err := client.GetSegmentInfo(getSegmentInfoRequest)
// assert.Nil(t, err)
// fmt.Println("segment info : ", getSegmentInfoRes)
//}
//
//func TestClient_LoadPartitions(t *testing.T) {
// if !debug {
// _, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
// defer cancel()
// }
//
// //create queryService client
// qs.Params.Init()
// log.Println("QueryService address:", qs.Params.Address)
// log.Println("Init Query service client ...")
// client, err := NewClient(qs.Params.Address, 20*time.Second)
// assert.Nil(t, err)
// err = client.Init()
// assert.Nil(t, err)
// err = client.Start()
// assert.Nil(t, err)
//
// loadPartitionRequest := &querypb.LoadPartitionRequest{
// CollectionID: 1,
// Schema: genSchema(1),
// }
// loadPartitionRes, err := client.LoadPartitions(loadPartitionRequest)
// fmt.Println("loadCollectionRes: ", loadPartitionRes)
// assert.Nil(t, err)
//}
//
//func TestClient_GetChannels(t *testing.T) {
// if !debug {
// _, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
// defer cancel()
// }
//
// //create queryService client
// qs.Params.Init()
// log.Println("QueryService address:", qs.Params.Address)
// log.Println("Init Query service client ...")
// client, err := NewClient(qs.Params.Address, 20*time.Second)
// assert.Nil(t, err)
// err = client.Init()
// assert.Nil(t, err)
// err = client.Start()
// assert.Nil(t, err)
//
// getTimeTickChannelRes, err := client.GetTimeTickChannel()
// fmt.Println("loadCollectionRes: ", getTimeTickChannelRes)
// assert.Nil(t, err)
//}
//
//func sendSearchRequest(ctx context.Context, searchChannels []string) {
// // test data generate
// const msgLength = 10
// const receiveBufSize = 1024
// const DIM = 16
//
// var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
// // start search service
// dslString := "{\"bool\": { \n\"vector\": {\n \"vec\": {\n \"metric_type\": \"L2\", \n \"params\": {\n \"nprobe\": 10 \n},\n \"query\": \"$0\",\"topk\": 10 \n } \n } \n } \n }"
// var searchRawData1 []byte
// var searchRawData2 []byte
// for i, ele := range vec {
// buf := make([]byte, 4)
// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele+float32(i*2)))
// searchRawData1 = append(searchRawData1, buf...)
// }
// for i, ele := range vec {
// buf := make([]byte, 4)
// binary.LittleEndian.PutUint32(buf, math.Float32bits(ele+float32(i*4)))
// searchRawData2 = append(searchRawData2, buf...)
// }
// placeholderValue := milvuspb.PlaceholderValue{
// Tag: "$0",
// Type: milvuspb.PlaceholderType_VECTOR_FLOAT,
// Values: [][]byte{searchRawData1, searchRawData2},
// }
//
// placeholderGroup := milvuspb.PlaceholderGroup{
// Placeholders: []*milvuspb.PlaceholderValue{&placeholderValue},
// }
//
// placeGroupByte, err := proto.Marshal(&placeholderGroup)
// if err != nil {
// log.Print("marshal placeholderGroup failed")
// }
//
// query := milvuspb.SearchRequest{
// Dsl: dslString,
// PlaceholderGroup: placeGroupByte,
// }
//
// queryByte, err := proto.Marshal(&query)
// if err != nil {
// log.Print("marshal query failed")
// }
//
// blob := commonpb.Blob{
// Value: queryByte,
// }
//
// searchMsg := &msgstream.SearchMsg{
// BaseMsg: msgstream.BaseMsg{
// HashValues: []uint32{0},
// },
// SearchRequest: internalpb2.SearchRequest{
// Base: &commonpb.MsgBase{
// MsgType: commonpb.MsgType_kSearch,
// MsgID: 1,
// Timestamp: uint64(10 + 1000),
// SourceID: 1,
// },
// ResultChannelID: "0",
// Query: &blob,
// },
// }
//
// msgPackSearch := msgstream.MsgPack{}
// msgPackSearch.Msgs = append(msgPackSearch.Msgs, searchMsg)
//
// factory := pulsarms.NewFactory(pulsarAddress, receiveBufSize, 1024)
// searchStream, _ := factory.NewMsgStream(ctx)
// searchStream.AsProducer(searchChannels)
// searchStream.Start()
// err = searchStream.Produce(&msgPackSearch)
// if err != nil {
// panic(err)
// }
//}

View File

@ -325,23 +325,65 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() {
default:
tsMsgList := make([]TsMsg, 0)
//for {
// ms.consumerLock.Lock()
// chosen, value, ok := reflect.Select(ms.consumerReflects)
// ms.consumerLock.Unlock()
// if !ok {
// log.Printf("channel closed")
// return
// }
//
// pulsarMsg, ok := value.Interface().(pulsar.ConsumerMessage)
//
// if !ok {
// log.Printf("type assertion failed, not consumer message type")
// continue
// }
// ms.consumers[chosen].AckID(pulsarMsg.ID())
//
// headerMsg := commonpb.MsgHeader{}
// err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg)
// if err != nil {
// log.Printf("Failed to unmarshal message header, error = %v", err)
// continue
// }
// tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType)
// if err != nil {
// log.Printf("Failed to unmarshal tsMsg, error = %v", err)
// continue
// }
//
// tsMsg.SetPosition(&msgstream.MsgPosition{
// ChannelName: filepath.Base(pulsarMsg.Topic()),
// MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()),
// })
// tsMsgList = append(tsMsgList, tsMsg)
//
// noMoreMessage := true
// for i := 0; i < len(ms.consumers); i++ {
// if len(ms.consumers[i].Chan()) > 0 {
// noMoreMessage = false
// }
// }
//
// if noMoreMessage {
// break
// }
//}
pulsarMsgBuffer := make([]pulsar.ConsumerMessage, 0)
ms.consumerLock.Lock()
consumers := ms.consumers
ms.consumerLock.Unlock()
for _, consumer := range consumers {
msgLen := len(consumer.Chan())
for i := 0; i < msgLen; i++ {
msg := <-consumer.Chan()
pulsarMsgBuffer = append(pulsarMsgBuffer, msg)
}
}
for _, pulsarMsg := range pulsarMsgBuffer {
headerMsg := commonpb.MsgHeader{}
err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg)
if err != nil {
@ -359,17 +401,6 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() {
MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()),
})
tsMsgList = append(tsMsgList, tsMsg)
}
if len(tsMsgList) > 0 {
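The replacement logic above snapshots the consumer list under the lock, then drains exactly len(consumer.Chan()) messages from each channel before processing, instead of doing a reflect.Select over all consumers. The drain idiom in isolation, as a standalone sketch:

// drainOnce reads at most the messages already buffered in ch when it is
// called. Capturing len(ch) first bounds the loop, so an empty channel never
// blocks and messages arriving mid-drain wait for the next pass.
func drainOnce(ch <-chan pulsar.ConsumerMessage) []pulsar.ConsumerMessage {
	n := len(ch)
	out := make([]pulsar.ConsumerMessage, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, <-ch)
	}
	return out
}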

View File

@ -43,6 +43,9 @@ func InsertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
Timestamp: insertRequest.Timestamps[index],
SourceID: insertRequest.Base.SourceID,
},
DbID: insertRequest.DbID,
CollectionID: insertRequest.CollectionID,
PartitionID: insertRequest.PartitionID,
CollectionName: insertRequest.CollectionName,
PartitionName: insertRequest.PartitionName,
SegmentID: insertRequest.SegmentID,

View File

@ -274,36 +274,14 @@ func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*co
return status, errors.New(errMsg)
}
// add request channel
consumeChannels := []string{in.RequestChannelID}
consumeSubName := Params.MsgChannelSubName
node.searchService.searchMsgStream.AsConsumer(consumeChannels, consumeSubName)
// add result channel
producerChannels := []string{in.ResultChannelID}
node.searchService.searchResultMsgStream.AsProducer(producerChannels)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
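With the type assertions gone, AddQueryChannel registers channels through the msgstream.MsgStream interface, so any stream implementation works, not only the Pulsar one. A minimal caller-side sketch (the channel names are invented for illustration):

// Hypothetical registration, as the query service might issue it once it
// assigns a query channel pair to this node.
req := &queryPb.AddQueryChannelsRequest{
	RequestChannelID: "search-0",       // node starts consuming search requests here
	ResultChannelID:  "searchResult-0", // node starts producing results here
}
status, err := node.AddQueryChannel(req)
if err != nil || status.ErrorCode != commonpb.ErrorCode_SUCCESS {
	log.Printf("AddQueryChannel failed: %v", err)
}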

View File

@ -118,6 +118,27 @@ func initTestMeta(t *testing.T, node *QueryNode, collectionID UniqueID, segmentI
assert.NoError(t, err)
}
func initDmChannel(insertChannels []string, node *QueryNode) {
watchReq := &querypb.WatchDmChannelsRequest{
ChannelIDs: insertChannels,
}
_, err := node.WatchDmChannels(watchReq)
if err != nil {
panic(err)
}
}
func initSearchChannel(searchChan string, resultChan string, node *QueryNode) {
searchReq := &querypb.AddQueryChannelsRequest{
RequestChannelID: searchChan,
ResultChannelID: resultChan,
}
_, err := node.AddQueryChannel(searchReq)
if err != nil {
panic(err)
}
}
func newQueryNodeMock() *QueryNode {
var ctx context.Context
@ -142,7 +163,6 @@ func newQueryNodeMock() *QueryNode {
}
return svr
}
func makeNewChannelNames(names []string, suffix string) []string {

View File

@ -40,16 +40,15 @@ type ResultEntityIds []UniqueID
func newSearchService(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *searchService {
receiveBufSize := Params.SearchReceiveBufSize
searchStream, _ := factory.NewMsgStream(ctx)
searchResultStream, _ := factory.NewMsgStream(ctx)
// the query node doesn't need to consume any search or search result channel
// actively; channels are registered later via AddQueryChannel.
//consumeChannels := Params.SearchChannelNames
//consumeSubName := Params.MsgChannelSubName
//searchStream.AsConsumer(consumeChannels, consumeSubName)
//producerChannels := Params.SearchResultChannelNames
//searchResultStream.AsProducer(producerChannels)
searchServiceCtx, searchServiceCancel := context.WithCancel(ctx)
msgBuffer := make(chan msgstream.TsMsg, receiveBufSize)
@ -64,8 +63,8 @@ func newSearchService(ctx context.Context, replica collectionReplica, factory ms
replica: replica,
tSafeWatcher: newTSafeWatcher(),
searchMsgStream: searchStream,
searchResultMsgStream: searchResultStream,
queryNodeID: Params.QueryNodeID,
}
}

View File

@ -1,443 +0,0 @@
package queryservice
import (
"context"
"encoding/binary"
"math"
"strconv"
"testing"
"github.com/golang/protobuf/proto"
minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
//generate insert data
const msgLength = 100
const receiveBufSize = 1024
const pulsarBufSize = 1024
const DIM = 16
func genInsert(collectionID int64, partitionID int64, segmentID int64, timeStart int) (*msgstream.MsgPack, *msgstream.MsgPack) {
msgs := make([]msgstream.TsMsg, 0)
for n := timeStart; n < timeStart+msgLength; n++ {
rowData := make([]byte, 0)
id := make([]byte, 8)
binary.BigEndian.PutUint64(id, uint64(n))
rowData = append(rowData, id...)
time := make([]byte, 8)
binary.BigEndian.PutUint64(time, uint64(n))
rowData = append(rowData, time...)
for i := 0; i < DIM; i++ {
vec := make([]byte, 4)
binary.BigEndian.PutUint32(vec, math.Float32bits(float32(n*i)))
rowData = append(rowData, vec...)
}
age := make([]byte, 4)
binary.BigEndian.PutUint32(age, 1)
rowData = append(rowData, age...)
blob := &commonpb.Blob{
Value: rowData,
}
var insertMsg msgstream.TsMsg = &msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{uint32(n)},
},
InsertRequest: internalpb2.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kInsert,
MsgID: 0,
Timestamp: uint64(n),
SourceID: 0,
},
CollectionID: collectionID,
PartitionID: partitionID,
SegmentID: segmentID,
ChannelID: "0",
Timestamps: []uint64{uint64(n)},
RowIDs: []int64{int64(n)},
RowData: []*commonpb.Blob{blob},
},
}
msgs = append(msgs, insertMsg)
}
insertMsgPack := &msgstream.MsgPack{
BeginTs: uint64(timeStart),
EndTs: uint64(timeStart + msgLength),
Msgs: msgs,
}
// generate timeTick
timeTickMsg := &msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{0},
},
TimeTickMsg: internalpb2.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kTimeTick,
MsgID: 0,
Timestamp: uint64(timeStart + msgLength),
SourceID: 0,
},
},
}
timeTickMsgPack := &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{timeTickMsg},
}
return insertMsgPack, timeTickMsgPack
}
func genSchema(collectionID int64) *schemapb.CollectionSchema {
fieldID := schemapb.FieldSchema{
FieldID: UniqueID(0),
Name: "RowID",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT64,
}
fieldTime := schemapb.FieldSchema{
FieldID: UniqueID(1),
Name: "Timestamp",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT64,
}
fieldVec := schemapb.FieldSchema{
FieldID: UniqueID(100),
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: "metric_type",
Value: "L2",
},
},
}
fieldInt := schemapb.FieldSchema{
FieldID: UniqueID(101),
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
}
return &schemapb.CollectionSchema{
Name: "collection-" + strconv.FormatInt(collectionID, 10),
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldID, &fieldTime, &fieldVec, &fieldInt,
},
}
}
func genCreateCollection(collectionID int64) *msgstream.MsgPack {
schema := genSchema(collectionID)
byteSchema, err := proto.Marshal(schema)
if err != nil {
panic(err)
}
request := internalpb2.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kCreateCollection,
Timestamp: uint64(10),
},
DbID: 0,
CollectionID: collectionID,
Schema: byteSchema,
}
msg := &msgstream.CreateCollectionMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{0},
},
CreateCollectionRequest: request,
}
return &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{msg},
}
}
func genCreatePartition(collectionID int64, partitionID int64) *msgstream.MsgPack {
request := internalpb2.CreatePartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kCreatePartition,
Timestamp: uint64(20),
},
DbID: 0,
CollectionID: collectionID,
PartitionID: partitionID,
}
msg := &msgstream.CreatePartitionMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{0},
},
CreatePartitionRequest: request,
}
return &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{msg},
}
}
func getMinioKV(ctx context.Context) (*minioKV.MinIOKV, error) {
minioAddress := "localhost:9000"
accessKeyID := "minioadmin"
secretAccessKey := "minioadmin"
useSSL := false
bucketName := "a-bucket"
option := &minioKV.Option{
Address: minioAddress,
AccessKeyID: accessKeyID,
SecretAccessKeyID: secretAccessKey,
UseSSL: useSSL,
BucketName: bucketName,
CreateBucket: true,
}
return minioKV.NewMinIOKV(ctx, option)
}
func TestLoadCollection(t *testing.T) {
//// produce msg
//insertChannels := []string{"insert-0"}
//ddChannels := []string{"data-definition-0"}
//pulsarAddress := "pulsar://127.0.0.1:6650"
//
//insertStream := pulsarms.NewPulsarMsgStream(context.Background(), receiveBufSize)
//insertStream.SetPulsarClient(pulsarAddress)
//insertStream.AsProducer(insertChannels)
//ddStream := pulsarms.NewPulsarMsgStream(context.Background(), receiveBufSize)
//ddStream.SetPulsarClient(pulsarAddress)
//ddStream.AsProducer(ddChannels)
//
//var insertMsgStream msgstream.MsgStream = insertStream
//insertMsgStream.Start()
//var ddMsgStream msgstream.MsgStream = ddStream
//ddMsgStream.Start()
//
//createCollectionMsgPack := genCreateCollection(1)
//createPartitionMsgPack := genCreatePartition(1, 1)
//ddMsgStream.Produce(createCollectionMsgPack)
//ddMsgStream.Produce(createPartitionMsgPack)
//
//consumeStream := pulsarms.NewPulsarTtMsgStream(context.Background(), receiveBufSize)
//consumeStream.SetPulsarClient(pulsarAddress)
//unmarshalDispatcher := util.NewUnmarshalDispatcher()
//consumeStream.AsConsumer(insertChannels, "test", unmarshalDispatcher, pulsarBufSize)
//consumeStream.Start()
//
//for i := 0; i < 10; i++ {
// insertMsgPack, timeTickMsgPack := genInsert(1, 1, int64(i), i*msgLength+1)
// err := insertMsgStream.Produce(insertMsgPack)
// assert.NoError(t, err)
// err = insertMsgStream.Broadcast(timeTickMsgPack)
// assert.NoError(t, err)
// err = ddMsgStream.Broadcast(timeTickMsgPack)
// assert.NoError(t, err)
//}
//
////consume msg
//segPosition := make(map[int64][]*internalpb2.MsgPosition)
//segmentData := make([]*storage.InsertData, 0)
//indexRowDatas := make([][]float32, 0)
//for i := 0; i < 10; i++ {
// msgPack := consumeStream.Consume()
// idData := make([]int64, 0)
// timestamps := make([]int64, 0)
// fieldAgeData := make([]int32, 0)
// fieldVecData := make([]float32, 0)
// for n := 0; n < msgLength; n++ {
// blob := msgPack.Msgs[n].(*msgstream.InsertMsg).RowData[0].Value
// id := binary.BigEndian.Uint64(blob[0:8])
// idData = append(idData, int64(id))
// time := binary.BigEndian.Uint64(blob[8:16])
// timestamps = append(timestamps, int64(time))
// for i := 0; i < DIM; i++ {
// bits := binary.BigEndian.Uint32(blob[16+4*i : 16+4*(i+1)])
// floatVec := math.Float32frombits(bits)
// fieldVecData = append(fieldVecData, floatVec)
// }
// ageValue := binary.BigEndian.Uint32(blob[80:84])
// fieldAgeData = append(fieldAgeData, int32(ageValue))
// }
//
// insertData := &storage.InsertData{
// Data: map[int64]storage.FieldData{
// 0: &storage.Int64FieldData{
// NumRows: msgLength,
// Data: idData,
// },
// 1: &storage.Int64FieldData{
// NumRows: msgLength,
// Data: timestamps,
// },
// 100: &storage.FloatVectorFieldData{
// NumRows: msgLength,
// Data: fieldVecData,
// Dim: DIM,
// },
// 101: &storage.Int32FieldData{
// NumRows: msgLength,
// Data: fieldAgeData,
// },
// },
// }
// segPosition[int64(i)] = msgPack.StartPositions
// segmentData = append(segmentData, insertData)
// indexRowDatas = append(indexRowDatas, fieldVecData)
//}
//
////gen inCodec
//collectionMeta := &etcdpb.CollectionMeta{
// ID: 1,
// Schema: genSchema(1),
// CreateTime: 0,
// PartitionIDs: []int64{1},
// SegmentIDs: []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
//}
//inCodec := storage.NewInsertCodec(collectionMeta)
//
//// get minio client
//minioKV, err := getMinioKV(context.Background())
//assert.Nil(t, err)
//
//// write binlog minio
//collectionStr := strconv.FormatInt(1, 10)
//for i := 0; i < 9; i++ {
// binLogs, err := inCodec.Serialize(1, storage.UniqueID(i), segmentData[i])
// assert.Nil(t, err)
// assert.Equal(t, len(binLogs), 4)
// keyPrefix := "distributed-query-test-binlog"
// segmentStr := strconv.FormatInt(int64(i), 10)
//
// for _, blob := range binLogs {
// key := path.Join(keyPrefix, collectionStr, segmentStr, blob.Key)
// err = minioKV.Save(key, string(blob.Value[:]))
// assert.Nil(t, err)
// }
//}
//
//// gen index build's indexParams
//indexParams := make(map[string]string)
//indexParams["index_type"] = "IVF_PQ"
//indexParams["index_mode"] = "cpu"
//indexParams["dim"] = "16"
//indexParams["k"] = "10"
//indexParams["nlist"] = "100"
//indexParams["nprobe"] = "10"
//indexParams["m"] = "4"
//indexParams["nbits"] = "8"
//indexParams["metric_type"] = "L2"
//indexParams["SLICE_SIZE"] = "400"
//
//var indexParamsKV []*commonpb.KeyValuePair
//for key, value := range indexParams {
// indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{
// Key: key,
// Value: value,
// })
//}
//
//// generate index and write it to minio
//for i := 0; i < 9; i++ {
// typeParams := make(map[string]string)
// typeParams["dim"] = "16"
// index, err := indexnode.NewCIndex(typeParams, indexParams)
// assert.Nil(t, err)
// err = index.BuildFloatVecIndexWithoutIds(indexRowDatas[i])
// assert.Equal(t, err, nil)
// binarySet, err := index.Serialize()
// assert.Equal(t, len(binarySet), 1)
// assert.Nil(t, err)
// keyPrefix := "distributed-query-test-index"
// segmentStr := strconv.FormatInt(int64(i), 10)
// indexStr := strconv.FormatInt(int64(i), 10)
// key := path.Join(keyPrefix, collectionStr, segmentStr, indexStr)
// minioKV.Save(key, string(binarySet[0].Value))
//}
//
////generate query service
//service, err := NewQueryService(context.Background())
//assert.Nil(t, err)
//collectionID := UniqueID(1)
//partitions := []UniqueID{1}
//col2partition := make(map[UniqueID][]UniqueID)
//col2partition[collectionID] = partitions
//segments := []UniqueID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
//partition2segment := make(map[UniqueID][]UniqueID)
//partition2segment[UniqueID(1)] = segments
//masterMock := &masterMock{
// collectionIDs: []UniqueID{1},
// col2partition: col2partition,
// partition2segment: partition2segment,
//}
//service.SetMasterService(masterMock)
//segStates := make(map[UniqueID]*datapb.SegmentStatesResponse)
//for i := 0; i < 10; i++ {
// if i != 9 {
// state := &datapb.SegmentStatesResponse{
// State: datapb.SegmentState_SegmentFlushed,
// StartPositions: segPosition[int64(i)],
// }
// segStates[UniqueID(i)] = state
// } else {
// state := &datapb.SegmentStatesResponse{
// State: datapb.SegmentState_SegmentGrowing,
// StartPositions: segPosition[int64(i)],
// }
// segStates[UniqueID(i)] = state
// }
//}
//dataMock := &dataMock{
// segmentIDs: []UniqueID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
// segmentStates: segStates,
//}
//
//service.SetDataService(dataMock)
//service.SetEnableGrpc(true)
//
//loadCollectionRequest := &querypb.LoadCollectionRequest{
// Base: &commonpb.MsgBase{
// MsgType: commonpb.MsgType_kCreateCollection,
// },
// DbID: UniqueID(0),
// CollectionID: collectionID,
//}
//
//registerRequest := &querypb.RegisterNodeRequest{
// Address: &commonpb.Address{
// Ip: "localhost",
// Port: 20010,
// },
//}
//response, err := service.RegisterNode(registerRequest)
//assert.Nil(t, err)
//assert.Equal(t, response.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
//
//status, err := service.LoadCollection(loadCollectionRequest)
//assert.Nil(t, err)
//assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_SUCCESS)
}

View File

@ -0,0 +1,152 @@
package queryservice
import (
"errors"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
)
const (
numSegment = 12
)
type MasterMock struct {
CollectionIDs []UniqueID
Col2partition map[UniqueID][]UniqueID
Partition2segment map[UniqueID][]UniqueID
}
func NewMasterMock() *MasterMock {
collectionIDs := make([]UniqueID, 0)
collectionIDs = append(collectionIDs, 1)
col2partition := make(map[UniqueID][]UniqueID)
partitionIDs := make([]UniqueID, 0)
partitionIDs = append(partitionIDs, 1)
col2partition[1] = partitionIDs
partition2segment := make(map[UniqueID][]UniqueID)
segmentIDs := make([]UniqueID, 0)
for i := 0; i < numSegment; i++ {
segmentIDs = append(segmentIDs, UniqueID(i))
}
partition2segment[1] = segmentIDs
return &MasterMock{
CollectionIDs: collectionIDs,
Col2partition: col2partition,
Partition2segment: partition2segment,
}
}
func (master *MasterMock) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
collectionID := in.CollectionID
partitionIDs := make([]UniqueID, 0)
for _, id := range master.CollectionIDs {
if id == collectionID {
partitions := master.Col2partition[collectionID]
partitionIDs = append(partitionIDs, partitions...)
}
}
response := &milvuspb.ShowPartitionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
PartitionIDs: partitionIDs,
}
return response, nil
}
func (master *MasterMock) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) {
collectionID := in.CollectionID
partitionID := in.PartitionID
for _, id := range master.CollectionIDs {
if id == collectionID {
partitions := master.Col2partition[collectionID]
for _, partition := range partitions {
if partition == partitionID {
return &milvuspb.ShowSegmentResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
//SegmentIDs: master.Partition2segment[partition],
}, nil
}
}
}
}
return nil, errors.New("collection id or partition id not found")
}
type DataMock struct {
SegmentIDs []UniqueID
SegmentStates map[UniqueID]*datapb.SegmentStateInfo
}
func NewDataMock() *DataMock {
positions := make([]*internalpb2.MsgPosition, 0)
positions = append(positions, &internalpb2.MsgPosition{ChannelName: "insert-" + strconv.FormatInt(0, 10)})
positions = append(positions, &internalpb2.MsgPosition{ChannelName: "insert-" + strconv.FormatInt(1, 10)})
positions = append(positions, &internalpb2.MsgPosition{ChannelName: "insert-" + strconv.FormatInt(2, 10)})
positions = append(positions, &internalpb2.MsgPosition{ChannelName: "insert-" + strconv.FormatInt(3, 10)})
fillStates := func(segmentID UniqueID, time uint64, position *internalpb2.MsgPosition) *datapb.SegmentStateInfo {
return &datapb.SegmentStateInfo{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
SegmentID: segmentID,
State: commonpb.SegmentState_SegmentFlushed,
CreateTime: time,
StartPosition: position,
}
}
segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo)
segmentIDs := make([]UniqueID, 0)
for i := 0; i < numSegment; i++ {
segmentIDs = append(segmentIDs, UniqueID(i))
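		// spread start positions round-robin across the four mock insert channels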
pick := i % 4
segmentStates[UniqueID(i)] = fillStates(UniqueID(i), uint64(i), positions[pick])
}
return &DataMock{
SegmentIDs: segmentIDs,
SegmentStates: segmentStates,
}
}
func (data *DataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) {
ret := &datapb.SegmentStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
for _, segID := range req.SegmentIDs {
for _, segmentID := range data.SegmentIDs {
if segmentID == segID {
ret.States = append(ret.States, data.SegmentStates[segmentID])
}
}
}
	// an empty States slice is a valid response for this mock; the old
	// test mock returned an error here instead
	return ret, nil
}
func (data *DataMock) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
return &internalpb2.StringList{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Values: []string{"insert-0", "insert-1", "insert-2", "insert-3"},
}, nil
}
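These mocks stand in for the master and data services; a minimal wiring sketch, mirroring the updated test at the end of this patch (assumes pulsarms is imported as in the old test file):

	msFactory := pulsarms.NewFactory()
	service, err := NewQueryService(context.Background(), msFactory)
	if err != nil {
		panic(err)
	}
	service.SetMasterService(NewMasterMock())
	service.SetDataService(NewDataMock())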

View File

@ -1,9 +1,6 @@
package queryservice
import (
"log"
"os"
"strconv"
"sync"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
@ -15,67 +12,15 @@ type UniqueID = typeutil.UniqueID
type ParamTable struct {
paramtable.BaseTable
Address string
Port int
MasterServiceAddress string
DataServiceAddress string
PulsarAddress string
ETCDAddress string
MetaRootPath string
Address string
Port int
QueryServiceID UniqueID
QueryNodeID UniqueID
QueryNodeNum uint64
FlowGraphMaxQueueLength int32
FlowGraphMaxParallelism int32
// minio
MinioEndPoint string
MinioAccessKeyID string
MinioSecretAccessKey string
MinioUseSSLStr bool
MinioBucketName string
// dm
InsertChannelNames []string
InsertChannelRange []int
InsertReceiveBufSize int64
InsertPulsarBufSize int64
// dd
DDChannelNames []string
DDReceiveBufSize int64
DDPulsarBufSize int64
// search
SearchChannelNames []string
SearchResultChannelNames []string
SearchReceiveBufSize int64
SearchPulsarBufSize int64
SearchResultReceiveBufSize int64
// stats
StatsPublishInterval int
StatsChannelName string
StatsReceiveBufSize int64
// load index
LoadIndexChannelNames []string
LoadIndexReceiveBufSize int64
LoadIndexPulsarBufSize int64
StatsChannelName string
// timetick
TimeTickChannelName string
TimeTickReceiveBufferSize int64
GracefulTime int64
MsgChannelSubName string
DefaultPartitionTag string
SliceIndex int
TimeTickChannelName string
}
var Params ParamTable
@ -94,333 +39,13 @@ func (p *ParamTable) Init() {
panic(err)
}
queryNodeIDStr := os.Getenv("QUERY_NODE_ID")
if queryNodeIDStr == "" {
queryNodeIDList := p.QueryNodeIDList()
if len(queryNodeIDList) == 0 {
queryNodeIDStr = "0"
} else {
queryNodeIDStr = strconv.Itoa(int(queryNodeIDList[0]))
}
}
err = p.LoadYaml("advanced/common.yaml")
if err != nil {
panic(err)
}
err = p.Save("_queryNodeID", queryNodeIDStr)
if err != nil {
panic(err)
}
p.initMinioEndPoint()
p.initMinioAccessKeyID()
p.initMinioSecretAccessKey()
p.initMinioUseSSLStr()
p.initMinioBucketName()
p.initPulsarAddress()
p.initETCDAddress()
p.initMetaRootPath()
p.initQueryNodeID()
p.initQueryNodeNum()
p.initGracefulTime()
p.initMsgChannelSubName()
p.initDefaultPartitionTag()
p.initSliceIndex()
p.initFlowGraphMaxQueueLength()
p.initFlowGraphMaxParallelism()
p.initInsertChannelNames()
p.initInsertChannelRange()
p.initInsertReceiveBufSize()
p.initInsertPulsarBufSize()
p.initDDChannelNames()
p.initDDReceiveBufSize()
p.initDDPulsarBufSize()
p.initSearchChannelNames()
p.initSearchResultChannelNames()
p.initSearchReceiveBufSize()
p.initSearchPulsarBufSize()
p.initSearchResultReceiveBufSize()
p.initStatsPublishInterval()
p.initStatsChannelName()
p.initStatsReceiveBufSize()
p.initLoadIndexChannelNames()
p.initLoadIndexReceiveBufSize()
p.initLoadIndexPulsarBufSize()
p.initTimeTickChannelName()
p.initTimeTickReceiveBufSize()
p.initAddress()
p.initQueryServiceAddress()
p.initPort()
p.initMasterServiceAddress()
})
}
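Every init helper below repeats the same Load-then-panic shape; a hypothetical helper (not in this patch) that the pattern could collapse into:

	func (p *ParamTable) mustLoad(key string) string {
		value, err := p.Load(key)
		if err != nil {
			panic(err)
		}
		return value
	}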
func (p *ParamTable) initMinioEndPoint() {
url, err := p.Load("_MinioAddress")
if err != nil {
panic(err)
}
p.MinioEndPoint = url
}
func (p *ParamTable) initMinioAccessKeyID() {
id, err := p.Load("minio.accessKeyID")
if err != nil {
panic(err)
}
p.MinioAccessKeyID = id
}
func (p *ParamTable) initMinioSecretAccessKey() {
key, err := p.Load("minio.secretAccessKey")
if err != nil {
panic(err)
}
p.MinioSecretAccessKey = key
}
func (p *ParamTable) initMinioUseSSLStr() {
ssl, err := p.Load("minio.useSSL")
if err != nil {
panic(err)
}
sslBoolean, err := strconv.ParseBool(ssl)
if err != nil {
panic(err)
}
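	// note: despite the legacy "Str" suffix, this field stores the parsed boolean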
p.MinioUseSSLStr = sslBoolean
}
func (p *ParamTable) initMinioBucketName() {
bucketName, err := p.Load("minio.bucketName")
if err != nil {
panic(err)
}
p.MinioBucketName = bucketName
}
func (p *ParamTable) initPulsarAddress() {
url, err := p.Load("_PulsarAddress")
if err != nil {
panic(err)
}
p.PulsarAddress = url
}
func (p *ParamTable) initMasterServiceAddress() {
url, err := p.Load("_MasterAddress")
if err != nil {
panic(err)
}
p.MasterServiceAddress = url
}
func (p *ParamTable) initDataServiceAddress() {
url, err := p.Load("_DataServiceAddress")
if err != nil {
panic(err)
}
p.DataServiceAddress = url
}
func (p *ParamTable) initQueryNodeID() {
queryNodeID, err := p.Load("_queryNodeID")
if err != nil {
panic(err)
}
id, err := strconv.Atoi(queryNodeID)
if err != nil {
panic(err)
}
p.QueryNodeID = UniqueID(id)
}
func (p *ParamTable) initInsertChannelRange() {
insertChannelRange, err := p.Load("msgChannel.channelRange.insert")
if err != nil {
panic(err)
}
p.InsertChannelRange = paramtable.ConvertRangeToIntRange(insertChannelRange, ",")
}
// advanced params
// stats
func (p *ParamTable) initStatsPublishInterval() {
p.StatsPublishInterval = p.ParseInt("queryNode.stats.publishInterval")
}
// dataSync:
func (p *ParamTable) initFlowGraphMaxQueueLength() {
p.FlowGraphMaxQueueLength = p.ParseInt32("queryNode.dataSync.flowGraph.maxQueueLength")
}
func (p *ParamTable) initFlowGraphMaxParallelism() {
p.FlowGraphMaxParallelism = p.ParseInt32("queryNode.dataSync.flowGraph.maxParallelism")
}
// msgStream
func (p *ParamTable) initInsertReceiveBufSize() {
p.InsertReceiveBufSize = p.ParseInt64("queryNode.msgStream.insert.recvBufSize")
}
func (p *ParamTable) initInsertPulsarBufSize() {
p.InsertPulsarBufSize = p.ParseInt64("queryNode.msgStream.insert.pulsarBufSize")
}
func (p *ParamTable) initDDReceiveBufSize() {
revBufSize, err := p.Load("queryNode.msgStream.dataDefinition.recvBufSize")
if err != nil {
panic(err)
}
bufSize, err := strconv.Atoi(revBufSize)
if err != nil {
panic(err)
}
p.DDReceiveBufSize = int64(bufSize)
}
func (p *ParamTable) initDDPulsarBufSize() {
pulsarBufSize, err := p.Load("queryNode.msgStream.dataDefinition.pulsarBufSize")
if err != nil {
panic(err)
}
bufSize, err := strconv.Atoi(pulsarBufSize)
if err != nil {
panic(err)
}
p.DDPulsarBufSize = int64(bufSize)
}
func (p *ParamTable) initSearchReceiveBufSize() {
p.SearchReceiveBufSize = p.ParseInt64("queryNode.msgStream.search.recvBufSize")
}
func (p *ParamTable) initSearchPulsarBufSize() {
p.SearchPulsarBufSize = p.ParseInt64("queryNode.msgStream.search.pulsarBufSize")
}
func (p *ParamTable) initSearchResultReceiveBufSize() {
p.SearchResultReceiveBufSize = p.ParseInt64("queryNode.msgStream.searchResult.recvBufSize")
}
func (p *ParamTable) initStatsReceiveBufSize() {
p.StatsReceiveBufSize = p.ParseInt64("queryNode.msgStream.stats.recvBufSize")
}
func (p *ParamTable) initETCDAddress() {
ETCDAddress, err := p.Load("_EtcdAddress")
if err != nil {
panic(err)
}
p.ETCDAddress = ETCDAddress
}
func (p *ParamTable) initMetaRootPath() {
rootPath, err := p.Load("etcd.rootPath")
if err != nil {
panic(err)
}
subPath, err := p.Load("etcd.metaSubPath")
if err != nil {
panic(err)
}
p.MetaRootPath = rootPath + "/" + subPath
}
func (p *ParamTable) initGracefulTime() {
p.GracefulTime = p.ParseInt64("queryNode.gracefulTime")
}
func (p *ParamTable) initInsertChannelNames() {
prefix, err := p.Load("msgChannel.chanNamePrefix.insert")
if err != nil {
log.Fatal(err)
}
prefix += "-"
channelRange, err := p.Load("msgChannel.channelRange.insert")
if err != nil {
panic(err)
}
channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",")
var ret []string
for _, ID := range channelIDs {
ret = append(ret, prefix+strconv.Itoa(ID))
}
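	// split the channels evenly across query nodes; integer division silently drops any remainder channels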
sep := len(channelIDs) / int(p.QueryNodeNum)
index := p.SliceIndex
if index == -1 {
panic("queryNodeID not Match with Config")
}
start := index * sep
p.InsertChannelNames = ret[start : start+sep]
}
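To make the slicing concrete: with 8 insert channels and QueryNodeNum = 2, sep is 4, so the node at SliceIndex 1 watches insert-4 through insert-7. The same partitioning as a standalone sketch (hypothetical helper, mirroring the logic above):

	func sliceChannels(channels []string, nodeNum, sliceIndex int) []string {
		if nodeNum <= 0 || sliceIndex < 0 {
			panic("queryNodeID does not match config")
		}
		sep := len(channels) / nodeNum
		start := sliceIndex * sep
		return channels[start : start+sep]
	}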
func (p *ParamTable) initSearchChannelNames() {
prefix, err := p.Load("msgChannel.chanNamePrefix.search")
if err != nil {
log.Fatal(err)
}
prefix += "-"
channelRange, err := p.Load("msgChannel.channelRange.search")
if err != nil {
panic(err)
}
channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",")
var ret []string
for _, ID := range channelIDs {
ret = append(ret, prefix+strconv.Itoa(ID))
}
p.SearchChannelNames = ret
}
func (p *ParamTable) initSearchResultChannelNames() {
prefix, err := p.Load("msgChannel.chanNamePrefix.searchResult")
if err != nil {
log.Fatal(err)
}
prefix += "-"
channelRange, err := p.Load("msgChannel.channelRange.searchResult")
if err != nil {
panic(err)
}
channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",")
var ret []string
for _, ID := range channelIDs {
ret = append(ret, prefix+strconv.Itoa(ID))
}
p.SearchResultChannelNames = ret
}
func (p *ParamTable) initMsgChannelSubName() {
// TODO: subName = namePrefix + "-" + queryNodeID, queryNodeID is assigned by master
name, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix")
if err != nil {
log.Panic(err)
}
queryNodeIDStr, err := p.Load("_QueryNodeID")
if err != nil {
panic(err)
}
p.MsgChannelSubName = name + "-" + queryNodeIDStr
}
func (p *ParamTable) initStatsChannelName() {
channels, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats")
if err != nil {
@ -429,65 +54,6 @@ func (p *ParamTable) initStatsChannelName() {
p.StatsChannelName = channels
}
func (p *ParamTable) initDDChannelNames() {
prefix, err := p.Load("msgChannel.chanNamePrefix.dataDefinition")
if err != nil {
panic(err)
}
prefix += "-"
iRangeStr, err := p.Load("msgChannel.channelRange.dataDefinition")
if err != nil {
panic(err)
}
channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",")
var ret []string
for _, ID := range channelIDs {
ret = append(ret, prefix+strconv.Itoa(ID))
}
p.DDChannelNames = ret
}
func (p *ParamTable) initDefaultPartitionTag() {
defaultTag, err := p.Load("common.defaultPartitionTag")
if err != nil {
panic(err)
}
p.DefaultPartitionTag = defaultTag
}
func (p *ParamTable) initSliceIndex() {
queryNodeID := p.QueryNodeID
queryNodeIDList := p.QueryNodeIDList()
for i := 0; i < len(queryNodeIDList); i++ {
if queryNodeID == queryNodeIDList[i] {
p.SliceIndex = i
return
}
}
p.SliceIndex = -1
}
func (p *ParamTable) initQueryNodeNum() {
p.QueryNodeNum = uint64(len(p.QueryNodeIDList()))
}
func (p *ParamTable) initLoadIndexChannelNames() {
loadIndexChannelName, err := p.Load("msgChannel.chanNamePrefix.cmd")
if err != nil {
panic(err)
}
p.LoadIndexChannelNames = []string{loadIndexChannelName}
}
func (p *ParamTable) initLoadIndexReceiveBufSize() {
p.LoadIndexReceiveBufSize = p.ParseInt64("queryNode.msgStream.loadIndex.recvBufSize")
}
func (p *ParamTable) initLoadIndexPulsarBufSize() {
p.LoadIndexPulsarBufSize = p.ParseInt64("queryNode.msgStream.loadIndex.pulsarBufSize")
}
func (p *ParamTable) initTimeTickChannelName() {
timeTickChannelName, err := p.Load("msgChannel.chanNamePrefix.queryTimeTick")
if err != nil {
@ -497,11 +63,7 @@ func (p *ParamTable) initTimeTickChannelName() {
}
func (p *ParamTable) initTimeTickReceiveBufSize() {
p.TimeTickReceiveBufferSize = p.ParseInt64("queryNode.msgStream.timeTick.recvBufSize")
}
func (p *ParamTable) initAddress() {
func (p *ParamTable) initQueryServiceAddress() {
url, err := p.Load("_QueryServiceAddress")
if err != nil {
panic(err)

View File

@ -32,6 +32,10 @@ func (qn *queryNodeInfo) AddDmChannels(channels []string) {
qn.dmChannelNames = append(qn.dmChannelNames, channels...)
}
func (qn *queryNodeInfo) AddQueryChannel(in *querypb.AddQueryChannelsRequest) (*commonpb.Status, error) {
return qn.client.AddQueryChannel(in)
}
func newQueryNodeInfo(client QueryNodeInterface) *queryNodeInfo {
segments := make([]UniqueID, 0)
dmChannelNames := make([]string, 0)

View File

@ -161,6 +161,7 @@ func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb
func (qs *QueryService) ShowCollections(req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error) {
dbID := req.DbID
fmt.Println("show collection start, dbID = ", dbID)
collections, err := qs.replica.getCollections(dbID)
collectionIDs := make([]UniqueID, 0)
for _, collection := range collections {
@ -174,6 +175,7 @@ func (qs *QueryService) ShowCollections(req *querypb.ShowCollectionRequest) (*qu
},
}, err
}
fmt.Println("show collection end")
return &querypb.ShowCollectionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
@ -392,6 +394,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
return fn(err), err
}
for _, state := range resp.States {
fmt.Println("segment ", state.SegmentID, " 's state is ", state.StartPosition)
segmentID := state.SegmentID
segmentStates[segmentID] = state
channelName := state.StartPosition.ChannelName
@ -501,7 +504,22 @@ func (qs *QueryService) CreateQueryChannel() (*querypb.CreateQueryChannelRespons
allocatedQueryChannel := "query-" + strconv.FormatInt(int64(channelID), 10)
allocatedQueryResultChannel := "queryResult-" + strconv.FormatInt(int64(channelID), 10)
// TODO: query nodes watch the query channels allocated below
addQueryChannelsRequest := &querypb.AddQueryChannelsRequest{
RequestChannelID: allocatedQueryChannel,
ResultChannelID: allocatedQueryResultChannel,
}
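	// broadcast the newly allocated channels to every registered query node; fail fast on the first error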
for _, node := range qs.queryNodes {
_, err := node.AddQueryChannel(addQueryChannelsRequest)
if err != nil {
return &querypb.CreateQueryChannelResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
},
}, err
}
}
return &querypb.CreateQueryChannelResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,

View File

@ -2,169 +2,15 @@ package queryservice
import (
"context"
"strconv"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
)
type masterMock struct {
collectionIDs []UniqueID
col2partition map[UniqueID][]UniqueID
partition2segment map[UniqueID][]UniqueID
}
func newMasterMock() *masterMock {
collectionIDs := make([]UniqueID, 0)
collectionIDs = append(collectionIDs, 1)
col2partition := make(map[UniqueID][]UniqueID)
partitionIDs := make([]UniqueID, 0)
partitionIDs = append(partitionIDs, 1)
col2partition[1] = partitionIDs
partition2segment := make(map[UniqueID][]UniqueID)
segmentIDs := make([]UniqueID, 0)
segmentIDs = append(segmentIDs, 1)
segmentIDs = append(segmentIDs, 2)
segmentIDs = append(segmentIDs, 3)
segmentIDs = append(segmentIDs, 4)
segmentIDs = append(segmentIDs, 5)
segmentIDs = append(segmentIDs, 6)
partition2segment[1] = segmentIDs
return &masterMock{
collectionIDs: collectionIDs,
col2partition: col2partition,
partition2segment: partition2segment,
}
}
func (master *masterMock) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
collectionID := in.CollectionID
partitionIDs := make([]UniqueID, 0)
for _, id := range master.collectionIDs {
if id == collectionID {
partitions := master.col2partition[collectionID]
partitionIDs = append(partitionIDs, partitions...)
}
}
response := &milvuspb.ShowPartitionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
PartitionIDs: partitionIDs,
}
return response, nil
}
func (master *masterMock) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) {
collectionID := in.CollectionID
partitionID := in.PartitionID
for _, id := range master.collectionIDs {
if id == collectionID {
partitions := master.col2partition[collectionID]
for _, partition := range partitions {
if partition == partitionID {
return &milvuspb.ShowSegmentResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
SegmentIDs: master.partition2segment[partition],
}, nil
}
}
}
}
return nil, errors.New("collection id or partition id not found")
}
type dataMock struct {
segmentIDs []UniqueID
segmentStates map[UniqueID]*datapb.SegmentStateInfo
}
func newDataMock() *dataMock {
positions1 := make([]*internalpb2.MsgPosition, 0)
positions2 := make([]*internalpb2.MsgPosition, 0)
positions1 = append(positions1, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(1, 10)})
positions1 = append(positions1, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(2, 10)})
positions2 = append(positions2, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(3, 10)})
positions2 = append(positions2, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(4, 10)})
segmentIDs := make([]UniqueID, 0)
segmentIDs = append(segmentIDs, 1)
segmentIDs = append(segmentIDs, 2)
segmentIDs = append(segmentIDs, 3)
segmentIDs = append(segmentIDs, 4)
segmentIDs = append(segmentIDs, 5)
segmentIDs = append(segmentIDs, 6)
fillStates := func(segmentID UniqueID, time uint64, position []*internalpb2.MsgPosition, state commonpb.SegmentState) *datapb.SegmentStateInfo {
return &datapb.SegmentStateInfo{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
SegmentID: segmentID,
State: state,
CreateTime: time,
// StartPositions: position,
}
}
segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo)
segmentStates[1] = fillStates(1, 1, positions1, commonpb.SegmentState_SegmentFlushed)
segmentStates[2] = fillStates(2, 2, positions2, commonpb.SegmentState_SegmentFlushed)
segmentStates[3] = fillStates(3, 3, positions1, commonpb.SegmentState_SegmentFlushed)
segmentStates[4] = fillStates(4, 4, positions2, commonpb.SegmentState_SegmentFlushed)
segmentStates[5] = fillStates(5, 5, positions1, commonpb.SegmentState_SegmentGrowing)
segmentStates[6] = fillStates(6, 6, positions2, commonpb.SegmentState_SegmentGrowing)
return &dataMock{
segmentIDs: segmentIDs,
segmentStates: segmentStates,
}
}
func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) {
ret := &datapb.SegmentStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
for _, segID := range req.SegmentIDs {
for _, segmentID := range data.segmentIDs {
if segmentID == segID {
ret.States = append(ret.States, data.segmentStates[segmentID])
}
}
}
if ret.States == nil {
return nil, errors.New("segment id not found")
}
return ret, nil
}
func (data *dataMock) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
return &internalpb2.StringList{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Values: []string{"test-insert"},
}, nil
}
func TestQueryService_Init(t *testing.T) {
msFactory := pulsarms.NewFactory()
service, err := NewQueryService(context.Background(), msFactory)
@ -200,8 +46,8 @@ func TestQueryService_load(t *testing.T) {
assert.Nil(t, err)
service.Init()
service.Start()
service.SetMasterService(newMasterMock())
service.SetDataService(newDataMock())
service.SetMasterService(NewMasterMock())
service.SetDataService(NewDataMock())
registerNodeRequest := &querypb.RegisterNodeRequest{
Address: &commonpb.Address{},
}