refactor proto logic

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
zhenshan.cao 2020-09-06 21:13:06 +08:00 committed by yefu.chen
parent d58f6ff817
commit 8992611898
46 changed files with 3350 additions and 6157 deletions

2349
pkg/message/message.pb.go Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,7 @@
syntax = "proto3";
package suvlim.grpc;
package milvus.grpc;
option go_package="msgpb";
enum ErrorCode {
SUCCESS = 0;
@ -708,4 +709,4 @@ message TimeSyncMsg{
message Key2SegMsg {
int64 uid = 1;
repeated int64 segment_id = 2;
}
}

View File

@ -1,711 +0,0 @@
syntax = "proto3";
package milvus.grpc;
enum ErrorCode {
SUCCESS = 0;
UNEXPECTED_ERROR = 1;
CONNECT_FAILED = 2;
PERMISSION_DENIED = 3;
COLLECTION_NOT_EXISTS = 4;
ILLEGAL_ARGUMENT = 5;
ILLEGAL_DIMENSION = 7;
ILLEGAL_INDEX_TYPE = 8;
ILLEGAL_COLLECTION_NAME = 9;
ILLEGAL_TOPK = 10;
ILLEGAL_ROWRECORD = 11;
ILLEGAL_VECTOR_ID = 12;
ILLEGAL_SEARCH_RESULT = 13;
FILE_NOT_FOUND = 14;
META_FAILED = 15;
CACHE_FAILED = 16;
CANNOT_CREATE_FOLDER = 17;
CANNOT_CREATE_FILE = 18;
CANNOT_DELETE_FOLDER = 19;
CANNOT_DELETE_FILE = 20;
BUILD_INDEX_ERROR = 21;
ILLEGAL_NLIST = 22;
ILLEGAL_METRIC_TYPE = 23;
OUT_OF_MEMORY = 24;
}
message Status {
ErrorCode error_code = 1;
string reason = 2;
}
/**
* @brief Field data type
*/
enum DataType {
NONE = 0;
BOOL = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
FLOAT = 10;
DOUBLE = 11;
STRING = 20;
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
}
/**
* @brief General usage
*/
message KeyValuePair {
string key = 1;
string value = 2;
}
/**
* @brief Collection name
*/
message CollectionName {
string collection_name = 1;
}
/**
* @brief Collection name list
*/
message CollectionNameList {
Status status = 1;
repeated string collection_names = 2;
}
/**
* @brief Field name
*/
message FieldName {
string collection_name = 1;
string field_name = 2;
}
/**
* @brief Collection mapping
* @extra_params: key-value pair for extra parameters of the collection
* typically usage:
* extra_params["params"] = {segment_row_count: 1000000, auto_id: true}
* Note:
* the segment_row_count specify segment row count limit for merging
* the auto_id = true means entity id is auto-generated by milvus
*/
message Mapping {
Status status = 1;
string collection_name = 2;
repeated FieldParam fields = 3;
repeated KeyValuePair extra_params = 4;
}
/**
* @brief Collection mapping list
*/
message MappingList {
Status status = 1;
repeated Mapping mapping_list = 2;
}
/**
* @brief Parameters of partition
*/
message PartitionParam {
string collection_name = 1;
string tag = 2;
}
/**
* @brief Partition list
*/
message PartitionList {
Status status = 1;
repeated string partition_tag_array = 2;
}
/**
* @brief Vector row record
*/
message VectorRowRecord {
repeated float float_data = 1; //float vector data
bytes binary_data = 2; //binary vector data
}
message EntityIds {
Status status = 1;
repeated int64 entity_id_array = 2;
}
message VectorRecord {
repeated VectorRowRecord records = 1;
}
message VectorParam {
string json = 1;
VectorRecord row_record = 2;
}
//////////////////////////row schema and data///////////////////////////////////
/**
* @brief schema
*/
message FieldMeta {
string field_name = 1;
DataType type = 2;
int64 dim = 3;
}
message Schema {
repeated FieldMeta field_metas = 1;
}
message RowData {
bytes blob = 1;
}
//////////////////////suvlim-proxy///////////////////////////////////
message InsertParam {
string collection_name = 1;
Schema schema = 2;
repeated RowData rows_data = 3;
repeated int64 entity_id_array = 4; //optional
string partition_tag = 5;
repeated KeyValuePair extra_params = 6;
}
message SearchParam {
string collection_name = 1;
repeated VectorParam vector_param = 2;
string dsl = 3; //optional
repeated string partition_tag = 4; //why
repeated KeyValuePair extra_params = 5;
}
message SearchInSegmentParam {
repeated string file_id_array = 1;
SearchParam search_param = 2;
}
message Entities {
Status status = 1;
repeated int64 ids = 2;
repeated bool valid_row = 3;
repeated RowData rows_data = 4;
}
///////////////////////////milvus-server///////////////////////////
/**
* @brief Query result
*/
message QueryResult {
Status status = 1;
Entities entities = 2;
int64 row_num = 3;
repeated float scores = 4;
repeated float distances = 5;
repeated KeyValuePair extra_params = 6;
}
/**
* @brief Server string Reply
*/
message StringReply {
Status status = 1;
string string_reply = 2;
}
/**
* @brief Server bool Reply
*/
message BoolReply {
Status status = 1;
bool bool_reply = 2;
}
/**
* @brief Return collection row count
*/
message CollectionRowCount {
Status status = 1;
int64 collection_row_count = 2;
}
/**
* @brief Server command parameters
*/
message Command {
string cmd = 1;
}
/**
* @brief Index params
* @collection_name: target collection
* @field_name: target field
* @index_name: a name for index provided by user, unique within this field
* @extra_params: index parameters in json format
* for vector field:
* extra_params["index_type"] = one of the values: FLAT, IVF_LAT, IVF_SQ8, NSGMIX, IVFSQ8H,
* PQ, HNSW, HNSW_SQ8NM, ANNOY
* extra_params["metric_type"] = one of the values: L2, IP, HAMMING, JACCARD, TANIMOTO
* SUBSTRUCTURE, SUPERSTRUCTURE
* extra_params["params"] = extra parameters for index, for example ivflat: {nlist: 2048}
* for structured field:
* extra_params["index_type"] = one of the values: SORTED
*/
message IndexParam {
Status status = 1;
string collection_name = 2;
string field_name = 3;
string index_name = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Parameters for flush action
*/
message FlushParam {
repeated string collection_name_array = 1;
}
/**
* @brief Parameters for flush action
*/
message CompactParam {
string collection_name = 1;
double threshold = 2;
}
/**
* @brief Parameters for deleting entities by id
*/
message DeleteByIDParam {
string collection_name = 1;
repeated int64 id_array = 2;
}
/**
* @brief Return collection stats
* @json_info: collection stats in json format, typically, the format is like:
* {
* row_count: xxx,
* data_size: xxx,
* partitions: [
* {
* tag: xxx,
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* segments: [
* {
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* files: [
* {
* field: xxx,
* name: xxx,
* index_type: xxx,
* path: xxx,
* data_size: xxx,
* }
* ]
* }
* ]
* }
* ]
* }
*/
message CollectionInfo {
Status status = 1;
string json_info = 2;
}
/**
* @brief Parameters for returning entities id of a segment
*/
message GetEntityIDsParam {
string collection_name = 1;
int64 segment_id = 2;
}
/**
* @brief Entities identity
*/
message EntityIdentity {
string collection_name = 1;
repeated int64 id_array = 2;
repeated string field_names = 3;
}
/********************************************SearchPB interface***************************************************/
/**
* @brief Vector field parameters
*/
message VectorFieldParam {
int64 dimension = 1;
}
/**
* @brief Field type
*/
message FieldType {
oneof value {
DataType data_type = 1;
VectorFieldParam vector_param = 2;
}
}
/**
* @brief Field parameters
*/
message FieldParam {
uint64 id = 1;
string name = 2;
DataType type = 3;
repeated KeyValuePair index_params = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Vector field record
*/
message VectorFieldRecord {
repeated VectorRowRecord value = 1;
}
///////////////////////////////////////////////////////////////////
message TermQuery {
string field_name = 1;
repeated int64 int_value = 2;
repeated double double_value = 3;
int64 value_num = 4;
float boost = 5;
repeated KeyValuePair extra_params = 6;
}
enum CompareOperator {
LT = 0;
LTE = 1;
EQ = 2;
GT = 3;
GTE = 4;
NE = 5;
}
message CompareExpr {
CompareOperator operator = 1;
string operand = 2;
}
message RangeQuery {
string field_name = 1;
repeated CompareExpr operand = 2;
float boost = 3;
repeated KeyValuePair extra_params = 4;
}
message VectorQuery {
string field_name = 1;
float query_boost = 2;
repeated VectorRowRecord records = 3;
int64 topk = 4;
repeated KeyValuePair extra_params = 5;
}
enum Occur {
INVALID = 0;
MUST = 1;
SHOULD = 2;
MUST_NOT = 3;
}
message BooleanQuery {
Occur occur = 1;
repeated GeneralQuery general_query = 2;
}
message GeneralQuery {
oneof query {
BooleanQuery boolean_query = 1;
TermQuery term_query = 2;
RangeQuery range_query = 3;
VectorQuery vector_query = 4;
}
}
message SearchParamPB {
string collection_name = 1;
repeated string partition_tag_array = 2;
GeneralQuery general_query = 3;
repeated KeyValuePair extra_params = 4;
}
service MilvusService {
/**
* @brief This method is used to create collection
*
* @param CollectionSchema, use to provide collection information to be created.
*
* @return Status
*/
rpc CreateCollection(Mapping) returns (Status){}
/**
* @brief This method is used to test collection existence.
*
* @param CollectionName, collection name is going to be tested.
*
* @return BoolReply
*/
rpc HasCollection(CollectionName) returns (BoolReply) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionSchema
*/
rpc DescribeCollection(CollectionName) returns (Mapping) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionRowCount
*/
rpc CountCollection(CollectionName) returns (CollectionRowCount) {}
/**
* @brief This method is used to list all collections.
*
* @param Command, dummy parameter.
*
* @return CollectionNameList
*/
rpc ShowCollections(Command) returns (CollectionNameList) {}
/**
* @brief This method is used to get collection detail information.
*
* @param CollectionName, target collection name.
*
* @return CollectionInfo
*/
rpc ShowCollectionInfo(CollectionName) returns (CollectionInfo) {}
/**
* @brief This method is used to delete collection.
*
* @param CollectionName, collection name is going to be deleted.
*
* @return Status
*/
rpc DropCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to build index by collection in sync mode.
*
* @param IndexParam, index paramters.
*
* @return Status
*/
rpc CreateIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to describe index
*
* @param IndexParam, target index.
*
* @return IndexParam
*/
rpc DescribeIndex(IndexParam) returns (IndexParam) {}
/**
* @brief This method is used to drop index
*
* @param IndexParam, target field. if the IndexParam.field_name is empty, will drop all index of the collection
*
* @return Status
*/
rpc DropIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to create partition
*
* @param PartitionParam, partition parameters.
*
* @return Status
*/
rpc CreatePartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to test partition existence.
*
* @param PartitionParam, target partition.
*
* @return BoolReply
*/
rpc HasPartition(PartitionParam) returns (BoolReply) {}
/**
* @brief This method is used to show partition information
*
* @param CollectionName, target collection name.
*
* @return PartitionList
*/
rpc ShowPartitions(CollectionName) returns (PartitionList) {}
/**
* @brief This method is used to drop partition
*
* @param PartitionParam, target partition.
*
* @return Status
*/
rpc DropPartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to add vector array to collection.
*
* @param InsertParam, insert parameters.
*
* @return VectorIds
*/
rpc Insert(InsertParam) returns (EntityIds) {}
/**
* @brief This method is used to get entities data by id array.
*
* @param EntitiesIdentity, target entity id array.
*
* @return EntitiesData
*/
rpc GetEntityByID(EntityIdentity) returns (Entities) {}
/**
* @brief This method is used to get vector ids from a segment
*
* @param GetVectorIDsParam, target collection and segment
*
* @return VectorIds
*/
rpc GetEntityIDs(GetEntityIDsParam) returns (EntityIds) {}
/**
* @brief This method is used to query vector in collection.
*
* @param SearchParam, search parameters.
*
* @return KQueryResult
*/
rpc Search(SearchParam) returns (QueryResult) {}
/**
* @brief This method is used to query vector in specified files.
*
* @param SearchInSegmentParam, target segments to search.
*
* @return TopKQueryResult
*/
rpc SearchInSegment(SearchInSegmentParam) returns (QueryResult) {}
/**
* @brief This method is used to give the server status.
*
* @param Command, command string
*
* @return StringReply
*/
rpc Cmd(Command) returns (StringReply) {}
/**
* @brief This method is used to delete vector by id
*
* @param DeleteByIDParam, delete parameters.
*
* @return status
*/
rpc DeleteByID(DeleteByIDParam) returns (Status) {}
/**
* @brief This method is used to preload collection
*
* @param CollectionName, target collection name.
*
* @return Status
*/
rpc PreloadCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to flush buffer into storage.
*
* @param FlushParam, flush parameters
*
* @return Status
*/
rpc Flush(FlushParam) returns (Status) {}
/**
* @brief This method is used to compact collection
*
* @param CompactParam, compact parameters
*
* @return Status
*/
rpc Compact(CompactParam) returns (Status) {}
/********************************New Interface********************************************/
rpc SearchPB(SearchParamPB) returns (QueryResult) {}
}
////////////////////pulsar//////////////////////////////////////
enum OpType {
INSERT = 0;
DELETE = 1;
}
message InsertOrDeleteMsg {
string collection_name = 1;
RowData rows_data = 2;
int64 uid = 3; //optional
string partition_tag = 4;
int64 timestamp =5;
int64 segment_id = 6;
int64 channel_id = 7;
OpType op = 8;
int64 client_id = 9;
repeated KeyValuePair extra_params = 10;
}
message SearchMsg {
string collection_name = 1;
VectorRowRecord records = 2;
string partition_tag = 3;
int64 uid = 4;
int64 timestamp =5;
int64 client_id = 6;
repeated KeyValuePair extra_params = 7;
}
enum SyncType {
READ = 0;
WRITE = 1;
}
message TimeSyncMsg{
int64 peer_Id = 1;
int64 Timestamp = 2;
SyncType sync_type = 3;
}
message Key2SegMsg {
int64 uid = 1;
repeated int64 segment_id = 2;
}

View File

@ -16,7 +16,7 @@ include_directories(${MILVUS_ENGINE_SRC})
include_directories(${MILVUS_THIRDPARTY_SRC})
#include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-status)
include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-milvus)
include_directories(${MILVUS_ENGINE_SRC}/grpc)
add_subdirectory( tracing )
add_subdirectory( utils )

View File

@ -1,32 +0,0 @@
#!/bin/bash
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --grpc_out=./gen-status --plugin=protoc-gen-grpc="../../cmake-build-debug/thirdparty/grpc/grpc-build/grpc_cpp_plugin" status.proto
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --cpp_out=./gen-status status.proto
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="../../cmake-build-debug/thirdparty/grpc/grpc-build/grpc_cpp_plugin" milvus.proto
#
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --cpp_out=./gen-milvus milvus.proto
#
#
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="../../cmake-build-debug/thirdparty/grpc/grpc-build/grpc_cpp_plugin" hello.proto
#
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --cpp_out=./gen-milvus hello.proto
#
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="../../cmake-build-debug/thirdparty/grpc/grpc-build/grpc_cpp_plugin" master.proto
#
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --cpp_out=./gen-milvus master.proto
# TODO: spilt milvus and pulsar proto
#
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="../../cmake-build-debug/thirdparty/grpc/grpc-build/grpc_cpp_plugin" master.proto
#
protoc=../../cmake_build/thirdparty/grpc/grpc-build/third_party/protobuf/protoc
$protoc -I . --cpp_out=./gen-milvus suvlim.proto
$protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="../../cmake_build/thirdparty/grpc/grpc-build/grpc_cpp_plugin" suvlim.proto
$protoc -I . --cpp_out=./gen-milvus hello.proto
$protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="../../cmake_build/thirdparty/grpc/grpc-build/grpc_cpp_plugin" hello.proto
$protoc -I . --cpp_out=./gen-milvus master.proto
$protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="../../cmake_build/thirdparty/grpc/grpc-build/grpc_cpp_plugin" master.proto

View File

@ -1,9 +1,9 @@
// Generated by the gRPC C++ plugin.
// If you make any local change, they will be lost.
// source: suvlim.proto
// source: message.proto
#include "suvlim.pb.h"
#include "suvlim.grpc.pb.h"
#include "message.pb.h"
#include "message.grpc.pb.h"
#include <functional>
#include <grpcpp/impl/codegen/async_stream.h>

View File

@ -1,10 +1,10 @@
// Generated by the gRPC C++ plugin.
// If you make any local change, they will be lost.
// source: suvlim.proto
#ifndef GRPC_suvlim_2eproto__INCLUDED
#define GRPC_suvlim_2eproto__INCLUDED
// source: message.proto
#ifndef GRPC_message_2eproto__INCLUDED
#define GRPC_message_2eproto__INCLUDED
#include "suvlim.pb.h"
#include "message.pb.h"
#include <functional>
#include <grpcpp/impl/codegen/async_generic_service.h>
@ -4553,4 +4553,4 @@ class MilvusService final {
} // namespace milvus
#endif // GRPC_suvlim_2eproto__INCLUDED
#endif // GRPC_message_2eproto__INCLUDED

View File

@ -1,717 +0,0 @@
syntax = "proto3";
package milvus.grpc;
enum ErrorCode {
SUCCESS = 0;
UNEXPECTED_ERROR = 1;
CONNECT_FAILED = 2;
PERMISSION_DENIED = 3;
COLLECTION_NOT_EXISTS = 4;
ILLEGAL_ARGUMENT = 5;
ILLEGAL_DIMENSION = 7;
ILLEGAL_INDEX_TYPE = 8;
ILLEGAL_COLLECTION_NAME = 9;
ILLEGAL_TOPK = 10;
ILLEGAL_ROWRECORD = 11;
ILLEGAL_VECTOR_ID = 12;
ILLEGAL_SEARCH_RESULT = 13;
FILE_NOT_FOUND = 14;
META_FAILED = 15;
CACHE_FAILED = 16;
CANNOT_CREATE_FOLDER = 17;
CANNOT_CREATE_FILE = 18;
CANNOT_DELETE_FOLDER = 19;
CANNOT_DELETE_FILE = 20;
BUILD_INDEX_ERROR = 21;
ILLEGAL_NLIST = 22;
ILLEGAL_METRIC_TYPE = 23;
OUT_OF_MEMORY = 24;
}
message Status {
ErrorCode error_code = 1;
string reason = 2;
}
/**
* @brief Field data type
*/
enum DataType {
NONE = 0;
BOOL = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
FLOAT = 10;
DOUBLE = 11;
STRING = 20;
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
}
/**
* @brief General usage
*/
message KeyValuePair {
string key = 1;
string value = 2;
}
/**
* @brief Collection name
*/
message CollectionName {
string collection_name = 1;
}
/**
* @brief Collection name list
*/
message CollectionNameList {
Status status = 1;
repeated string collection_names = 2;
}
/**
* @brief Field name
*/
message FieldName {
string collection_name = 1;
string field_name = 2;
}
/**
* @brief Collection mapping
* @extra_params: key-value pair for extra parameters of the collection
* typically usage:
* extra_params["params"] = {segment_row_count: 1000000, auto_id: true}
* Note:
* the segment_row_count specify segment row count limit for merging
* the auto_id = true means entity id is auto-generated by milvus
*/
message Mapping {
Status status = 1;
string collection_name = 2;
Schema schema = 3;
repeated KeyValuePair extra_params = 4;
}
/**
* @brief Collection mapping list
*/
message MappingList {
Status status = 1;
repeated Mapping mapping_list = 2;
}
/**
* @brief Parameters of partition
*/
message PartitionParam {
string collection_name = 1;
string tag = 2;
}
/**
* @brief Partition list
*/
message PartitionList {
Status status = 1;
repeated string partition_tag_array = 2;
}
/**
* @brief Vector row record
*/
message VectorRowRecord {
repeated float float_data = 1; //float vector data
bytes binary_data = 2; //binary vector data
}
message EntityIds {
Status status = 1;
repeated int64 entity_id_array = 2;
}
message VectorRecord {
repeated VectorRowRecord records = 1;
}
message VectorParam {
string json = 1;
VectorRecord row_record = 2;
}
//////////////////////////row schema and data///////////////////////////////////
/**
* @brief schema
*/
message FieldMeta {
string field_name = 1;
DataType type = 2;
int64 dim = 3;
}
message Schema {
repeated FieldMeta field_metas = 1;
}
message RowData {
bytes blob = 1;
}
//////////////////////suvlim-proxy///////////////////////////////////
message InsertParam {
string collection_name = 1;
Schema schema = 2;
repeated RowData rows_data = 3;
repeated int64 entity_id_array = 4; //optional
string partition_tag = 5;
repeated KeyValuePair extra_params = 6;
}
message SearchParam {
string collection_name = 1;
repeated VectorParam vector_param = 2;
string dsl = 3; //optional
repeated string partition_tag = 4; //why
repeated KeyValuePair extra_params = 5;
}
message SearchInSegmentParam {
repeated string file_id_array = 1;
SearchParam search_param = 2;
}
message Entities {
Status status = 1;
repeated int64 ids = 2;
repeated bool valid_row = 3;
repeated RowData rows_data = 4;
}
///////////////////////////milvus-server///////////////////////////
/**
* @brief Query result
*/
message QueryResult {
Status status = 1;
Entities entities = 2;
int64 row_num = 3;
repeated float scores = 4;
repeated float distances = 5;
repeated KeyValuePair extra_params = 6;
}
/**
* @brief Server string Reply
*/
message StringReply {
Status status = 1;
string string_reply = 2;
}
/**
* @brief Server bool Reply
*/
message BoolReply {
Status status = 1;
bool bool_reply = 2;
}
/**
* @brief Return collection row count
*/
message CollectionRowCount {
Status status = 1;
int64 collection_row_count = 2;
}
/**
* @brief Server command parameters
*/
message Command {
string cmd = 1;
}
/**
* @brief Index params
* @collection_name: target collection
* @field_name: target field
* @index_name: a name for index provided by user, unique within this field
* @extra_params: index parameters in json format
* for vector field:
* extra_params["index_type"] = one of the values: FLAT, IVF_LAT, IVF_SQ8, NSGMIX, IVFSQ8H,
* PQ, HNSW, HNSW_SQ8NM, ANNOY
* extra_params["metric_type"] = one of the values: L2, IP, HAMMING, JACCARD, TANIMOTO
* SUBSTRUCTURE, SUPERSTRUCTURE
* extra_params["params"] = extra parameters for index, for example ivflat: {nlist: 2048}
* for structured field:
* extra_params["index_type"] = one of the values: SORTED
*/
message IndexParam {
Status status = 1;
string collection_name = 2;
string field_name = 3;
string index_name = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Parameters for flush action
*/
message FlushParam {
repeated string collection_name_array = 1;
}
/**
* @brief Parameters for flush action
*/
message CompactParam {
string collection_name = 1;
double threshold = 2;
}
/**
* @brief Parameters for deleting entities by id
*/
message DeleteByIDParam {
string collection_name = 1;
repeated int64 id_array = 2;
}
/**
* @brief Return collection stats
* @json_info: collection stats in json format, typically, the format is like:
* {
* row_count: xxx,
* data_size: xxx,
* partitions: [
* {
* tag: xxx,
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* segments: [
* {
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* files: [
* {
* field: xxx,
* name: xxx,
* index_type: xxx,
* path: xxx,
* data_size: xxx,
* }
* ]
* }
* ]
* }
* ]
* }
*/
message CollectionInfo {
Status status = 1;
string json_info = 2;
}
/**
* @brief Parameters for returning entities id of a segment
*/
message GetEntityIDsParam {
string collection_name = 1;
int64 segment_id = 2;
}
/**
* @brief Entities identity
*/
message EntityIdentity {
string collection_name = 1;
repeated int64 id_array = 2;
repeated string field_names = 3;
}
/********************************************SearchPB interface***************************************************/
/**
* @brief Vector field parameters
*/
message VectorFieldParam {
int64 dimension = 1;
}
/**
* @brief Field type
*/
message FieldType {
oneof value {
DataType data_type = 1;
VectorFieldParam vector_param = 2;
}
}
/**
* @brief Field parameters
*/
message FieldParam {
uint64 id = 1;
string name = 2;
DataType type = 3;
repeated KeyValuePair index_params = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Vector field record
*/
message VectorFieldRecord {
repeated VectorRowRecord value = 1;
}
///////////////////////////////////////////////////////////////////
message TermQuery {
string field_name = 1;
repeated int64 int_value = 2;
repeated double double_value = 3;
int64 value_num = 4;
float boost = 5;
repeated KeyValuePair extra_params = 6;
}
enum CompareOperator {
LT = 0;
LTE = 1;
EQ = 2;
GT = 3;
GTE = 4;
NE = 5;
}
message CompareExpr {
CompareOperator operator = 1;
string operand = 2;
}
message RangeQuery {
string field_name = 1;
repeated CompareExpr operand = 2;
float boost = 3;
repeated KeyValuePair extra_params = 4;
}
message VectorQuery {
string field_name = 1;
float query_boost = 2;
repeated VectorRowRecord records = 3;
int64 topk = 4;
repeated KeyValuePair extra_params = 5;
}
enum Occur {
INVALID = 0;
MUST = 1;
SHOULD = 2;
MUST_NOT = 3;
}
message BooleanQuery {
Occur occur = 1;
repeated GeneralQuery general_query = 2;
}
message GeneralQuery {
oneof query {
BooleanQuery boolean_query = 1;
TermQuery term_query = 2;
RangeQuery range_query = 3;
VectorQuery vector_query = 4;
}
}
message SearchParamPB {
string collection_name = 1;
repeated string partition_tag_array = 2;
GeneralQuery general_query = 3;
repeated KeyValuePair extra_params = 4;
}
service MilvusService {
/**
* @brief This method is used to create collection
*
* @param CollectionSchema, use to provide collection information to be created.
*
* @return Status
*/
rpc CreateCollection(Mapping) returns (Status){}
/**
* @brief This method is used to test collection existence.
*
* @param CollectionName, collection name is going to be tested.
*
* @return BoolReply
*/
rpc HasCollection(CollectionName) returns (BoolReply) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionSchema
*/
rpc DescribeCollection(CollectionName) returns (Mapping) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionRowCount
*/
rpc CountCollection(CollectionName) returns (CollectionRowCount) {}
/**
* @brief This method is used to list all collections.
*
* @param Command, dummy parameter.
*
* @return CollectionNameList
*/
rpc ShowCollections(Command) returns (CollectionNameList) {}
/**
* @brief This method is used to get collection detail information.
*
* @param CollectionName, target collection name.
*
* @return CollectionInfo
*/
rpc ShowCollectionInfo(CollectionName) returns (CollectionInfo) {}
/**
* @brief This method is used to delete collection.
*
* @param CollectionName, collection name is going to be deleted.
*
* @return Status
*/
rpc DropCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to build index by collection in sync mode.
*
* @param IndexParam, index paramters.
*
* @return Status
*/
rpc CreateIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to describe index
*
* @param IndexParam, target index.
*
* @return IndexParam
*/
rpc DescribeIndex(IndexParam) returns (IndexParam) {}
/**
* @brief This method is used to drop index
*
* @param IndexParam, target field. if the IndexParam.field_name is empty, will drop all index of the collection
*
* @return Status
*/
rpc DropIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to create partition
*
* @param PartitionParam, partition parameters.
*
* @return Status
*/
rpc CreatePartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to test partition existence.
*
* @param PartitionParam, target partition.
*
* @return BoolReply
*/
rpc HasPartition(PartitionParam) returns (BoolReply) {}
/**
* @brief This method is used to show partition information
*
* @param CollectionName, target collection name.
*
* @return PartitionList
*/
rpc ShowPartitions(CollectionName) returns (PartitionList) {}
/**
* @brief This method is used to drop partition
*
* @param PartitionParam, target partition.
*
* @return Status
*/
rpc DropPartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to add vector array to collection.
*
* @param InsertParam, insert parameters.
*
* @return VectorIds
*/
rpc Insert(InsertParam) returns (EntityIds) {}
/**
* @brief This method is used to get entities data by id array.
*
* @param EntitiesIdentity, target entity id array.
*
* @return EntitiesData
*/
rpc GetEntityByID(EntityIdentity) returns (Entities) {}
/**
* @brief This method is used to get vector ids from a segment
*
* @param GetVectorIDsParam, target collection and segment
*
* @return VectorIds
*/
rpc GetEntityIDs(GetEntityIDsParam) returns (EntityIds) {}
/**
* @brief This method is used to query vector in collection.
*
* @param SearchParam, search parameters.
*
* @return KQueryResult
*/
rpc Search(SearchParam) returns (QueryResult) {}
/**
* @brief This method is used to query vector in specified files.
*
* @param SearchInSegmentParam, target segments to search.
*
* @return TopKQueryResult
*/
rpc SearchInSegment(SearchInSegmentParam) returns (QueryResult) {}
/**
* @brief This method is used to give the server status.
*
* @param Command, command string
*
* @return StringReply
*/
rpc Cmd(Command) returns (StringReply) {}
/**
* @brief This method is used to delete vector by id
*
* @param DeleteByIDParam, delete parameters.
*
* @return status
*/
rpc DeleteByID(DeleteByIDParam) returns (Status) {}
/**
* @brief This method is used to preload collection
*
* @param CollectionName, target collection name.
*
* @return Status
*/
rpc PreloadCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to flush buffer into storage.
*
* @param FlushParam, flush parameters
*
* @return Status
*/
rpc Flush(FlushParam) returns (Status) {}
/**
* @brief This method is used to compact collection
*
* @param CompactParam, compact parameters
*
* @return Status
*/
rpc Compact(CompactParam) returns (Status) {}
/********************************New Interface********************************************/
rpc SearchPB(SearchParamPB) returns (QueryResult) {}
}
////////////////////pulsar//////////////////////////////////////
enum OpType {
INSERT = 0;
DELETE = 1;
}
message InsertOrDeleteMsg {
string collection_name = 1;
RowData rows_data = 2;
int64 uid = 3; //optional
string partition_tag = 4;
uint64 timestamp =5;
int64 segment_id = 6;
int64 channel_id = 7;
OpType op = 8;
int64 client_id = 9;
repeated KeyValuePair extra_params = 10;
}
message SearchMsg {
string collection_name = 1;
VectorRowRecord records = 2;
string partition_tag = 3;
int64 uid = 4;
uint64 timestamp =5;
int64 client_id = 6;
repeated KeyValuePair extra_params = 7;
}
enum SyncType {
READ = 0;
WRITE = 1;
}
message TimeSyncMsg{
int64 peer_Id = 1;
uint64 Timestamp = 2;
SyncType sync_type = 3;
}
message SegmentRecord {
int64 uid = 1;
repeated int64 segment_id = 2;
}
message Key2SegMsg {
int64 client_id = 1;
SegmentRecord records = 2;
}

View File

@ -3,7 +3,7 @@
#include "utils/Status.h"
#include "Producer.h"
#include "Consumer.h"
#include "grpc/gen-milvus/suvlim.pb.h"
#include "grpc/message.pb.h"
namespace milvus::message_client {
class MsgClientV2 {

View File

@ -1,6 +1,6 @@
#include "Consumer.h"
#include "grpc/gen-milvus/suvlim.pb.h"
#include "grpc/message.pb.h"
namespace milvus {
namespace message_client {

View File

@ -2,7 +2,7 @@
#include "pulsar/Consumer.h"
#include "Client.h"
#include "grpc/gen-milvus/suvlim.pb.h"
#include "grpc/message.pb.h"
namespace milvus {
namespace message_client {

View File

@ -2,7 +2,7 @@
#include "pulsar/Producer.h"
#include "Client.h"
#include "grpc/gen-milvus/suvlim.pb.h"
#include "grpc/message.pb.h"
namespace milvus {
namespace message_client {

View File

@ -11,14 +11,12 @@
# or implied. See the License for the specific language governing permissions and limitations under the License.
#-------------------------------------------------------------------------------
set( GRPC_SERVICE_FILES ${MILVUS_ENGINE_SRC}/grpc/gen-milvus/suvlim.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/suvlim.pb.cc
#${MILVUS_ENGINE_SRC}/grpc/gen-status/status.grpc.pb.cc
#${MILVUS_ENGINE_SRC}/grpc/gen-status/status.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/hello.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/hello.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/master.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/master.pb.cc
set( GRPC_SERVICE_FILES ${MILVUS_ENGINE_SRC}/grpc/message.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/message.pb.cc
${MILVUS_ENGINE_SRC}/grpc/hello.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/hello.pb.cc
${MILVUS_ENGINE_SRC}/grpc/master.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/master.pb.cc
)
aux_source_directory( ${MILVUS_ENGINE_SRC}/server SERVER_SERVICE_FILES )

View File

@ -12,7 +12,7 @@
#pragma once
#include "db/Types.h"
#include "grpc/gen-milvus/suvlim.grpc.pb.h"
#include "grpc/message.grpc.pb.h"
#include "query/GeneralQuery.h"
#include "utils/Json.h"
#include "utils/Status.h"

View File

@ -20,7 +20,7 @@
#include <string>
#include <unordered_map>
#include "query/BinaryQuery.h"
#include "grpc/gen-milvus/suvlim.grpc.pb.h"
#include "grpc/message.grpc.pb.h"
#include "opentracing/tracer.h"
#include "server/context/Context.h"
#include "server/delivery/ReqHandler.h"

View File

@ -2,7 +2,7 @@
#define MILVUS_HELLOSERVICE_H
#include "grpc++/grpc++.h"
#include <src/grpc/gen-milvus/hello.grpc.pb.h>
#include <src/grpc/hello.grpc.pb.h>
class HelloService final : public ::milvus::grpc::HelloService::Service{
::grpc::Status SayHello

View File

@ -1,7 +1,7 @@
#pragma once
#include "grpc++/grpc++.h"
#include "src/grpc/gen-milvus/master.grpc.pb.h"
#include "src/grpc/master.grpc.pb.h"
class ReportClient {
public:

View File

@ -15,7 +15,7 @@
#include <cstdint>
#include <thread>
#include <string>
#include "suvlim.pb.h"
#include "grpc/message.pb.h"
namespace milvus {
namespace timesync {

View File

@ -57,49 +57,3 @@ message( STATUS "grpc src compile options: ${var}" )
set( PROTOC_EXCUTABLE $<TARGET_FILE:protoc> )
set( GRPC_CPP_PLUGIN_EXCUTABLE $<TARGET_FILE:grpc_cpp_plugin> )
set( PROTO_PATH "${MILVUS_SOURCE_DIR}/src/grpc" )
# Proto file
get_filename_component( milvus_proto "${PROTO_PATH}/milvus.proto" ABSOLUTE )
get_filename_component( milvus_proto_path "${PROTO_PATH}" PATH )
get_filename_component( status_proto "${PROTO_PATH}/status.proto" ABSOLUTE )
get_filename_component( status_proto_path "${PROTO_PATH}" PATH )
# Generated sources
set( milvus_proto_srcs "${PROTO_PATH}/gen-milvus-test/helloworld.pb.cc" )
set( milvus_proto_hdrs "${PROTO_PATH}/gen-milvus-test/helloworld.pb.h" )
set( milvus_grpc_srcs "${PROTO_PATH}/gen-milvus-test/helloworld.grpc.pb.cc" )
set( milvus_grpc_hdrs "${PROTO_PATH}/gen-milvus-test/helloworld.grpc.pb.h" )
set( status_proto_srcs "${PROTO_PATH}/gen-status-test/helloworld.pb.cc" )
set( status_proto_hdrs "${PROTO_PATH}/gen-status-test/helloworld.pb.h" )
set( status_grpc_srcs "${PROTO_PATH}/gen-status-test/helloworld.grpc.pb.cc" )
set( status_grpc_hdrs "${PROTO_PATH}/gen-status-test/helloworld.grpc.pb.h" )
add_custom_command(
OUTPUT "${milvus_proto_srcs}"
"${milvus_proto_hdrs}"
"${milvus_grpc_srcs}"
"${milvus_grpc_hdrs}"
COMMAND ${PROTOC_EXCUTABLE}
ARGS --grpc_out "${PROTO_PATH}/gen-milvus-test"
--cpp_out "${PROTO_PATH}/gen-milvus-test"
-I "${milvus_proto_path}"
--plugin=protoc-gen-grpc="${GRPC_CPP_PLUGIN_EXCUTABLE}"
"${milvus_proto}"
DEPENDS "${milvus_proto}" )
add_custom_command(
OUTPUT "${status_proto_srcs}" "${status_proto_hdrs}" "${status_grpc_srcs}" "${status_grpc_hdrs}"
COMMAND ${PROTOC_EXCUTABLE}
ARGS --grpc_out "${PROTO_PATH}/gen-status-test"
--cpp_out "${PROTO_PATH}/gen-status-test"
-I "${status_proto_path}"
--plugin=protoc-gen-grpc="${GRPC_CPP_PLUGIN_EXCUTABLE}"
"${status_proto}"
DEPENDS "${status_proto}" )
# Include generated *.pb.h files
include_directories( "${MILVUS_SOURCE_DIR}/gen-milvus-test")
include_directories( "${MILVUS_SOURCE_DIR}/gen-status-test" )

View File

@ -17,7 +17,7 @@ include_directories(${MILVUS_ENGINE_SRC})
include_directories(${MILVUS_THIRDPARTY_SRC})
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-milvus)
include_directories(${MILVUS_ENGINE_SRC}/grpc)
set( ENTRY_FILE ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp )

View File

@ -1,13 +1,7 @@
enable_testing()
set( GRPC_SERVICE_FILES
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/suvlim.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/suvlim.pb.cc
# ${MILVUS_ENGINE_SRC}/grpc/gen-status/status.grpc.pb.cc
# ${MILVUS_ENGINE_SRC}/grpc/gen-status/status.pb.cc
# ${MILVUS_ENGINE_SRC}/grpc/gen-milvus/hello.grpc.pb.cc
# ${MILVUS_ENGINE_SRC}/grpc/gen-milvus/hello.pb.cc
# ${MILVUS_ENGINE_SRC}/grpc/gen-milvus/master.grpc.pb.cc
# ${MILVUS_ENGINE_SRC}/grpc/gen-milvus/master.pb.cc
${MILVUS_ENGINE_SRC}/grpc/message.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/message.pb.cc
)

View File

@ -1,6 +1,6 @@
#include <gtest/gtest.h>
#include "message_client/Consumer.h"
#include "grpc/gen-milvus/suvlim.pb.h"
#include "grpc/message.pb.h"
TEST(CLIENT_CPP, CONSUMER) {
auto client= std::make_shared<milvus::message_client::MsgClient>("pulsar://localhost:6650");

View File

@ -1,6 +1,6 @@
#include <gtest/gtest.h>
#include "message_client/Producer.h"
#include "grpc/gen-milvus/suvlim.pb.h"
#include "grpc/message.pb.h"
TEST(CLIENT_CPP, Producer) {
auto client= std::make_shared<milvus::message_client::MsgClient>("pulsar://localhost:6650");

View File

@ -1,6 +1,6 @@
#include "thread"
#include "pulsar/Client.h"
#include <src/grpc/gen-milvus/hello.grpc.pb.h>
#include <src/grpc/hello.grpc.pb.h>
using namespace pulsar;
using MyData = milvus::grpc::PMessage;

16
scripts/check.sh Executable file
View File

@ -0,0 +1,16 @@
#!/usr/bin/env bash
check_protoc_version() {
version=$(protoc --version)
major=$(echo ${version} | sed -n -e 's/.*\([0-9]\{1,\}\)\.[0-9]\{1,\}\.[0-9]\{1,\}.*/\1/p')
minor=$(echo ${version} | sed -n -e 's/.*[0-9]\{1,\}\.\([0-9]\{1,\}\)\.[0-9]\{1,\}.*/\1/p')
if [ "$major" -eq 3 ] && [ "$minor" -eq 8 ]; then
return 0
fi
echo "protoc version not match, version 3.8.x is needed, current version: ${version}"
return 0
}
if ! check_protoc_version; then
exit 1
fi

18
scripts/common.sh Normal file
View File

@ -0,0 +1,18 @@
#!/usr/bin/env bash
function push() {
pushd $1 >/dev/null 2>&1
}
function pop() {
popd $1 >/dev/null 2>&1
}
function sed_inplace()
{
if [ `uname` == "Darwin" ]; then
sed -i '' "$@"
else
sed -i "$@"
fi
}

52
scripts/generate_cpp.sh Executable file
View File

@ -0,0 +1,52 @@
#!/usr/bin/env bash
SCRIPTS_DIR=$(dirname "$0")
ROOT_DIR="$SCRIPTS_DIR/.."
source $SCRIPTS_DIR/common.sh
#protoc=protoc
protoc=${ROOT_DIR}/proxy/cmake_build/thirdparty/grpc/grpc-build/third_party/protobuf/protoc
grpc_cpp_plugin=${ROOT_DIR}/proxy/cmake_build/thirdparty/grpc/grpc-build/grpc_cpp_plugin
echo "generate cpp code..."
OUTDIR=${ROOT_DIR}/proxy/src/grpc
GRPC_INCLUDE=.:.
#GRPC_INCLUDE=.:../include
cd $ROOT_DIR
rm -rf proto-cpp && mkdir -p proto-cpp
PB_FILES=()
GRPC_FILES=("hello.proto" "master.proto" "message.proto")
ALL_FILES=("${PB_FILES[@]}")
ALL_FILES+=("${GRPC_FILES[@]}")
for file in ${ALL_FILES[@]}
do
cp proto/$file proto-cpp/
done
push proto-cpp
#mkdir -p ../pkg/cpp
for file in ${PB_FILES[@]}
do
echo $file
$protoc -I${GRPC_INCLUDE} --cpp_out $OUTDIR *.proto || exit $?
done
for file in ${GRPC_FILES[@]}
do
echo $file
$protoc -I${GRPC_INCLUDE} --cpp_out $OUTDIR *.proto || exit $?
$protoc -I${GRPC_INCLUDE} --grpc_out $OUTDIR --plugin=protoc-gen-grpc=${grpc_cpp_plugin} *.proto || exit $?
done
pop
rm -rf proto-cpp

86
scripts/generate_go.sh Executable file
View File

@ -0,0 +1,86 @@
#!/usr/bin/env bash
SCRIPTS_DIR=$(dirname "$0")
ROOT_DIR=$SCRIPTS_DIR/..
source $SCRIPTS_DIR/common.sh
#protoc=protoc
protoc=${ROOT_DIR}/proxy/cmake_build/thirdparty/grpc/grpc-build/third_party/protobuf/protoc
push $SCRIPTS_DIR/..
KVPROTO_ROOT=`pwd`
pop
PROGRAM=$(basename "$0")
GOPATH=$(go env GOPATH)
if [ -z $GOPATH ]; then
printf "Error: the environment variable GOPATH is not set, please set it before running %s\n" $PROGRAM > /dev/stderr
exit 1
fi
GO_PREFIX_PATH=github.com/czs007/suvlim/pkg
#export PATH=$KVPROTO_ROOT/_tools/bin:$GOPATH/bin:$PATH
function collect() {
file=$(basename $1)
base_name=$(basename $file ".proto")
mkdir -p ../pkg/$base_name
if [ -z $GO_OUT_M ]; then
GO_OUT_M="M$file=$GO_PREFIX_PATH/$base_name"
else
GO_OUT_M="$GO_OUT_M,M$file=$GO_PREFIX_PATH/$base_name"
fi
}
# Although eraftpb.proto is copying from raft-rs, however there is no
# official go code ship with the crate, so we need to generate it manually.
cd ../proto
PB_FILES=("message.proto")
GRPC_FILES=("pdpb.proto" "metapb.proto")
ALL_FILES=("${PB_FILES[@]}")
ALL_FILES+=("${GRPC_FILES[@]}")
for file in ${ALL_FILES[@]}
do
collect $file
done
ret=0
function replace(){
cd ../pkg/$base_name
sed_inplace -E 's/import fmt \"fmt\"//g' *.pb*.go
sed_inplace -E 's/import io \"io\"//g' *.pb*.go
sed_inplace -E 's/import math \"math\"//g' *.pb*.go
#goimports -w *.pb*.go
cd ../../proto
}
function gen_pb() {
base_name=$(basename $1 ".proto")
$protoc -I.:. --go_out=plugins=$GO_OUT_M:../pkg/$base_name $1 || ret=$?
#replace
}
function gen_grpc() {
base_name=$(basename $1 ".proto")
$protoc -I.:. --go_out=plugins=grpc,$GO_OUT_M:../pkg/${base_name} $1 || ret=$?
#replace
}
for file in ${PB_FILES[@]}
do
echo $file
gen_pb $file
done
for file in ${GRPC_FILES[@]}
do
echo $file
gen_grpc $file
done
exit $ret

View File

@ -3,7 +3,7 @@ package message_client
import (
"context"
"github.com/apache/pulsar/pulsar-client-go/pulsar"
"github.com/czs007/suvlim/writer/pb"
msgpb "github.com/czs007/suvlim/pkg/message"
"github.com/golang/protobuf/proto"
"log"
)
@ -11,9 +11,9 @@ import (
type MessageClient struct {
//message channel
insertOrDeleteChan chan *pb.InsertOrDeleteMsg
searchByIdChan chan *pb.EntityIdentity
timeSyncChan chan *pb.TimeSyncMsg
insertOrDeleteChan chan *msgpb.InsertOrDeleteMsg
searchByIdChan chan *msgpb.EntityIdentity
timeSyncChan chan *msgpb.TimeSyncMsg
// pulsar
client pulsar.Client
@ -23,13 +23,13 @@ type MessageClient struct {
timeSyncConsumer pulsar.Consumer
// batch messages
InsertMsg []*pb.InsertOrDeleteMsg
DeleteMsg []*pb.InsertOrDeleteMsg
SearchByIdMsg []*pb.EntityIdentity
TimeSyncMsg []*pb.TimeSyncMsg
InsertMsg []*msgpb.InsertOrDeleteMsg
DeleteMsg []*msgpb.InsertOrDeleteMsg
SearchByIdMsg []*msgpb.EntityIdentity
TimeSyncMsg []*msgpb.TimeSyncMsg
}
func (mc *MessageClient) Send(ctx context.Context, msg pb.Key2SegMsg) {
func (mc *MessageClient) Send(ctx context.Context, msg msgpb.Key2SegMsg) {
if err := mc.key2segProducer.Send(ctx, pulsar.ProducerMessage{
Payload: []byte(msg.String()),
}); err != nil {
@ -39,7 +39,7 @@ func (mc *MessageClient) Send(ctx context.Context, msg pb.Key2SegMsg) {
func (mc *MessageClient) ReceiveInsertOrDeleteMsg() {
for {
insetOrDeleteMsg := pb.InsertOrDeleteMsg{}
insetOrDeleteMsg := msgpb.InsertOrDeleteMsg{}
msg, err := mc.insertOrDeleteConsumer.Receive(context.Background())
err = proto.Unmarshal(msg.Payload(), &insetOrDeleteMsg)
if err != nil {
@ -52,7 +52,7 @@ func (mc *MessageClient) ReceiveInsertOrDeleteMsg() {
func (mc *MessageClient) ReceiveSearchByIdMsg() {
for {
searchByIdMsg := pb.EntityIdentity{}
searchByIdMsg := msgpb.EntityIdentity{}
msg, err := mc.searchByIdConsumer.Receive(context.Background())
err = proto.Unmarshal(msg.Payload(), &searchByIdMsg)
if err != nil {
@ -65,7 +65,7 @@ func (mc *MessageClient) ReceiveSearchByIdMsg() {
func (mc *MessageClient) ReceiveTimeSyncMsg() {
for {
timeSyncMsg := pb.TimeSyncMsg{}
timeSyncMsg := msgpb.TimeSyncMsg{}
msg, err := mc.timeSyncConsumer.Receive(context.Background())
err = proto.Unmarshal(msg.Payload(), &timeSyncMsg)
if err != nil {
@ -130,14 +130,14 @@ func (mc *MessageClient) InitClient(url string) {
mc.timeSyncConsumer = mc.CreateConsumer("TimeSync")
// init channel
mc.insertOrDeleteChan = make(chan *pb.InsertOrDeleteMsg, 1000)
mc.searchByIdChan = make(chan *pb.EntityIdentity, 1000)
mc.timeSyncChan = make(chan *pb.TimeSyncMsg, 1000)
mc.insertOrDeleteChan = make(chan *msgpb.InsertOrDeleteMsg, 1000)
mc.searchByIdChan = make(chan *msgpb.EntityIdentity, 1000)
mc.timeSyncChan = make(chan *msgpb.TimeSyncMsg, 1000)
mc.InsertMsg = make([]*pb.InsertOrDeleteMsg, 1000)
mc.DeleteMsg = make([]*pb.InsertOrDeleteMsg, 1000)
mc.SearchByIdMsg = make([]*pb.EntityIdentity, 1000)
mc.TimeSyncMsg = make([]*pb.TimeSyncMsg, 1000)
mc.InsertMsg = make([]*msgpb.InsertOrDeleteMsg, 1000)
mc.DeleteMsg = make([]*msgpb.InsertOrDeleteMsg, 1000)
mc.SearchByIdMsg = make([]*msgpb.EntityIdentity, 1000)
mc.TimeSyncMsg = make([]*msgpb.TimeSyncMsg, 1000)
}
func (mc *MessageClient) Close() {
@ -171,7 +171,7 @@ func (mc *MessageClient) PrepareMsg(messageType MessageType, msgLen int) {
case InsertOrDelete:
for i := 0; i < msgLen; i++ {
msg := <-mc.insertOrDeleteChan
if msg.Op == pb.OpType_INSERT {
if msg.Op == msgpb.OpType_INSERT {
mc.InsertMsg = append(mc.InsertMsg, msg)
} else {
mc.DeleteMsg = append(mc.DeleteMsg, msg)

View File

@ -1,4 +0,0 @@
#!/usr/bin/env bash
pkg=pb
protoc --go_out=import_path=${pkg}:. suvlim.proto

File diff suppressed because it is too large Load Diff

View File

@ -2,14 +2,14 @@ package test
import (
"context"
"github.com/czs007/suvlim/writer/pb"
msgpb "github.com/czs007/suvlim/pkg/message"
"github.com/czs007/suvlim/writer/write_node"
"sync"
"testing"
)
func GetInsertMsg(collectionName string, partitionTag string, entityId int64) *pb.InsertOrDeleteMsg {
return &pb.InsertOrDeleteMsg{
func GetInsertMsg(collectionName string, partitionTag string, entityId int64) *msgpb.InsertOrDeleteMsg {
return &msgpb.InsertOrDeleteMsg{
CollectionName: collectionName,
PartitionTag: partitionTag,
SegmentId: int64(entityId / 100),
@ -19,8 +19,8 @@ func GetInsertMsg(collectionName string, partitionTag string, entityId int64) *p
}
}
func GetDeleteMsg(collectionName string, entityId int64) *pb.InsertOrDeleteMsg {
return &pb.InsertOrDeleteMsg{
func GetDeleteMsg(collectionName string, entityId int64) *msgpb.InsertOrDeleteMsg {
return &msgpb.InsertOrDeleteMsg{
CollectionName: collectionName,
Uid: entityId,
Timestamp: int64(entityId + 100),
@ -33,7 +33,7 @@ func TestInsert(t *testing.T) {
topics = append(topics, "test")
topics = append(topics, "test1")
writerNode, _ := write_node.NewWriteNode(ctx, "null", topics, 0)
var insertMsgs []*pb.InsertOrDeleteMsg
var insertMsgs []*msgpb.InsertOrDeleteMsg
for i := 0; i < 120; i++ {
insertMsgs = append(insertMsgs, GetInsertMsg("collection0", "tag01", int64(i)))
}
@ -41,12 +41,12 @@ func TestInsert(t *testing.T) {
wg.Add(3)
//var wg sync.WaitGroup
writerNode.InsertBatchData(ctx, insertMsgs, &wg)
var insertMsgs2 []*pb.InsertOrDeleteMsg
var insertMsgs2 []*msgpb.InsertOrDeleteMsg
for i := 120; i < 200; i++ {
insertMsgs2 = append(insertMsgs2, GetInsertMsg("collection0", "tag02", int64(i)))
}
writerNode.InsertBatchData(ctx, insertMsgs2, &wg)
var deleteMsgs []*pb.InsertOrDeleteMsg
var deleteMsgs []*msgpb.InsertOrDeleteMsg
deleteMsgs = append(deleteMsgs, GetDeleteMsg("collection0", 2))
deleteMsgs = append(deleteMsgs, GetDeleteMsg("collection0", 120))
writerNode.DeleteBatchData(ctx, deleteMsgs, &wg)

View File

@ -3,7 +3,7 @@ package test
import (
"context"
"github.com/apache/pulsar/pulsar-client-go/pulsar"
"github.com/czs007/suvlim/writer/pb"
msgpb "github.com/czs007/suvlim/pkg/message"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"log"
@ -24,7 +24,7 @@ func TestKey2Seg(t *testing.T) {
SubscriptionName: "sub-1",
})
obj := pb.Key2SegMsg{}
obj := msgpb.Key2SegMsg{}
msg, err := consumer.Receive(context.Background())
proto.Unmarshal(msg.Payload(), &obj)
assert.Equal(t, obj.Uid, int64(0))

View File

@ -6,7 +6,7 @@ import (
storage "github.com/czs007/suvlim/storage/pkg"
"github.com/czs007/suvlim/storage/pkg/types"
"github.com/czs007/suvlim/writer/message_client"
"github.com/czs007/suvlim/writer/pb"
msgpb "github.com/czs007/suvlim/pkg/message"
"strconv"
"sync"
)
@ -36,7 +36,7 @@ func NewWriteNode(ctx context.Context,
}, err
}
func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*pb.InsertOrDeleteMsg, wg *sync.WaitGroup) error {
func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*msgpb.InsertOrDeleteMsg, wg *sync.WaitGroup) error {
var prefixKey string
var suffixKey string
var prefixKeys [][]byte
@ -63,7 +63,7 @@ func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*pb.InsertOrDel
return nil
}
func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*pb.InsertOrDeleteMsg, wg *sync.WaitGroup) error {
func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*msgpb.InsertOrDeleteMsg, wg *sync.WaitGroup) error {
//var segmentInfos []*SegmentIdInfo
var prefixKey string
var prefixKeys [][]byte
@ -84,7 +84,7 @@ func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*pb.InsertOrDel
segmentIds = append(segmentIds, id)
}
segmentInfo := pb.Key2SegMsg{
segmentInfo := msgpb.Key2SegMsg{
Uid: data[i].Uid,
SegmentId: segmentIds,
}