Add pulsar cpp client

Signed-off-by: xige-16 <xi.ge@zilliz.com>
xige-16 2020-09-01 20:50:12 +08:00 committed by yefu.chen
parent 558a81dcbe
commit 29d6a3ad0a
34 changed files with 9497 additions and 240 deletions
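For orientation before the file-by-file diff: a minimal round-trip sketch of how the new C++ wrappers (MsgClient, MsgProducer, MsgConsumer) and the generated pb messages are meant to be used together. It mirrors the unit tests added at the end of this commit; the broker URL, topic, and subscription names are placeholders.

#include <iostream>
#include <memory>
#include "client.h"
#include "consumer.h"
#include "producer.h"
#include "pb/pulsar.pb.h"

int main() {
    auto client = std::make_shared<message_client::MsgClient>("pulsar://localhost:6650");

    // Subscribe first so the published message is retained for this subscription.
    message_client::MsgConsumer consumer(client, "example_subscription");
    consumer.subscribe("test");

    // Publish one serialized TestData message on the "test" topic.
    message_client::MsgProducer producer(client, "test");
    pb::TestData data;
    data.set_id("1");
    data.set_name("example");
    producer.send(data.SerializeAsString());

    // receive_proto parses, acknowledges, and type-erases the message; cast it back.
    auto received = std::static_pointer_cast<pb::TestData>(consumer.receive_proto(message_client::TEST));
    std::cout << received->id() << " " << received->name() << std::endl;

    producer.close();
    consumer.close();
    client->close();
    return 0;
}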

.gitignore
View File

@@ -10,6 +10,10 @@ core/build/*
core/.idea/
.idea/
.idea/*
pulsar/client-cpp/cmake-build-debug/
pulsar/client-cpp/cmake-build-debug/*
pulsar/client-cpp/build/
pulsar/client-cpp/build/*
# vscode generated files
.vscode

View File

@@ -129,13 +129,6 @@ func (c *baseClient) leaderLoop() {
}
}
// ScheduleCheckLeader is used to check leader.
func (c *baseClient) ScheduleCheckLeader() {
select {
case c.checkLeaderCh <- struct{}{}:
default:
}
}
// GetClusterID returns the ClusterID.
func (c *baseClient) GetClusterID(context.Context) uint64 {
@@ -173,28 +166,6 @@ func (c *baseClient) updateURLs(members []*pdpb.Member) {
c.urls = urls
}
func (c *baseClient) switchLeader(addrs []string) error {
// FIXME: How to safely compare leader urls? For now, only allows one client url.
addr := addrs[0]
c.connMu.RLock()
oldLeader := c.connMu.leader
c.connMu.RUnlock()
if addr == oldLeader {
return nil
}
log.Info("[pd] switch leader", zap.String("new-leader", addr), zap.String("old-leader", oldLeader))
if _, err := c.getOrCreateGRPCConn(addr); err != nil {
return err
}
c.connMu.Lock()
defer c.connMu.Unlock()
c.connMu.leader = addr
return nil
}
func (c *baseClient) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
c.connMu.RLock()

View File

@@ -178,7 +178,6 @@ func (c *client) tsLoop() {
default:
}
log.Error("[pd] create tso stream error")
c.ScheduleCheckLeader()
cancel()
c.revokeTSORequest(errors.WithStack(err))
select {
@@ -225,7 +224,6 @@ func (c *client) tsLoop() {
default:
}
log.Error("[pd] getTS error")
c.ScheduleCheckLeader()
cancel()
stream, cancel = nil, nil
}

View File

@@ -18,7 +18,6 @@ import (
"regexp"
"github.com/czs007/suvlim/errors"
"github.com/czs007/suvlim/pkg/metapb"
)
const (
@@ -38,19 +37,6 @@ func validateFormat(s, format string) error {
return nil
}
// ValidateLabels checks the legality of the labels.
func ValidateLabels(labels []*metapb.StoreLabel) error {
for _, label := range labels {
if err := validateFormat(label.Key, keyFormat); err != nil {
return err
}
if err := validateFormat(label.Value, valueFormat); err != nil {
return err
}
}
return nil
}
// ValidateURLWithScheme checks the format of the URL.
func ValidateURLWithScheme(rawURL string) error {
u, err := url.ParseRequestURI(rawURL)

View File

@@ -1,78 +0,0 @@
// Copyright 2018 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package config
import (
"github.com/czs007/suvlim/pkg/metapb"
. "github.com/pingcap/check"
)
var _ = Suite(&testUtilSuite{})
type testUtilSuite struct{}
func (s *testUtilSuite) TestValidateLabels(c *C) {
tests := []struct {
label string
hasErr bool
}{
{"z1", false},
{"z-1", false},
{"h1;", true},
{"z_1", false},
{"z_1&", true},
{"cn", false},
{"Zo^ne", true},
{"z_", true},
{"hos&t-15", true},
{"_test1", true},
{"-test1", true},
{"192.168.199.1", false},
{"www.pingcap.com", false},
{"h_127.0.0.1", false},
{"a", false},
{"a/b", false},
{"ab/", true},
{"/ab", true},
{"$abc", false},
{"$", true},
{"a$b", true},
{"$$", true},
}
for _, t := range tests {
c.Assert(ValidateLabels([]*metapb.StoreLabel{{Key: t.label}}) != nil, Equals, t.hasErr)
}
}
func (s *testUtilSuite) TestValidateURLWithScheme(c *C) {
tests := []struct {
addr string
hasErr bool
}{
{"", true},
{"foo", true},
{"/foo", true},
{"http", true},
{"http://", true},
{"http://foo", false},
{"https://foo", false},
{"http://127.0.0.1", false},
{"http://127.0.0.1/", false},
{"https://foo.com/bar", false},
{"https://foo.com/bar/", false},
}
for _, t := range tests {
c.Assert(ValidateURLWithScheme(t.addr) != nil, Equals, t.hasErr)
}
}

View File

@@ -0,0 +1,40 @@
cmake_minimum_required(VERSION 3.16)
project(CLIENT_CPP LANGUAGES CXX)
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
set( CMAKE_CXX_STANDARD 17 )
set( CMAKE_CXX_STANDARD_REQUIRED on )
set (CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH};${CMAKE_CURRENT_SOURCE_DIR}/cmake")
if (NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE Release CACHE STRING "Choose the type of build." FORCE)
endif ()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -lprotobuf -fPIC -pthread -Wall -Wno-unused-variable -Wno-sign-compare -Werror")
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
set(OUTDIR ${CMAKE_CURRENT_SOURCE_DIR}/build)
SET(EXECUTABLE_OUTPUT_PATH ${OUTDIR})
if( CMAKE_BUILD_TYPE STREQUAL "Release" )
message(STATUS "Building Release version")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_ARROW_ARRAY_SIZE=1073741824") #1G
else()
message(STATUS "Building Debug version")
add_definitions(-DDEBUG_RENDER)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O0 -g")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_ARROW_ARRAY_SIZE=16777216") #16M
endif()
if ("${MAKE}" STREQUAL "")
if (NOT MSVC)
find_program(MAKE make)
endif ()
endif ()
set(CLIENT_SOURCE_DIR ${PROJECT_SOURCE_DIR})
set(CLIENT_SRC ${PROJECT_SOURCE_DIR}/src)
add_subdirectory(src)
add_subdirectory(unittest)
#add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/unittest)

View File

@@ -0,0 +1,14 @@
set(src-cpp
client.cpp
consumer.cpp
producer.cpp
pb/pulsar.pb.cc)
add_library(client_cpp SHARED
${src-cpp}
)
target_link_libraries(client_cpp pulsar protobuf)
install(TARGETS client_cpp
DESTINATION lib)

View File

@@ -0,0 +1,11 @@
#include "client.h"
namespace message_client {
MsgClient::MsgClient(const std::string &serviceUrl) : pulsar::Client(serviceUrl) {}
MsgClient::MsgClient(const std::string &serviceUrl, const pulsar::ClientConfiguration& clientConfiguration)
: pulsar::Client(serviceUrl, clientConfiguration) {}
}

View File

@@ -0,0 +1,24 @@
#pragma once
#include "pulsar/Client.h"
#include <pulsar/ClientConfiguration.h>
namespace message_client {
using Result = pulsar::Result;
using Message = pulsar::Message;
class MsgClient : public pulsar::Client{
public:
MsgClient(const std::string& serviceUrl);
MsgClient(const std::string& serviceUrl, const pulsar::ClientConfiguration& clientConfiguration);
void set_client_id(int64_t id) { client_id_ = id; }
int64_t get_client_id() { return client_id_; }
private:
int64_t client_id_;
};
}
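A small usage sketch for this wrapper: because MsgClient simply forwards to pulsar::Client, any standard pulsar-client-cpp ClientConfiguration option can be passed through the second constructor. The URL, timeout value, and client id below are placeholders, not defaults set by this commit.

#include <memory>
#include "client.h"

std::shared_ptr<message_client::MsgClient> make_client() {
    pulsar::ClientConfiguration conf;
    conf.setOperationTimeoutSeconds(30);  // standard pulsar-client-cpp option, forwarded unchanged
    auto client = std::make_shared<message_client::MsgClient>("pulsar://localhost:6650", conf);
    client->set_client_id(1);  // id kept by the wrapper only; it is not sent to Pulsar
    return client;
}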

View File

@@ -0,0 +1,66 @@
#include "consumer.h"
#include "pb/pulsar.pb.h"
namespace message_client {
MsgConsumer::MsgConsumer(std::shared_ptr<MsgClient> &client, std::string subscription_name, const ConsumerConfiguration conf)
:client_(client), config_(conf), subscription_name_(subscription_name){}
Result MsgConsumer::subscribe(const std::string &topic) {
return client_->subscribe(topic, subscription_name_, config_, consumer_);
}
Result MsgConsumer::subscribe(const std::vector<std::string> &topics) {
return client_->subscribe(topics, subscription_name_, config_, consumer_);
}
Result MsgConsumer::unsubscribe() {
return consumer_.unsubscribe();
}
Result MsgConsumer::receive(Message &msg) {
return consumer_.receive(msg);
}
std::shared_ptr<void> MsgConsumer::receive_proto(ConsumerType consumer_type) {
Message msg;
receive(msg);
acknowledge(msg);
switch (consumer_type) {
case INSERT: {
pb::InsertMsg insert_msg;
insert_msg.ParseFromString(msg.getDataAsString());
auto message = std::make_shared<pb::InsertMsg>(insert_msg);
return std::shared_ptr<void>(message);
}
case DELETE: {
pb::DeleteMsg delete_msg;
delete_msg.ParseFromString(msg.getDataAsString());
auto message = std::make_shared<pb::DeleteMsg>(delete_msg);
return std::shared_ptr<void>(message);
}
case SEARCH_RESULT: {
pb::SearchResultMsg search_res_msg;
search_res_msg.ParseFromString(msg.getDataAsString());
auto message = std::make_shared<pb::SearchResultMsg>(search_res_msg);
return std::shared_ptr<void>(message);
}
case TEST: {
pb::TestData test_msg;
test_msg.ParseFromString(msg.getDataAsString());
auto message = std::make_shared<pb::TestData>(test_msg);
return std::shared_ptr<void>(message);
}
}
return nullptr;
}
Result MsgConsumer::close() {
return consumer_.close();
}
Result MsgConsumer::acknowledge(const Message &message) {
return consumer_.acknowledge(message);
}
}

View File

@@ -0,0 +1,41 @@
#pragma once
#include "pulsar/Consumer.h"
#include "client.h"
namespace message_client {
enum ConsumerType {
INSERT = 0,
DELETE = 1,
SEARCH_RESULT = 2,
TEST = 3,
};
using Consumer = pulsar::Consumer;
using ConsumerConfiguration = pulsar::ConsumerConfiguration;
class MsgConsumer{
public:
MsgConsumer(std::shared_ptr<message_client::MsgClient> &client, std::string consumer_name,
const pulsar::ConsumerConfiguration conf = ConsumerConfiguration());
Result subscribe(const std::string& topic);
Result subscribe(const std::vector<std::string>& topics);
Result unsubscribe();
Result receive(Message& msg);
std::shared_ptr<void> receive_proto(ConsumerType consumer_type);
Result acknowledge(const Message& message);
Result close();
const Consumer&
consumer() const {return consumer_; }
private:
Consumer consumer_;
std::shared_ptr<MsgClient> client_;
ConsumerConfiguration config_;
std::string subscription_name_;
};
}
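A hedged sketch of the consume path defined above: receive_proto blocks on receive, acknowledges the message, and returns the parsed protobuf behind a shared_ptr<void>, so the caller picks a ConsumerType and casts back to the matching pb type. The topic and subscription names are placeholders.

#include <memory>
#include "consumer.h"
#include "pb/pulsar.pb.h"

int64_t consume_one_insert(std::shared_ptr<message_client::MsgClient>& client) {
    message_client::MsgConsumer consumer(client, "insert_subscription");
    consumer.subscribe("insert");
    // INSERT selects the pb::InsertMsg branch of receive_proto's switch.
    auto msg = std::static_pointer_cast<pb::InsertMsg>(consumer.receive_proto(message_client::INSERT));
    int64_t id = msg->client_id();
    consumer.close();
    return id;
}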

View File

@@ -0,0 +1,3 @@
#!/usr/bin/env bash
protoc -I=./ --cpp_out=./ pulsar.proto

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -0,0 +1,143 @@
syntax = "proto3";
package pb;
enum ErrorCode {
SUCCESS = 0;
UNEXPECTED_ERROR = 1;
CONNECT_FAILED = 2;
PERMISSION_DENIED = 3;
COLLECTION_NOT_EXISTS = 4;
ILLEGAL_ARGUMENT = 5;
ILLEGAL_DIMENSION = 7;
ILLEGAL_INDEX_TYPE = 8;
ILLEGAL_COLLECTION_NAME = 9;
ILLEGAL_TOPK = 10;
ILLEGAL_ROWRECORD = 11;
ILLEGAL_VECTOR_ID = 12;
ILLEGAL_SEARCH_RESULT = 13;
FILE_NOT_FOUND = 14;
META_FAILED = 15;
CACHE_FAILED = 16;
CANNOT_CREATE_FOLDER = 17;
CANNOT_CREATE_FILE = 18;
CANNOT_DELETE_FOLDER = 19;
CANNOT_DELETE_FILE = 20;
BUILD_INDEX_ERROR = 21;
ILLEGAL_NLIST = 22;
ILLEGAL_METRIC_TYPE = 23;
OUT_OF_MEMORY = 24;
}
message Status {
ErrorCode error_code = 1;
string reason = 2;
}
enum DataType {
NONE = 0;
BOOL = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
FLOAT = 10;
DOUBLE = 11;
STRING = 20;
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
}
enum OpType {
Insert = 0;
Delete = 1;
Search = 2;
TimeSync = 3;
Key2Seg = 4;
Statistics = 5;
}
message SegmentRecord {
repeated string seg_info = 1;
}
message VectorRowRecord {
repeated float float_data = 1; //float vector data
bytes binary_data = 2; //binary vector data
}
message AttrRecord {
repeated int32 int32_value = 1;
repeated int64 int64_value = 2;
repeated float float_value = 3;
repeated double double_value = 4;
}
message VectorRecord {
repeated VectorRowRecord records = 1;
}
message VectorParam {
string json = 1;
VectorRecord row_record = 2;
}
message FieldValue {
string field_name = 1;
DataType type = 2;
AttrRecord attr_record = 3;
VectorRecord vector_record = 4;
}
message PulsarMessage {
string collection_name = 1;
repeated FieldValue fields = 2;
int64 entity_id = 3;
string partition_tag = 4;
VectorParam vector_param =5;
SegmentRecord segments = 6;
int64 timestamp = 7;
int64 client_id = 8;
OpType msg_type = 9;
string topic_name = 10;
int64 partition_id = 11;
}
//message PulsarMessages {
// string collection_name = 1;
// repeated FieldValue fields = 2;
// repeated int64 entity_id = 3;
// string partition_tag = 4;
// repeated VectorParam vector_param =5;
// repeated SegmentRecord segments = 6;
// repeated int64 timestamp = 7;
// repeated int64 client_id = 8;
// OpType msg_type = 9;
// repeated string topic_name = 10;
// repeated int64 partition_id = 11;
//}
message TestData {
string id = 1;
string name = 2;
}
message InsertMsg {
int64 client_id = 1;
}
message DeleteMsg {
int64 client_id = 1;
}
message SearchMsg {
int64 client_id = 1;
}
message SearchResultMsg {
int64 client_id = 1;
}
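For illustration, a sketch of populating the PulsarMessage defined above from C++ and serializing it into a payload that MsgProducer::send accepts; the concrete field values are arbitrary examples, not values prescribed by this commit.

#include <string>
#include "pb/pulsar.pb.h"

std::string build_insert_payload() {
    pb::PulsarMessage m;
    m.set_collection_name("collection0");
    m.set_partition_tag("default");
    m.set_entity_id(42);
    m.set_timestamp(1598963412);
    m.set_client_id(1);
    m.set_msg_type(pb::Insert);
    m.set_topic_name("insert");
    m.set_partition_id(0);

    // One scalar attribute carried alongside the entity.
    pb::FieldValue* field = m.add_fields();
    field->set_field_name("age");
    field->set_type(pb::INT32);
    field->mutable_attr_record()->add_int32_value(20);

    return m.SerializeAsString();
}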

View File

@@ -0,0 +1,27 @@
#include "producer.h"
namespace message_client {
MsgProducer::MsgProducer(std::shared_ptr<MsgClient> &client, const std::string &topic, const ProducerConfiguration conf) : client_(client), config_(conf){
createProducer(topic);
}
Result MsgProducer::createProducer(const std::string &topic) {
return client_->createProducer(topic, producer_);
}
Result MsgProducer::send(const Message &msg) {
return producer_.send(msg);
}
Result MsgProducer::send(const std::string &msg) {
auto pulsar_msg = pulsar::MessageBuilder().setContent(msg).build();
return send(pulsar_msg);
}
Result MsgProducer::close() {
return producer_.close();
}
}

View File

@@ -0,0 +1,29 @@
#pragma once
#include "pulsar/Producer.h"
#include "client.h"
namespace message_client {
using Producer = pulsar::Producer;
using ProducerConfiguration = pulsar::ProducerConfiguration;
class MsgProducer{
public:
MsgProducer(std::shared_ptr<MsgClient> &client, const std::string &topic, const ProducerConfiguration conf = ProducerConfiguration());
Result createProducer(const std::string& topic);
Result send(const Message& msg);
Result send(const std::string& msg);
Result close();
const Producer&
producer() const { return producer_; }
private:
Producer producer_;
std::shared_ptr<MsgClient> client_;
ProducerConfiguration config_;
};
}
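A short sketch of the produce path: the constructor already calls createProducer for the given topic, so sending a serialized protobuf is a one-liner. The helper name and the "insert" topic are illustrative only.

#include <memory>
#include "producer.h"
#include "pb/pulsar.pb.h"

void publish_insert(std::shared_ptr<message_client::MsgClient>& client, int64_t client_id) {
    message_client::MsgProducer producer(client, "insert");  // producer created on the "insert" topic
    pb::InsertMsg msg;
    msg.set_client_id(client_id);
    producer.send(msg.SerializeAsString());  // wrapped into a pulsar::Message by MsgProducer
    producer.close();
}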

View File

@@ -0,0 +1,15 @@
include_directories(${CMAKE_SOURCE_DIR}/src)
include_directories(.)
enable_testing()
set(unittest_srcs
unittest_entry.cpp
consumer_test.cpp producer_test.cpp)
add_executable(client_test ${unittest_srcs})
target_link_libraries(client_test
client_cpp
pulsar
gtest)
install(TARGETS client_test DESTINATION unittest)

View File

@@ -0,0 +1,14 @@
#include <gtest/gtest.h>
#include "consumer.h"
#include "pb/pulsar.pb.h"
TEST(CLIENT_CPP, CONSUMER) {
auto client= std::make_shared<message_client::MsgClient>("pulsar://localhost:6650");
message_client::MsgConsumer consumer(client, "my_consumer");
consumer.subscribe("test");
auto msg = consumer.receive_proto(message_client::TEST);
pb::TestData* data = (pb::TestData*)(msg.get());
std::cout << "Received: " << msg << " with payload '" << data->name()<< ";" << data->id();
consumer.close();
client->close();
}

View File

@@ -0,0 +1,15 @@
#include <gtest/gtest.h>
#include "producer.h"
#include "pb/pulsar.pb.h"
TEST(CLIENT_CPP, Producer) {
auto client= std::make_shared<message_client::MsgClient>("pulsar://localhost:6650");
message_client::MsgProducer producer(client,"test");
pb::TestData data;
data.set_id("test");
data.set_name("hahah");
std::string to_string = data.SerializeAsString();
producer.send(to_string);
producer.close();
client->close();
}

View File

@@ -0,0 +1,6 @@
#include <gtest/gtest.h>
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@@ -1,30 +1,13 @@
package pulsar
package client_go
import (
"context"
"github.com/apache/pulsar/pulsar-client-go/pulsar"
"log"
"github.com/czs007/suvlim/pulsar/schema"
"sync"
"suvlim/pulsar/client-go/schema"
)
var (
wg sync.WaitGroup
//wgJob sync.WaitGroup
//wgQuery sync.WaitGroup
//wgWrite sync.WaitGroup
OriginMsgSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
"{\"name\":\"CollectionName\",\"type\":\"string\"}," +
"{\"name\":\"Fields\",\"type\":\"[]*FieldValue\"}" +
"{\"name\":\"EntityId\",\"type\":\"int64\"}" +
"{\"name\":\"PartitionTag\",\"type\":\"string\"}" +
"{\"name\":\"VectorParam\",\"type\":\"*VectorParam\"}" +
"{\"name\":\"Segments\",\"type\":\"[]string\"}" +
"{\"name\":\"Timestamp\",\"type\":\"int64\"}" +
"{\"name\":\"ClientId\",\"type\":\"int64\"}" +
"{\"name\":\"MsgType\",\"type\":\"OpType\"}" +
"]}"
SyncEofSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
"{\"name\":\"MsgType\",\"type\":\"OpType\"}," +
"]}"
@@ -122,17 +105,29 @@ func (mc *MessageClient) CreateClient(url string) pulsar.Client {
return client
}
func (mc *MessageClient) InitClient(url string,topics []string) {
func (mc *MessageClient) InitClient(url string, topics []string, consumerMsgSchema string) {
//create client
mc.client = mc.CreateClient(url)
//create producer
for topicIndex := range topics {
if topics[topicIndex] == "insert" {
mc.syncInsertProducer = mc.CreatProducer(SyncEofSchema, "insert")
}
if topics[topicIndex] == "delete" {
mc.syncDeleteProducer = mc.CreatProducer(SyncEofSchema, "delete")
}
if topics[topicIndex] == "key2seg" {
mc.syncInsertProducer = mc.CreatProducer(SyncEofSchema, "key2seg")
}
}
mc.syncInsertProducer = mc.CreatProducer(SyncEofSchema, "insert")
mc.syncDeleteProducer = mc.CreatProducer(SyncEofSchema, "delete")
mc.key2segProducer = mc.CreatProducer(SyncEofSchema, "key2seg")
//create consumer
mc.consumer = mc.CreateConsumer(OriginMsgSchema, topics)
mc.consumer = mc.CreateConsumer(consumerMsgSchema, topics)
// init channel
mc.insertChan = make(chan *schema.InsertMsg, 1000)
@@ -148,6 +143,36 @@ const (
OpInWriteNode JobType = 1
)
func (mc *MessageClient) PrepareMsg(opType schema.OpType, msgLen int) {
switch opType {
case schema.Insert:
for i := 0; i < msgLen; i++ {
msg := <- mc.insertChan
mc.InsertMsg[i] = msg
}
case schema.Delete:
for i := 0; i < msgLen; i++ {
msg := <- mc.deleteChan
mc.DeleteMsg[i] = msg
}
case schema.Search:
for i := 0; i < msgLen; i++ {
msg := <-mc.searchChan
mc.SearchMsg[i] = msg
}
case schema.TimeSync:
for i := 0; i < msgLen; i++ {
msg := <- mc.timeSyncChan
mc.timeMsg[i] = msg
}
case schema.Key2Seg:
for i := 0; i < msgLen; i++ {
msg := <-mc.key2SegChan
mc.key2segMsg[i] = msg
}
}
}
func (mc *MessageClient) PrepareBatchMsg(jobType JobType) {
// assume the channel not full
mc.InsertMsg = make([]*schema.InsertMsg, 1000)
@@ -156,6 +181,8 @@ func (mc *MessageClient) PrepareBatchMsg(jobType JobType) {
mc.timeMsg = make([]*schema.TimeSyncMsg, 1000)
mc.key2segMsg = make([]*schema.Key2SegMsg, 1000)
// ensure all messages before time in timeSyncTopic have been pushed into the channel
// get the length of every channel
insertLen := len(mc.insertChan)
deleteLen := len(mc.deleteChan)
@@ -163,29 +190,12 @@ func (mc *MessageClient) PrepareBatchMsg(jobType JobType) {
timeLen := len(mc.timeSyncChan)
key2segLen := len(mc.key2SegChan)
// get message from channel to slice
for i := 0; i < insertLen; i++ {
msg := <- mc.insertChan
mc.InsertMsg[i] = msg
}
for i := 0; i < deleteLen; i++ {
msg := <- mc.deleteChan
mc.DeleteMsg[i] = msg
}
for i := 0; i < timeLen; i++ {
msg := <- mc.timeSyncChan
mc.timeMsg[i] = msg
}
mc.PrepareMsg(schema.Insert, insertLen)
mc.PrepareMsg(schema.Delete, deleteLen)
mc.PrepareMsg(schema.TimeSync, timeLen)
if jobType == OpInQueryNode {
for i := 0; i < key2segLen; i++ {
msg := <-mc.key2SegChan
mc.key2segMsg[i] = msg
}
for i := 0; i < searchLen; i++ {
msg := <-mc.searchChan
mc.SearchMsg[i] = msg
}
mc.PrepareMsg(schema.Key2Seg, key2segLen)
mc.PrepareMsg(schema.Search, searchLen)
}
}

View File

@@ -0,0 +1,744 @@
syntax = "proto3";
package pb;
enum ErrorCode {
SUCCESS = 0;
UNEXPECTED_ERROR = 1;
CONNECT_FAILED = 2;
PERMISSION_DENIED = 3;
COLLECTION_NOT_EXISTS = 4;
ILLEGAL_ARGUMENT = 5;
ILLEGAL_DIMENSION = 7;
ILLEGAL_INDEX_TYPE = 8;
ILLEGAL_COLLECTION_NAME = 9;
ILLEGAL_TOPK = 10;
ILLEGAL_ROWRECORD = 11;
ILLEGAL_VECTOR_ID = 12;
ILLEGAL_SEARCH_RESULT = 13;
FILE_NOT_FOUND = 14;
META_FAILED = 15;
CACHE_FAILED = 16;
CANNOT_CREATE_FOLDER = 17;
CANNOT_CREATE_FILE = 18;
CANNOT_DELETE_FOLDER = 19;
CANNOT_DELETE_FILE = 20;
BUILD_INDEX_ERROR = 21;
ILLEGAL_NLIST = 22;
ILLEGAL_METRIC_TYPE = 23;
OUT_OF_MEMORY = 24;
}
message Status {
ErrorCode error_code = 1;
string reason = 2;
}
/**
* @brief Field data type
*/
enum DataType {
NONE = 0;
BOOL = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
FLOAT = 10;
DOUBLE = 11;
STRING = 20;
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
}
/**
* @brief General usage
*/
message KeyValuePair {
string key = 1;
string value = 2;
}
/**
* @brief Collection name
*/
message CollectionName {
string collection_name = 1;
}
/**
* @brief Collection name list
*/
message CollectionNameList {
Status status = 1;
repeated string collection_names = 2;
}
/**
* @brief Field name
*/
message FieldName {
string collection_name = 1;
string field_name = 2;
}
/**
* @brief Collection mapping
* @extra_params: key-value pair for extra parameters of the collection
* typical usage:
* extra_params["params"] = {segment_row_count: 1000000, auto_id: true}
* Note:
* the segment_row_count specifies the segment row count limit for merging
* auto_id = true means the entity id is auto-generated by milvus
*/
message Mapping {
Status status = 1;
string collection_name = 2;
repeated FieldParam fields = 3;
repeated KeyValuePair extra_params = 4;
}
/**
* @brief Collection mapping list
*/
message MappingList {
Status status = 1;
repeated Mapping mapping_list = 2;
}
/**
* @brief Parameters of partition
*/
message PartitionParam {
string collection_name = 1;
string tag = 2;
}
/**
* @brief Partition list
*/
message PartitionList {
Status status = 1;
repeated string partition_tag_array = 2;
}
/**
* @brief Vector row record
*/
message VectorRowRecord {
repeated float float_data = 1; //float vector data
bytes binary_data = 2; //binary vector data
}
/**
* @brief Attribute record
*/
message AttrRecord {
repeated int32 int32_value = 1;
repeated int64 int64_value = 2;
repeated float float_value = 3;
repeated double double_value = 4;
}
/**
* @brief Vector records
*/
message VectorRecord {
repeated VectorRowRecord records = 1;
}
/**
* @brief Field values
*/
message FieldValue {
string field_name = 1;
DataType type = 2;
AttrRecord attr_record = 3;
VectorRecord vector_record = 4;
}
/**
* @brief Parameters for insert action
*/
message InsertParam {
string collection_name = 1;
repeated FieldValue fields = 2;
repeated int64 entity_id_array = 3; //optional
string partition_tag = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Entity ids
*/
message EntityIds {
Status status = 1;
repeated int64 entity_id_array = 2;
}
/**
* @brief Search vector parameters
*/
message VectorParam {
string json = 1;
VectorRecord row_record = 2;
}
/**
* @brief Parameters for search action
* @dsl example:
* {
* "query": {
* "bool": {
* "must": [
* {
* "must":[
* {
* "should": [
* {
* "term": {
* "gender": ["male"]
* }
* },
* {
* "range": {
* "height": {"gte": "170.0", "lte": "180.0"}
* }
* }
* ]
* },
* {
* "must_not": [
* {
* "term": {
* "age": [20, 21, 22, 23, 24, 25]
* }
* },
* {
* "Range": {
* "weight": {"lte": "100"}
* }
* }
* ]
* }
* ]
* },
* {
* "must": [
* {
* "vector": {
* "face_img": {
* "topk": 10,
* "metric_type": "L2",
* "query": [],
* "params": {
* "nprobe": 10
* }
* }
* }
* }
* ]
* }
* ]
* }
* },
* "fields": ["age", "face_img"]
* }
*/
message SearchParam {
string collection_name = 1;
repeated string partition_tag_array = 2;
repeated VectorParam vector_param = 3;
string dsl = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Parameters for searching in segments
*/
message SearchInSegmentParam {
repeated string file_id_array = 1;
SearchParam search_param = 2;
}
/**
* @brief Entities
*/
message Entities {
Status status = 1;
repeated int64 ids = 2;
repeated bool valid_row = 3;
repeated FieldValue fields = 4;
}
/**
* @brief Query result
*/
message QueryResult {
Status status = 1;
Entities entities = 2;
int64 row_num = 3;
repeated float scores = 4;
repeated float distances = 5;
repeated KeyValuePair extra_params = 6;
}
/**
* @brief Server string Reply
*/
message StringReply {
Status status = 1;
string string_reply = 2;
}
/**
* @brief Server bool Reply
*/
message BoolReply {
Status status = 1;
bool bool_reply = 2;
}
/**
* @brief Return collection row count
*/
message CollectionRowCount {
Status status = 1;
int64 collection_row_count = 2;
}
/**
* @brief Server command parameters
*/
message Command {
string cmd = 1;
}
/**
* @brief Index params
* @collection_name: target collection
* @field_name: target field
* @index_name: a name for index provided by user, unique within this field
* @extra_params: index parameters in json format
* for vector field:
* extra_params["index_type"] = one of the values: FLAT, IVF_LAT, IVF_SQ8, NSGMIX, IVFSQ8H,
* PQ, HNSW, HNSW_SQ8NM, ANNOY
* extra_params["metric_type"] = one of the values: L2, IP, HAMMING, JACCARD, TANIMOTO
* SUBSTRUCTURE, SUPERSTRUCTURE
* extra_params["params"] = extra parameters for index, for example ivflat: {nlist: 2048}
* for structured field:
* extra_params["index_type"] = one of the values: SORTED
*/
message IndexParam {
Status status = 1;
string collection_name = 2;
string field_name = 3;
string index_name = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Parameters for flush action
*/
message FlushParam {
repeated string collection_name_array = 1;
}
/**
* @brief Parameters for compact action
*/
message CompactParam {
string collection_name = 1;
double threshold = 2;
}
/**
* @brief Parameters for deleting entities by id
*/
message DeleteByIDParam {
string collection_name = 1;
repeated int64 id_array = 2;
}
/**
* @brief Return collection stats
* @json_info: collection stats in json format, typically, the format is like:
* {
* row_count: xxx,
* data_size: xxx,
* partitions: [
* {
* tag: xxx,
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* segments: [
* {
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* files: [
* {
* field: xxx,
* name: xxx,
* index_type: xxx,
* path: xxx,
* data_size: xxx,
* }
* ]
* }
* ]
* }
* ]
* }
*/
message CollectionInfo {
Status status = 1;
string json_info = 2;
}
/**
* @brief Parameters for returning entity ids of a segment
*/
message GetEntityIDsParam {
string collection_name = 1;
int64 segment_id = 2;
}
/**
* @brief Entities identity
*/
message EntityIdentity {
string collection_name = 1;
repeated int64 id_array = 2;
repeated string field_names = 3;
}
/********************************************SearchPB interface***************************************************/
/**
* @brief Vector field parameters
*/
message VectorFieldParam {
int64 dimension = 1;
}
/**
* @brief Field type
*/
message FieldType {
oneof value {
DataType data_type = 1;
VectorFieldParam vector_param = 2;
}
}
/**
* @brief Field parameters
*/
message FieldParam {
uint64 id = 1;
string name = 2;
DataType type = 3;
repeated KeyValuePair index_params = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Vector field record
*/
message VectorFieldRecord {
repeated VectorRowRecord value = 1;
}
///////////////////////////////////////////////////////////////////
message TermQuery {
string field_name = 1;
repeated int64 int_value = 2;
repeated double double_value = 3;
int64 value_num = 4;
float boost = 5;
repeated KeyValuePair extra_params = 6;
}
enum CompareOperator {
LT = 0;
LTE = 1;
EQ = 2;
GT = 3;
GTE = 4;
NE = 5;
}
message CompareExpr {
CompareOperator operator = 1;
string operand = 2;
}
message RangeQuery {
string field_name = 1;
repeated CompareExpr operand = 2;
float boost = 3;
repeated KeyValuePair extra_params = 4;
}
message VectorQuery {
string field_name = 1;
float query_boost = 2;
repeated VectorRowRecord records = 3;
int64 topk = 4;
repeated KeyValuePair extra_params = 5;
}
enum Occur {
INVALID = 0;
MUST = 1;
SHOULD = 2;
MUST_NOT = 3;
}
message BooleanQuery {
Occur occur = 1;
repeated GeneralQuery general_query = 2;
}
message GeneralQuery {
oneof query {
BooleanQuery boolean_query = 1;
TermQuery term_query = 2;
RangeQuery range_query = 3;
VectorQuery vector_query = 4;
}
}
message SearchParamPB {
string collection_name = 1;
repeated string partition_tag_array = 2;
GeneralQuery general_query = 3;
repeated KeyValuePair extra_params = 4;
}
service MilvusService {
/**
* @brief This method is used to create collection
*
* @param CollectionSchema, use to provide collection information to be created.
*
* @return Status
*/
rpc CreateCollection(Mapping) returns (Status){}
/**
* @brief This method is used to test collection existence.
*
* @param CollectionName, collection name is going to be tested.
*
* @return BoolReply
*/
rpc HasCollection(CollectionName) returns (BoolReply) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionSchema
*/
rpc DescribeCollection(CollectionName) returns (Mapping) {}
/**
* @brief This method is used to get collection row count.
*
* @param CollectionName, target collection name.
*
* @return CollectionRowCount
*/
rpc CountCollection(CollectionName) returns (CollectionRowCount) {}
/**
* @brief This method is used to list all collections.
*
* @param Command, dummy parameter.
*
* @return CollectionNameList
*/
rpc ShowCollections(Command) returns (CollectionNameList) {}
/**
* @brief This method is used to get collection detail information.
*
* @param CollectionName, target collection name.
*
* @return CollectionInfo
*/
rpc ShowCollectionInfo(CollectionName) returns (CollectionInfo) {}
/**
* @brief This method is used to delete collection.
*
* @param CollectionName, collection name is going to be deleted.
*
* @return Status
*/
rpc DropCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to build index by collection in sync mode.
*
* @param IndexParam, index parameters.
*
* @return Status
*/
rpc CreateIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to describe index
*
* @param IndexParam, target index.
*
* @return IndexParam
*/
rpc DescribeIndex(IndexParam) returns (IndexParam) {}
/**
* @brief This method is used to drop index
*
* @param IndexParam, target field. If IndexParam.field_name is empty, all indexes of the collection will be dropped.
*
* @return Status
*/
rpc DropIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to create partition
*
* @param PartitionParam, partition parameters.
*
* @return Status
*/
rpc CreatePartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to test partition existence.
*
* @param PartitionParam, target partition.
*
* @return BoolReply
*/
rpc HasPartition(PartitionParam) returns (BoolReply) {}
/**
* @brief This method is used to show partition information
*
* @param CollectionName, target collection name.
*
* @return PartitionList
*/
rpc ShowPartitions(CollectionName) returns (PartitionList) {}
/**
* @brief This method is used to drop partition
*
* @param PartitionParam, target partition.
*
* @return Status
*/
rpc DropPartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to add vector array to collection.
*
* @param InsertParam, insert parameters.
*
* @return EntityIds
*/
rpc Insert(InsertParam) returns (EntityIds) {}
/**
* @brief This method is used to get entity data by id array.
*
* @param EntityIdentity, target entity id array.
*
* @return Entities
*/
rpc GetEntityByID(EntityIdentity) returns (Entities) {}
/**
* @brief This method is used to get entity ids from a segment
*
* @param GetEntityIDsParam, target collection and segment
*
* @return EntityIds
*/
rpc GetEntityIDs(GetEntityIDsParam) returns (EntityIds) {}
/**
* @brief This method is used to query vector in collection.
*
* @param SearchParam, search parameters.
*
* @return QueryResult
*/
rpc Search(SearchParam) returns (QueryResult) {}
/**
* @brief This method is used to query vector in specified files.
*
* @param SearchInSegmentParam, target segments to search.
*
* @return QueryResult
*/
rpc SearchInSegment(SearchInSegmentParam) returns (QueryResult) {}
/**
* @brief This method is used to give the server status.
*
* @param Command, command string
*
* @return StringReply
*/
rpc Cmd(Command) returns (StringReply) {}
/**
* @brief This method is used to delete vector by id
*
* @param DeleteByIDParam, delete parameters.
*
* @return status
*/
rpc DeleteByID(DeleteByIDParam) returns (Status) {}
/**
* @brief This method is used to preload collection
*
* @param CollectionName, target collection name.
*
* @return Status
*/
rpc PreloadCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to flush buffer into storage.
*
* @param FlushParam, flush parameters
*
* @return Status
*/
rpc Flush(FlushParam) returns (Status) {}
/**
* @brief This method is used to compact collection
*
* @param CompactParam, compact parameters
*
* @return Status
*/
rpc Compact(CompactParam) returns (Status) {}
/********************************New Interface********************************************/
rpc SearchPB(SearchParamPB) returns (QueryResult) {}
}
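This commit only adds the service definition; the protoc invocation shown earlier uses --cpp_out alone, so no gRPC stubs are generated here. If the file were additionally run through grpc_cpp_plugin, a call would follow the usual generated-stub pattern sketched below; the stub header name and server address are hypothetical.

#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "milvus.grpc.pb.h"  // hypothetical: name of the grpc_cpp_plugin output for this file

bool has_collection(const std::string& server_addr, const std::string& name) {
    auto channel = grpc::CreateChannel(server_addr, grpc::InsecureChannelCredentials());
    std::unique_ptr<pb::MilvusService::Stub> stub = pb::MilvusService::NewStub(channel);

    pb::CollectionName request;
    request.set_collection_name(name);
    pb::BoolReply reply;
    grpc::ClientContext context;

    grpc::Status status = stub->HasCollection(&context, request, &reply);
    return status.ok() && reply.bool_reply();
}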

View File

@@ -0,0 +1,71 @@
package client_go
import (
"fmt"
"suvlim/pulsar/client-go/schema"
"sync"
"time"
)
var (
consumerQSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
"{\"name\":\"MsgType\",\"type\":\"OpType\"}," +
"]}"
)
type QueryNode struct {
mc MessageClient
}
func (qn *QueryNode) doQueryNode(wg *sync.WaitGroup) {
wg.Add(3)
go qn.insert_query(qn.mc.InsertMsg, wg)
go qn.delete_query(qn.mc.DeleteMsg, wg)
go qn.search_query(qn.mc.SearchMsg, wg)
wg.Wait()
}
func (qn *QueryNode) PrepareBatchMsg() {
qn.mc.PrepareBatchMsg(JobType(0))
}
func (qn *QueryNode)ReceiveMessage() {
qn.mc.ReceiveMessage()
}
func main() {
mc := MessageClient{}
topics := []string{"insert", "delete"}
mc.InitClient("pulsar://localhost:6650", topics, consumerQSchema)
qn := QueryNode{mc}
wg := sync.WaitGroup{}
go qn.ReceiveMessage()
for {
time.Sleep(200 * time.Millisecond)
qn.PrepareBatchMsg()
qn.doQueryNode(&wg)
fmt.Println("do a batch in 200ms")
}
}
func (qn *QueryNode) insert_query(data []*schema.InsertMsg, wg *sync.WaitGroup) schema.Status {
wg.Done()
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}
func (qn *QueryNode) delete_query(data []*schema.DeleteMsg, wg *sync.WaitGroup) schema.Status {
wg.Done()
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}
func (qn *QueryNode) search_query(data []*schema.SearchMsg, wg *sync.WaitGroup) schema.Status {
wg.Done()
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}

View File

@@ -109,6 +109,9 @@ type PulsarMessage struct {
Timestamp int64
ClientId int64
MsgType OpType
TopicName string
PartitionId int64
}
type Message interface {
@@ -192,3 +195,4 @@ func (kms *Key2SegMsg) GetType() OpType {
type SyncEofMsg struct {
MsgType OpType
}

View File

@@ -1,12 +1,18 @@
package pulsar
package client_go
import (
"fmt"
"github.com/czs007/suvlim/pulsar/schema"
"suvlim/pulsar/client-go/schema"
"sync"
"time"
)
var (
consumerWSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
"{\"name\":\"MsgType\",\"type\":\"OpType\"}," +
"]}"
)
type WriteNode struct {
mc MessageClient
}
@@ -25,10 +31,10 @@ func main() {
mc := MessageClient{}
topics := []string{"insert", "delete"}
mc.InitClient("pulsar://localhost:6650", topics)
mc.InitClient("pulsar://localhost:6650", topics, consumerWSchema)
go mc.ReceiveMessage()
wg := sync.WaitGroup{}
wn := WriteNode{mc}
for {

View File

@@ -0,0 +1,19 @@
package test
import "sync"
var (
wg sync.WaitGroup
OriginMsgSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
"{\"name\":\"CollectionName\",\"type\":\"string\"}," +
"{\"name\":\"Fields\",\"type\":\"[]*FieldValue\"}" +
"{\"name\":\"EntityId\",\"type\":\"int64\"}" +
"{\"name\":\"PartitionTag\",\"type\":\"string\"}" +
"{\"name\":\"VectorParam\",\"type\":\"*VectorParam\"}" +
"{\"name\":\"Segments\",\"type\":\"[]string\"}" +
"{\"name\":\"Timestamp\",\"type\":\"int64\"}" +
"{\"name\":\"ClientId\",\"type\":\"int64\"}" +
"{\"name\":\"MsgType\",\"type\":\"OpType\"}" +
"]}"
)

View File

@@ -1,60 +0,0 @@
package pulsar
// import (
// "fmt"
// "suvlim/pulsar/schema"
// "sync"
// "time"
// )
// type QueryNode struct {
// mc MessageClient
// }
// func (qn *QueryNode)doQueryNode(wg sync.WaitGroup) {
// wg.Add(3)
// go qn.insert_query(qn.mc.InsertMsg, wg)
// go qn.delete_query(qn.mc.DeleteMsg, wg)
// go qn.search_query(qn.mc.SearchMsg, wg)
// wg.Wait()
// }
// func (qn *QueryNode) PrepareBatchMsg() {
// qn.mc.PrepareBatchMsg(JobType(0))
// }
// func main() {
// mc := MessageClient{}
// topics := []string{"insert", "delete"}
// mc.InitClient("pulsar://localhost:6650", topics)
// go mc.ReceiveMessage()
// qn := QueryNode{mc}
// for {
// time.Sleep(200 * time.Millisecond)
// qn.PrepareBatchMsg()
// qn.doQueryNode(wg)
// fmt.Println("do a batch in 200ms")
// }
// }
// func (qn *QueryNode) insert_query(data []*schema.InsertMsg, wg sync.WaitGroup) schema.Status{
// wg.Done()
// return schema.Status{schema.ErrorCode_SUCCESS, ""}
// }
// func (qn *QueryNode) delete_query(data []*schema.DeleteMsg, wg sync.WaitGroup) schema.Status{
// wg.Done()
// return schema.Status{schema.ErrorCode_SUCCESS, ""}
// }
// func (qn *QueryNode) search_query(data []*schema.SearchMsg, wg sync.WaitGroup) schema.Status{
// wg.Done()
// return schema.Status{schema.ErrorCode_SUCCESS, ""}
// }

View File

@@ -1,13 +1,15 @@
package reader
import "github.com/czs007/suvlim/pulsar/schema"
import (
schema2 "suvlim/pulsar/client-go/schema"
)
type IndexConfig struct {}
func buildIndex(config IndexConfig) schema.Status {
return schema.Status{Error_code: schema.ErrorCode_SUCCESS}
func buildIndex(config IndexConfig) schema2.Status {
return schema2.Status{Error_code: schema2.ErrorCode_SUCCESS}
}
func dropIndex(fieldName string) schema.Status {
return schema.Status{Error_code: schema.ErrorCode_SUCCESS}
func dropIndex(fieldName string) schema2.Status {
return schema2.Status{Error_code: schema2.ErrorCode_SUCCESS}
}

View File

@@ -2,7 +2,7 @@ package reader
import (
"fmt"
"github.com/czs007/suvlim/pulsar/schema"
schema2 "suvlim/pulsar/client-go/schema"
)
type ResultEntityIds []int64
@@ -12,17 +12,17 @@ func getResultTopicByClientId(clientId int64) string {
return "result-topic/partition-" + string(clientId)
}
func publishResult(ids *ResultEntityIds, clientId int64) schema.Status {
func publishResult(ids *ResultEntityIds, clientId int64) schema2.Status {
// TODO: Pulsar publish
var resultTopic = getResultTopicByClientId(clientId)
fmt.Println(resultTopic)
return schema.Status{Error_code: schema.ErrorCode_SUCCESS}
return schema2.Status{Error_code: schema2.ErrorCode_SUCCESS}
}
func publicStatistic(statisticTopic string) schema.Status {
func publicStatistic(statisticTopic string) schema2.Status {
// TODO: get statistic info
// getStatisticInfo()
// var info = getStatisticInfo()
// TODO: Pulsar publish
return schema.Status{Error_code: schema.ErrorCode_SUCCESS}
return schema2.Status{Error_code: schema2.ErrorCode_SUCCESS}
}