Add grpc for proxy

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
This commit is contained in:
neza2017 2020-11-19 17:09:22 +08:00 committed by yefu.chen
parent 7b86d7ad63
commit 429c94e18b
8 changed files with 531 additions and 29 deletions

1
go.mod
View File

@ -29,7 +29,6 @@ require (
github.com/prometheus/common v0.10.0 // indirect
github.com/prometheus/procfs v0.1.3 // indirect
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/spaolacci/murmur3 v1.1.0
github.com/spf13/cast v1.3.0
github.com/spf13/viper v1.7.1

1
go.sum
View File

@ -327,7 +327,6 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/protocolbuffers/protobuf v3.13.0+incompatible h1:omZA3Tuq+U2kJ2uMuqMR9c1VO5qLEgZ19m9878fXNtg=
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=

View File

@ -345,43 +345,249 @@ func (p *Proxy) ShowCollections(ctx context.Context, req *commonpb.Empty) (*serv
}
func (p *Proxy) CreatePartition(ctx context.Context, in *servicepb.PartitionName) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: 0,
Reason: "",
}, nil
cpt := &CreatePartitionTask{
Condition: NewTaskCondition(ctx),
CreatePartitionRequest: internalpb.CreatePartitionRequest{
MsgType: internalpb.MsgType_kCreatePartition,
ReqID: 0,
Timestamp: 0,
ProxyID: 0,
PartitionName: in,
//TODO, ReqID,Timestamp,ProxyID
},
masterClient: p.masterClient,
result: nil,
ctx: nil,
}
var cancel func()
cpt.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
err := func() error {
select {
case <-ctx.Done():
return errors.New("create partition timeout")
default:
return p.taskSch.DdQueue.Enqueue(cpt)
}
}()
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, nil
}
err = cpt.WaitToFinish()
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, nil
}
return cpt.result, nil
}
func (p *Proxy) DropPartition(ctx context.Context, in *servicepb.PartitionName) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: 0,
Reason: "",
}, nil
dpt := &DropPartitionTask{
Condition: NewTaskCondition(ctx),
DropPartitionRequest: internalpb.DropPartitionRequest{
MsgType: internalpb.MsgType_kDropPartition,
ReqID: 0,
Timestamp: 0,
ProxyID: 0,
PartitionName: in,
//TODO, ReqID,Timestamp,ProxyID
},
masterClient: p.masterClient,
result: nil,
ctx: nil,
}
var cancel func()
dpt.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
err := func() error {
select {
case <-ctx.Done():
return errors.New("drop partition timeout")
default:
return p.taskSch.DdQueue.Enqueue(dpt)
}
}()
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, nil
}
err = dpt.WaitToFinish()
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, nil
}
return dpt.result, nil
}
func (p *Proxy) HasPartition(ctx context.Context, in *servicepb.PartitionName) (*servicepb.BoolResponse, error) {
return &servicepb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
hpt := &HasPartitionTask{
Condition: NewTaskCondition(ctx),
HasPartitionRequest: internalpb.HasPartitionRequest{
MsgType: internalpb.MsgType_kHasPartition,
ReqID: 0,
Timestamp: 0,
ProxyID: 0,
PartitionName: in,
//TODO, ReqID,Timestamp,ProxyID
},
Value: true,
}, nil
masterClient: p.masterClient,
result: nil,
ctx: nil,
}
var cancel func()
hpt.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
err := func() error {
select {
case <-ctx.Done():
return errors.New("has partition timeout")
default:
return p.taskSch.DdQueue.Enqueue(hpt)
}
}()
if err != nil {
return &servicepb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
},
Value: false,
}, nil
}
err = hpt.WaitToFinish()
if err != nil {
return &servicepb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
},
Value: false,
}, nil
}
return hpt.result, nil
}
func (p *Proxy) DescribePartition(ctx context.Context, in *servicepb.PartitionName) (*servicepb.PartitionDescription, error) {
return &servicepb.PartitionDescription{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
dpt := &DescribePartitionTask{
Condition: NewTaskCondition(ctx),
DescribePartitionRequest: internalpb.DescribePartitionRequest{
MsgType: internalpb.MsgType_kDescribePartition,
ReqID: 0,
Timestamp: 0,
ProxyID: 0,
PartitionName: in,
//TODO, ReqID,Timestamp,ProxyID
},
}, nil
masterClient: p.masterClient,
result: nil,
ctx: nil,
}
var cancel func()
dpt.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
err := func() error {
select {
case <-ctx.Done():
return errors.New("describe partion timeout")
default:
return p.taskSch.DdQueue.Enqueue(dpt)
}
}()
if err != nil {
return &servicepb.PartitionDescription{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
},
Name: in,
Statistics: nil,
}, nil
}
err = dpt.WaitToFinish()
if err != nil {
return &servicepb.PartitionDescription{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
},
Name: in,
Statistics: nil,
}, nil
}
return dpt.result, nil
}
func (p *Proxy) ShowPartitions(ctx context.Context, req *servicepb.CollectionName) (*servicepb.StringListResponse, error) {
return &servicepb.StringListResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
spt := &ShowPartitionsTask{
Condition: NewTaskCondition(ctx),
ShowPartitionRequest: internalpb.ShowPartitionRequest{
MsgType: internalpb.MsgType_kShowPartitions,
ReqID: 0,
Timestamp: 0,
ProxyID: 0,
CollectionName: req,
},
}, nil
masterClient: p.masterClient,
result: nil,
ctx: nil,
}
var cancel func()
spt.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
err := func() error {
select {
case <-ctx.Done():
return errors.New("show partition timeout")
default:
return p.taskSch.DdQueue.Enqueue(spt)
}
}()
if err != nil {
return &servicepb.StringListResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
},
Values: nil,
}, nil
}
err = spt.WaitToFinish()
if err != nil {
return &servicepb.StringListResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
},
Values: nil,
}, nil
}
return spt.result, nil
}

View File

@ -22,6 +22,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
)
var ctx context.Context
@ -49,6 +50,15 @@ func startMaster(ctx context.Context) {
kvRootPath := path.Join(rootPath, "kv")
metaRootPath := path.Join(rootPath, "meta")
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
if err != nil {
panic(err)
}
_, err = etcdCli.Delete(context.TODO(), rootPath, clientv3.WithPrefix())
if err != nil {
panic(err)
}
opt := master.Option{
KVRootPath: kvRootPath,
MetaRootPath: metaRootPath,
@ -424,6 +434,67 @@ func TestProxy_DropCollection(t *testing.T) {
wg.Wait()
}
func TestProxy_PartitionGRPC(t *testing.T) {
var wg sync.WaitGroup
collName := "collPartTest"
filedName := "collPartTestF1"
collReq := &schemapb.CollectionSchema{
Name: collName,
Fields: []*schemapb.FieldSchema{
&schemapb.FieldSchema{
Name: filedName,
Description: "",
DataType: schemapb.DataType_VECTOR_FLOAT,
},
},
}
st, err := proxyClient.CreateCollection(ctx, collReq)
assert.Nil(t, err)
assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS)
for i := 0; i < testNum; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
tag := fmt.Sprintf("partition-%d", i)
preq := &servicepb.PartitionName{
CollectionName: collName,
Tag: tag,
}
stb, err := proxyClient.HasPartition(ctx, preq)
assert.Nil(t, err)
assert.Equal(t, stb.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
assert.Equal(t, stb.Value, false)
st, err := proxyClient.CreatePartition(ctx, preq)
assert.Nil(t, err)
assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS)
stb, err = proxyClient.HasPartition(ctx, preq)
assert.Nil(t, err)
assert.Equal(t, stb.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
assert.Equal(t, stb.Value, true)
std, err := proxyClient.DescribePartition(ctx, preq)
assert.Nil(t, err)
assert.Equal(t, std.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
sts, err := proxyClient.ShowPartitions(ctx, &servicepb.CollectionName{CollectionName: collName})
assert.Nil(t, err)
assert.Equal(t, sts.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
assert.True(t, len(sts.Values) >= 1)
assert.True(t, len(sts.Values) <= testNum)
st, err = proxyClient.DropPartition(ctx, preq)
assert.Nil(t, err)
assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS)
}()
}
wg.Wait()
}
func TestMain(m *testing.M) {
setup()
code := m.Run()

View File

@ -449,3 +449,208 @@ func (sct *ShowCollectionsTask) Execute() error {
func (sct *ShowCollectionsTask) PostExecute() error {
return nil
}
type CreatePartitionTask struct {
Condition
internalpb.CreatePartitionRequest
masterClient masterpb.MasterClient
result *commonpb.Status
ctx context.Context
}
func (cpt *CreatePartitionTask) ID() UniqueID {
return cpt.ReqID
}
func (cpt *CreatePartitionTask) Type() internalpb.MsgType {
return cpt.MsgType
}
func (cpt *CreatePartitionTask) BeginTs() Timestamp {
return cpt.Timestamp
}
func (cpt *CreatePartitionTask) EndTs() Timestamp {
return cpt.Timestamp
}
func (cpt *CreatePartitionTask) SetTs(ts Timestamp) {
cpt.Timestamp = ts
}
func (cpt *CreatePartitionTask) PreExecute() error {
return nil
}
func (cpt *CreatePartitionTask) Execute() (err error) {
cpt.result, err = cpt.masterClient.CreatePartition(cpt.ctx, &cpt.CreatePartitionRequest)
return err
}
func (cpt *CreatePartitionTask) PostExecute() error {
return nil
}
type DropPartitionTask struct {
Condition
internalpb.DropPartitionRequest
masterClient masterpb.MasterClient
result *commonpb.Status
ctx context.Context
}
func (dpt *DropPartitionTask) ID() UniqueID {
return dpt.ReqID
}
func (dpt *DropPartitionTask) Type() internalpb.MsgType {
return dpt.MsgType
}
func (dpt *DropPartitionTask) BeginTs() Timestamp {
return dpt.Timestamp
}
func (dpt *DropPartitionTask) EndTs() Timestamp {
return dpt.Timestamp
}
func (dpt *DropPartitionTask) SetTs(ts Timestamp) {
dpt.Timestamp = ts
}
func (dpt *DropPartitionTask) PreExecute() error {
return nil
}
func (dpt *DropPartitionTask) Execute() (err error) {
dpt.result, err = dpt.masterClient.DropPartition(dpt.ctx, &dpt.DropPartitionRequest)
return err
}
func (dpt *DropPartitionTask) PostExecute() error {
return nil
}
type HasPartitionTask struct {
Condition
internalpb.HasPartitionRequest
masterClient masterpb.MasterClient
result *servicepb.BoolResponse
ctx context.Context
}
func (hpt *HasPartitionTask) ID() UniqueID {
return hpt.ReqID
}
func (hpt *HasPartitionTask) Type() internalpb.MsgType {
return hpt.MsgType
}
func (hpt *HasPartitionTask) BeginTs() Timestamp {
return hpt.Timestamp
}
func (hpt *HasPartitionTask) EndTs() Timestamp {
return hpt.Timestamp
}
func (hpt *HasPartitionTask) SetTs(ts Timestamp) {
hpt.Timestamp = ts
}
func (hpt *HasPartitionTask) PreExecute() error {
return nil
}
func (hpt *HasPartitionTask) Execute() (err error) {
hpt.result, err = hpt.masterClient.HasPartition(hpt.ctx, &hpt.HasPartitionRequest)
return err
}
func (hpt *HasPartitionTask) PostExecute() error {
return nil
}
type DescribePartitionTask struct {
Condition
internalpb.DescribePartitionRequest
masterClient masterpb.MasterClient
result *servicepb.PartitionDescription
ctx context.Context
}
func (dpt *DescribePartitionTask) ID() UniqueID {
return dpt.ReqID
}
func (dpt *DescribePartitionTask) Type() internalpb.MsgType {
return dpt.MsgType
}
func (dpt *DescribePartitionTask) BeginTs() Timestamp {
return dpt.Timestamp
}
func (dpt *DescribePartitionTask) EndTs() Timestamp {
return dpt.Timestamp
}
func (dpt *DescribePartitionTask) SetTs(ts Timestamp) {
dpt.Timestamp = ts
}
func (dpt *DescribePartitionTask) PreExecute() error {
return nil
}
func (dpt *DescribePartitionTask) Execute() (err error) {
dpt.result, err = dpt.masterClient.DescribePartition(dpt.ctx, &dpt.DescribePartitionRequest)
return err
}
func (dpt *DescribePartitionTask) PostExecute() error {
return nil
}
type ShowPartitionsTask struct {
Condition
internalpb.ShowPartitionRequest
masterClient masterpb.MasterClient
result *servicepb.StringListResponse
ctx context.Context
}
func (spt *ShowPartitionsTask) ID() UniqueID {
return spt.ReqID
}
func (spt *ShowPartitionsTask) Type() internalpb.MsgType {
return spt.MsgType
}
func (spt *ShowPartitionsTask) BeginTs() Timestamp {
return spt.Timestamp
}
func (spt *ShowPartitionsTask) EndTs() Timestamp {
return spt.Timestamp
}
func (spt *ShowPartitionsTask) SetTs(ts Timestamp) {
spt.Timestamp = ts
}
func (spt *ShowPartitionsTask) PreExecute() error {
return nil
}
func (spt *ShowPartitionsTask) Execute() (err error) {
spt.result, err = spt.masterClient.ShowPartitions(spt.ctx, &spt.ShowPartitionRequest)
return err
}
func (spt *ShowPartitionsTask) PostExecute() error {
return nil
}

View File

@ -18,7 +18,7 @@ import (
)
// NOTE: start pulsar before test
func TestManipulationService_Start(t *testing.T) {
func TestDataSyncService_Start(t *testing.T) {
Params.Init()
var ctx context.Context
@ -155,6 +155,24 @@ func TestManipulationService_Start(t *testing.T) {
Msgs: insertMessages,
}
// generate timeTick
timeTickMsgPack := msgstream.MsgPack{}
baseMsg := msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []int32{0},
}
timeTickResult := internalPb.TimeTickMsg{
MsgType: internalPb.MsgType_kTimeTick,
PeerID: UniqueID(0),
Timestamp: math.MaxUint64,
}
timeTickMsg := &msgstream.TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg)
// pulsar produce
const receiveBufSize = 1024
producerChannels := []string{"insert"}
@ -165,9 +183,13 @@ func TestManipulationService_Start(t *testing.T) {
var insertMsgStream msgstream.MsgStream = insertStream
insertMsgStream.Start()
err = insertMsgStream.Produce(&msgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
// dataSync
node.dataSyncService = newDataSyncService(node.ctx, node.replica)
go node.dataSyncService.start()

View File

@ -20,7 +20,7 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
consumeChannels := []string{"insert"}
consumeSubName := "insertSub"
insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
insertStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize)
insertStream.SetPulsarClient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)

View File

@ -63,7 +63,7 @@ func (node *QueryNode) Start() {
node.statsService = newStatsService(node.ctx, node.replica)
go node.dataSyncService.start()
// go node.searchService.start()
go node.searchService.start()
go node.metaService.start()
node.statsService.start()
}