From 7f14f8a155d7572c44041809e3fc410695d7b52e Mon Sep 17 00:00:00 2001 From: groot Date: Tue, 16 Apr 2019 17:20:09 +0800 Subject: [PATCH] support more protocol Former-commit-id: b69bcf707d83ddcc90f27037b4391790011dcc7d --- cpp/conf/server_config.yaml | 4 +-- cpp/src/server/ServiceWrapper.cpp | 52 ++++++++++++++++++++++--------- cpp/test_client/src/ClientApp.cpp | 6 ++++ 3 files changed, 45 insertions(+), 17 deletions(-) diff --git a/cpp/conf/server_config.yaml b/cpp/conf/server_config.yaml index a80a6f5108..aa654e0b23 100644 --- a/cpp/conf/server_config.yaml +++ b/cpp/conf/server_config.yaml @@ -1,8 +1,8 @@ server_config: address: 127.0.0.1 port: 33001 - transfer_protocol: json #optional: binary, compact, json, simple_json, debug - server_mode: simple #optional: simple, non_blocking, hsha, thread_pool, thread_selector + transfer_protocol: json #optional: binary, compact, json, debug + server_mode: thread_pool #optional: simple, thread_pool log_config: global: diff --git a/cpp/src/server/ServiceWrapper.cpp b/cpp/src/server/ServiceWrapper.cpp index d28e09b1d0..33528f6136 100644 --- a/cpp/src/server/ServiceWrapper.cpp +++ b/cpp/src/server/ServiceWrapper.cpp @@ -14,7 +14,10 @@ #include #include +#include +#include #include +//#include #include #include #include @@ -40,7 +43,7 @@ public: void dummy() { // Your implementation goes here - printf("dummy() called\n"); + SERVER_LOG_INFO << "dummy() called"; } /** @@ -50,17 +53,17 @@ public: */ void add_group(const VecGroup& group) { // Your implementation goes here - printf("add_group\n"); + SERVER_LOG_INFO << "add_group() called"; } void get_group(VecGroup& _return, const std::string& group_id) { // Your implementation goes here - printf("get_group\n"); + SERVER_LOG_INFO << "get_group() called"; } void del_group(const std::string& group_id) { // Your implementation goes here - printf("del_group\n"); + SERVER_LOG_INFO << "del_group() called"; } /** @@ -72,12 +75,12 @@ public: */ int64_t add_vector(const std::string& group_id, const VecTensor& tensor) { // Your implementation goes here - printf("add_vector\n"); + SERVER_LOG_INFO << "add_vector() called"; } void add_vector_batch(VecTensorIdList& _return, const std::string& group_id, const VecTensorList& tensor_list) { // Your implementation goes here - printf("add_vector_batch\n"); + SERVER_LOG_INFO << "add_vector_batch() called"; } /** @@ -91,12 +94,12 @@ public: */ void search_vector(VecSearchResult& _return, const std::string& group_id, const int64_t top_k, const VecTensor& tensor, const VecTimeRangeList& time_range_list) { // Your implementation goes here - printf("search_vector\n"); + SERVER_LOG_INFO << "search_vector() called"; } void search_vector_batch(VecSearchResultList& _return, const std::string& group_id, const int64_t top_k, const VecTensorList& tensor_list, const VecTimeRangeList& time_range_list) { // Your implementation goes here - printf("search_vector_batch\n"); + SERVER_LOG_INFO << "search_vector_batch() called"; } }; @@ -118,29 +121,48 @@ void ServiceWrapper::StartService() { ::apache::thrift::stdcxx::shared_ptr handler(new VecServiceHandler()); ::apache::thrift::stdcxx::shared_ptr processor(new VecServiceProcessor(handler)); - ::apache::thrift::stdcxx::shared_ptr serverTransport(new TServerSocket(address, port)); - ::apache::thrift::stdcxx::shared_ptr transportFactory(new TBufferedTransportFactory()); + ::apache::thrift::stdcxx::shared_ptr server_transport(new TServerSocket(address, port)); + ::apache::thrift::stdcxx::shared_ptr transport_factory(new TBufferedTransportFactory()); - ::apache::thrift::stdcxx::shared_ptr protocolFactory; + ::apache::thrift::stdcxx::shared_ptr protocol_factory; if(protocol == "binary") { - protocolFactory.reset(new TBinaryProtocolFactory()); + protocol_factory.reset(new TBinaryProtocolFactory()); } else if(protocol == "json") { - protocolFactory.reset(new TJSONProtocolFactory()); + protocol_factory.reset(new TJSONProtocolFactory()); + } else if(protocol == "compact") { + protocol_factory.reset(new TCompactProtocolFactory()); + } else if(protocol == "debug") { + protocol_factory.reset(new TDebugProtocolFactory()); } else { SERVER_LOG_INFO << "Service protocol: " << protocol << " is not supported currently"; return; } if(mode == "simple") { - s_server.reset(new TSimpleServer(processor, serverTransport, transportFactory, protocolFactory)); + s_server.reset(new TSimpleServer(processor, server_transport, transport_factory, protocol_factory)); s_server->serve(); +// } else if(mode == "non_blocking") { +// ::apache::thrift::stdcxx::shared_ptr nb_server_transport(new TServerSocket(address, port)); +// ::apache::thrift::stdcxx::shared_ptr threadManager(ThreadManager::newSimpleThreadManager()); +// ::apache::thrift::stdcxx::shared_ptr threadFactory(new PosixThreadFactory()); +// threadManager->threadFactory(threadFactory); +// threadManager->start(); +// +// s_server.reset(new TNonblockingServer(processor, +// protocol_factory, +// nb_server_transport, +// threadManager)); } else if(mode == "thread_pool") { ::apache::thrift::stdcxx::shared_ptr threadManager(ThreadManager::newSimpleThreadManager()); ::apache::thrift::stdcxx::shared_ptr threadFactory(new PosixThreadFactory()); threadManager->threadFactory(threadFactory); threadManager->start(); - s_server.reset(new TThreadPoolServer(processor, serverTransport, transportFactory, protocolFactory, threadManager)); + s_server.reset(new TThreadPoolServer(processor, + server_transport, + transport_factory, + protocol_factory, + threadManager)); s_server->serve(); } else { SERVER_LOG_INFO << "Service mode: " << mode << " is not supported currently"; diff --git a/cpp/test_client/src/ClientApp.cpp b/cpp/test_client/src/ClientApp.cpp index 72456c34df..82a06de199 100644 --- a/cpp/test_client/src/ClientApp.cpp +++ b/cpp/test_client/src/ClientApp.cpp @@ -16,6 +16,8 @@ #include #include +#include +#include #include #include #include @@ -52,6 +54,10 @@ void ClientApp::Run(const std::string &config_file) { protocol_ptr.reset(new TBinaryProtocol(transport_ptr)); } else if(protocol == "json") { protocol_ptr.reset(new TJSONProtocol(transport_ptr)); + } else if(protocol == "compact") { + protocol_ptr.reset(new TCompactProtocol(transport_ptr)); + } else if(protocol == "debug") { + protocol_ptr.reset(new TDebugProtocol(transport_ptr)); } else { CLIENT_LOG_ERROR << "Service protocol: " << protocol << " is not supported currently"; return;