From 429c94e18b5445caaf203f97e93466b4f57bb268 Mon Sep 17 00:00:00 2001 From: neza2017 Date: Thu, 19 Nov 2020 17:09:22 +0800 Subject: [PATCH] Add grpc for proxy Signed-off-by: neza2017 --- go.mod | 1 - go.sum | 1 - internal/proxy/grpc_service.go | 254 ++++++++++++++++-- internal/proxy/proxy_test.go | 71 +++++ internal/proxy/task.go | 205 ++++++++++++++ internal/reader/data_sync_service_test.go | 24 +- .../flow_graph_msg_stream_input_nodes.go | 2 +- internal/reader/query_node.go | 2 +- 8 files changed, 531 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index cda9129e8e..f8cb3e468e 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,6 @@ require ( github.com/prometheus/common v0.10.0 // indirect github.com/prometheus/procfs v0.1.3 // indirect github.com/sirupsen/logrus v1.6.0 // indirect - github.com/smartystreets/goconvey v1.6.4 // indirect github.com/spaolacci/murmur3 v1.1.0 github.com/spf13/cast v1.3.0 github.com/spf13/viper v1.7.1 diff --git a/go.sum b/go.sum index 8f530f558f..249aa05523 100644 --- a/go.sum +++ b/go.sum @@ -327,7 +327,6 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/protocolbuffers/protobuf v3.13.0+incompatible h1:omZA3Tuq+U2kJ2uMuqMR9c1VO5qLEgZ19m9878fXNtg= github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/internal/proxy/grpc_service.go b/internal/proxy/grpc_service.go index 96a76d4fe6..bb8d924179 100644 --- a/internal/proxy/grpc_service.go +++ b/internal/proxy/grpc_service.go @@ -345,43 +345,249 @@ func (p *Proxy) ShowCollections(ctx context.Context, req *commonpb.Empty) (*serv } func (p *Proxy) CreatePartition(ctx context.Context, in *servicepb.PartitionName) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: 0, - Reason: "", - }, nil + cpt := &CreatePartitionTask{ + Condition: NewTaskCondition(ctx), + CreatePartitionRequest: internalpb.CreatePartitionRequest{ + MsgType: internalpb.MsgType_kCreatePartition, + ReqID: 0, + Timestamp: 0, + ProxyID: 0, + PartitionName: in, + //TODO, ReqID,Timestamp,ProxyID + }, + masterClient: p.masterClient, + result: nil, + ctx: nil, + } + var cancel func() + cpt.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval) + defer cancel() + + err := func() error { + select { + case <-ctx.Done(): + return errors.New("create partition timeout") + default: + return p.taskSch.DdQueue.Enqueue(cpt) + } + }() + + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, nil + } + err = cpt.WaitToFinish() + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, nil + } + return cpt.result, nil + } func (p *Proxy) DropPartition(ctx context.Context, in *servicepb.PartitionName) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: 0, - Reason: "", - }, nil + dpt := &DropPartitionTask{ + Condition: NewTaskCondition(ctx), + DropPartitionRequest: internalpb.DropPartitionRequest{ + MsgType: internalpb.MsgType_kDropPartition, + ReqID: 0, + Timestamp: 0, + ProxyID: 0, + PartitionName: in, + //TODO, ReqID,Timestamp,ProxyID + }, + masterClient: p.masterClient, + result: nil, + ctx: nil, + } + + var cancel func() + dpt.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval) + defer cancel() + + err := func() error { + select { + case <-ctx.Done(): + return errors.New("drop partition timeout") + default: + return p.taskSch.DdQueue.Enqueue(dpt) + } + }() + + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, nil + } + err = dpt.WaitToFinish() + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, nil + } + return dpt.result, nil + } func (p *Proxy) HasPartition(ctx context.Context, in *servicepb.PartitionName) (*servicepb.BoolResponse, error) { - return &servicepb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: 0, - Reason: "", + hpt := &HasPartitionTask{ + Condition: NewTaskCondition(ctx), + HasPartitionRequest: internalpb.HasPartitionRequest{ + MsgType: internalpb.MsgType_kHasPartition, + ReqID: 0, + Timestamp: 0, + ProxyID: 0, + PartitionName: in, + //TODO, ReqID,Timestamp,ProxyID }, - Value: true, - }, nil + masterClient: p.masterClient, + result: nil, + ctx: nil, + } + + var cancel func() + hpt.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval) + defer cancel() + + err := func() error { + select { + case <-ctx.Done(): + return errors.New("has partition timeout") + default: + return p.taskSch.DdQueue.Enqueue(hpt) + } + }() + + if err != nil { + return &servicepb.BoolResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, + Value: false, + }, nil + } + err = hpt.WaitToFinish() + if err != nil { + return &servicepb.BoolResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, + Value: false, + }, nil + } + return hpt.result, nil + } func (p *Proxy) DescribePartition(ctx context.Context, in *servicepb.PartitionName) (*servicepb.PartitionDescription, error) { - return &servicepb.PartitionDescription{ - Status: &commonpb.Status{ - ErrorCode: 0, - Reason: "", + dpt := &DescribePartitionTask{ + Condition: NewTaskCondition(ctx), + DescribePartitionRequest: internalpb.DescribePartitionRequest{ + MsgType: internalpb.MsgType_kDescribePartition, + ReqID: 0, + Timestamp: 0, + ProxyID: 0, + PartitionName: in, + //TODO, ReqID,Timestamp,ProxyID }, - }, nil + masterClient: p.masterClient, + result: nil, + ctx: nil, + } + + var cancel func() + dpt.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval) + defer cancel() + + err := func() error { + select { + case <-ctx.Done(): + return errors.New("describe partion timeout") + default: + return p.taskSch.DdQueue.Enqueue(dpt) + } + }() + + if err != nil { + return &servicepb.PartitionDescription{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, + Name: in, + Statistics: nil, + }, nil + } + + err = dpt.WaitToFinish() + if err != nil { + return &servicepb.PartitionDescription{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, + Name: in, + Statistics: nil, + }, nil + } + return dpt.result, nil } func (p *Proxy) ShowPartitions(ctx context.Context, req *servicepb.CollectionName) (*servicepb.StringListResponse, error) { - return &servicepb.StringListResponse{ - Status: &commonpb.Status{ - ErrorCode: 0, - Reason: "", + spt := &ShowPartitionsTask{ + Condition: NewTaskCondition(ctx), + ShowPartitionRequest: internalpb.ShowPartitionRequest{ + MsgType: internalpb.MsgType_kShowPartitions, + ReqID: 0, + Timestamp: 0, + ProxyID: 0, + CollectionName: req, }, - }, nil + masterClient: p.masterClient, + result: nil, + ctx: nil, + } + + var cancel func() + spt.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval) + defer cancel() + + err := func() error { + select { + case <-ctx.Done(): + return errors.New("show partition timeout") + default: + return p.taskSch.DdQueue.Enqueue(spt) + } + }() + + if err != nil { + return &servicepb.StringListResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, + Values: nil, + }, nil + } + + err = spt.WaitToFinish() + if err != nil { + return &servicepb.StringListResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, + Values: nil, + }, nil + } + return spt.result, nil } diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index c42ef760b7..55cf118312 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -22,6 +22,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "go.etcd.io/etcd/clientv3" ) var ctx context.Context @@ -49,6 +50,15 @@ func startMaster(ctx context.Context) { kvRootPath := path.Join(rootPath, "kv") metaRootPath := path.Join(rootPath, "meta") + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + if err != nil { + panic(err) + } + _, err = etcdCli.Delete(context.TODO(), rootPath, clientv3.WithPrefix()) + if err != nil { + panic(err) + } + opt := master.Option{ KVRootPath: kvRootPath, MetaRootPath: metaRootPath, @@ -424,6 +434,67 @@ func TestProxy_DropCollection(t *testing.T) { wg.Wait() } +func TestProxy_PartitionGRPC(t *testing.T) { + var wg sync.WaitGroup + collName := "collPartTest" + filedName := "collPartTestF1" + collReq := &schemapb.CollectionSchema{ + Name: collName, + Fields: []*schemapb.FieldSchema{ + &schemapb.FieldSchema{ + Name: filedName, + Description: "", + DataType: schemapb.DataType_VECTOR_FLOAT, + }, + }, + } + st, err := proxyClient.CreateCollection(ctx, collReq) + assert.Nil(t, err) + assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + + for i := 0; i < testNum; i++ { + wg.Add(1) + i := i + go func() { + defer wg.Done() + tag := fmt.Sprintf("partition-%d", i) + preq := &servicepb.PartitionName{ + CollectionName: collName, + Tag: tag, + } + + stb, err := proxyClient.HasPartition(ctx, preq) + assert.Nil(t, err) + assert.Equal(t, stb.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) + assert.Equal(t, stb.Value, false) + + st, err := proxyClient.CreatePartition(ctx, preq) + assert.Nil(t, err) + assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + + stb, err = proxyClient.HasPartition(ctx, preq) + assert.Nil(t, err) + assert.Equal(t, stb.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) + assert.Equal(t, stb.Value, true) + + std, err := proxyClient.DescribePartition(ctx, preq) + assert.Nil(t, err) + assert.Equal(t, std.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) + + sts, err := proxyClient.ShowPartitions(ctx, &servicepb.CollectionName{CollectionName: collName}) + assert.Nil(t, err) + assert.Equal(t, sts.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) + assert.True(t, len(sts.Values) >= 1) + assert.True(t, len(sts.Values) <= testNum) + + st, err = proxyClient.DropPartition(ctx, preq) + assert.Nil(t, err) + assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + }() + } + wg.Wait() +} + func TestMain(m *testing.M) { setup() code := m.Run() diff --git a/internal/proxy/task.go b/internal/proxy/task.go index a4660ecfec..1aaa00366d 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -449,3 +449,208 @@ func (sct *ShowCollectionsTask) Execute() error { func (sct *ShowCollectionsTask) PostExecute() error { return nil } + +type CreatePartitionTask struct { + Condition + internalpb.CreatePartitionRequest + masterClient masterpb.MasterClient + result *commonpb.Status + ctx context.Context +} + +func (cpt *CreatePartitionTask) ID() UniqueID { + return cpt.ReqID +} + +func (cpt *CreatePartitionTask) Type() internalpb.MsgType { + return cpt.MsgType +} + +func (cpt *CreatePartitionTask) BeginTs() Timestamp { + return cpt.Timestamp +} + +func (cpt *CreatePartitionTask) EndTs() Timestamp { + return cpt.Timestamp +} + +func (cpt *CreatePartitionTask) SetTs(ts Timestamp) { + cpt.Timestamp = ts +} + +func (cpt *CreatePartitionTask) PreExecute() error { + return nil +} + +func (cpt *CreatePartitionTask) Execute() (err error) { + cpt.result, err = cpt.masterClient.CreatePartition(cpt.ctx, &cpt.CreatePartitionRequest) + return err +} + +func (cpt *CreatePartitionTask) PostExecute() error { + return nil +} + +type DropPartitionTask struct { + Condition + internalpb.DropPartitionRequest + masterClient masterpb.MasterClient + result *commonpb.Status + ctx context.Context +} + +func (dpt *DropPartitionTask) ID() UniqueID { + return dpt.ReqID +} + +func (dpt *DropPartitionTask) Type() internalpb.MsgType { + return dpt.MsgType +} + +func (dpt *DropPartitionTask) BeginTs() Timestamp { + return dpt.Timestamp +} + +func (dpt *DropPartitionTask) EndTs() Timestamp { + return dpt.Timestamp +} + +func (dpt *DropPartitionTask) SetTs(ts Timestamp) { + dpt.Timestamp = ts +} + +func (dpt *DropPartitionTask) PreExecute() error { + return nil +} + +func (dpt *DropPartitionTask) Execute() (err error) { + dpt.result, err = dpt.masterClient.DropPartition(dpt.ctx, &dpt.DropPartitionRequest) + return err +} + +func (dpt *DropPartitionTask) PostExecute() error { + return nil +} + +type HasPartitionTask struct { + Condition + internalpb.HasPartitionRequest + masterClient masterpb.MasterClient + result *servicepb.BoolResponse + ctx context.Context +} + +func (hpt *HasPartitionTask) ID() UniqueID { + return hpt.ReqID +} + +func (hpt *HasPartitionTask) Type() internalpb.MsgType { + return hpt.MsgType +} + +func (hpt *HasPartitionTask) BeginTs() Timestamp { + return hpt.Timestamp +} + +func (hpt *HasPartitionTask) EndTs() Timestamp { + return hpt.Timestamp +} + +func (hpt *HasPartitionTask) SetTs(ts Timestamp) { + hpt.Timestamp = ts +} + +func (hpt *HasPartitionTask) PreExecute() error { + return nil +} + +func (hpt *HasPartitionTask) Execute() (err error) { + hpt.result, err = hpt.masterClient.HasPartition(hpt.ctx, &hpt.HasPartitionRequest) + return err +} + +func (hpt *HasPartitionTask) PostExecute() error { + return nil +} + +type DescribePartitionTask struct { + Condition + internalpb.DescribePartitionRequest + masterClient masterpb.MasterClient + result *servicepb.PartitionDescription + ctx context.Context +} + +func (dpt *DescribePartitionTask) ID() UniqueID { + return dpt.ReqID +} + +func (dpt *DescribePartitionTask) Type() internalpb.MsgType { + return dpt.MsgType +} + +func (dpt *DescribePartitionTask) BeginTs() Timestamp { + return dpt.Timestamp +} + +func (dpt *DescribePartitionTask) EndTs() Timestamp { + return dpt.Timestamp +} + +func (dpt *DescribePartitionTask) SetTs(ts Timestamp) { + dpt.Timestamp = ts +} + +func (dpt *DescribePartitionTask) PreExecute() error { + return nil +} + +func (dpt *DescribePartitionTask) Execute() (err error) { + dpt.result, err = dpt.masterClient.DescribePartition(dpt.ctx, &dpt.DescribePartitionRequest) + return err +} + +func (dpt *DescribePartitionTask) PostExecute() error { + return nil +} + +type ShowPartitionsTask struct { + Condition + internalpb.ShowPartitionRequest + masterClient masterpb.MasterClient + result *servicepb.StringListResponse + ctx context.Context +} + +func (spt *ShowPartitionsTask) ID() UniqueID { + return spt.ReqID +} + +func (spt *ShowPartitionsTask) Type() internalpb.MsgType { + return spt.MsgType +} + +func (spt *ShowPartitionsTask) BeginTs() Timestamp { + return spt.Timestamp +} + +func (spt *ShowPartitionsTask) EndTs() Timestamp { + return spt.Timestamp +} + +func (spt *ShowPartitionsTask) SetTs(ts Timestamp) { + spt.Timestamp = ts +} + +func (spt *ShowPartitionsTask) PreExecute() error { + return nil +} + +func (spt *ShowPartitionsTask) Execute() (err error) { + spt.result, err = spt.masterClient.ShowPartitions(spt.ctx, &spt.ShowPartitionRequest) + return err +} + +func (spt *ShowPartitionsTask) PostExecute() error { + return nil +} diff --git a/internal/reader/data_sync_service_test.go b/internal/reader/data_sync_service_test.go index b5e9fe07d2..689607460d 100644 --- a/internal/reader/data_sync_service_test.go +++ b/internal/reader/data_sync_service_test.go @@ -18,7 +18,7 @@ import ( ) // NOTE: start pulsar before test -func TestManipulationService_Start(t *testing.T) { +func TestDataSyncService_Start(t *testing.T) { Params.Init() var ctx context.Context @@ -155,6 +155,24 @@ func TestManipulationService_Start(t *testing.T) { Msgs: insertMessages, } + // generate timeTick + timeTickMsgPack := msgstream.MsgPack{} + baseMsg := msgstream.BaseMsg{ + BeginTimestamp: 0, + EndTimestamp: 0, + HashValues: []int32{0}, + } + timeTickResult := internalPb.TimeTickMsg{ + MsgType: internalPb.MsgType_kTimeTick, + PeerID: UniqueID(0), + Timestamp: math.MaxUint64, + } + timeTickMsg := &msgstream.TimeTickMsg{ + BaseMsg: baseMsg, + TimeTickMsg: timeTickResult, + } + timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg) + // pulsar produce const receiveBufSize = 1024 producerChannels := []string{"insert"} @@ -165,9 +183,13 @@ func TestManipulationService_Start(t *testing.T) { var insertMsgStream msgstream.MsgStream = insertStream insertMsgStream.Start() + err = insertMsgStream.Produce(&msgPack) assert.NoError(t, err) + err = insertMsgStream.Broadcast(&timeTickMsgPack) + assert.NoError(t, err) + // dataSync node.dataSyncService = newDataSyncService(node.ctx, node.replica) go node.dataSyncService.start() diff --git a/internal/reader/flow_graph_msg_stream_input_nodes.go b/internal/reader/flow_graph_msg_stream_input_nodes.go index 2ebbd6cd6d..b5ee6a581b 100644 --- a/internal/reader/flow_graph_msg_stream_input_nodes.go +++ b/internal/reader/flow_graph_msg_stream_input_nodes.go @@ -20,7 +20,7 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode { consumeChannels := []string{"insert"} consumeSubName := "insertSub" - insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) + insertStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize) insertStream.SetPulsarClient(msgStreamURL) unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) diff --git a/internal/reader/query_node.go b/internal/reader/query_node.go index 122d27ef49..a98f69f68a 100644 --- a/internal/reader/query_node.go +++ b/internal/reader/query_node.go @@ -63,7 +63,7 @@ func (node *QueryNode) Start() { node.statsService = newStatsService(node.ctx, node.replica) go node.dataSyncService.start() - // go node.searchService.start() + go node.searchService.start() go node.metaService.start() node.statsService.start() }