Obtain running rpc requests information (#2240) (#2241)

* add cmd to get requests

Signed-off-by: yhz <413554850@qq.com>

* forward class declaration

Signed-off-by: yhz <413554850@qq.com>

* Obtain running rpc requests information (fix #2240)

Signed-off-by: yhz <413554850@qq.com>

* Fix unittest compile failed issue

Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* Log error with msg

Signed-off-by: Yhz <yinghao.zou@zilliz.com>
This commit is contained in:
BossZou 2020-05-08 19:17:48 +08:00 committed by GitHub
parent 1016dbae1a
commit 1811254de6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 93 additions and 26 deletions

View File

@ -11,6 +11,7 @@ Please mark all change in change log and use the issue from GitHub
- \#1946 Fix load index file CPU2GPU fail during searching
- \#1955 Switch create_index operation to background once client break connection
- \#1997 Index file missed after compact
- \#2002 Remove log error msg `Attributes is null`
- \#2073 Fix CheckDBConfigBackendUrl error message
- \#2076 CheckMetricConfigAddress error message
- \#2120 Fix Search expected failed if search params set invalid
@ -34,6 +35,7 @@ Please mark all change in change log and use the issue from GitHub
- \#2064 Warn when use SQLite as metadata management
- \#2111 Check GPU environment before start server
- \#2206 Log file rotating
- \#2240 Obtain running rpc requests information
## Improvement
- \#221 Refactor LOG macro

View File

@ -121,8 +121,8 @@ DefaultAttrsFormat::write(const milvus::storage::FSHandlerPtr& fs_ptr, const mil
auto it = attrs_ptr->attrs.begin();
if (it == attrs_ptr->attrs.end()) {
std::string err_msg = "Attributes is null";
LOG_ENGINE_ERROR_ << err_msg;
// std::string err_msg = "Attributes is null";
// LOG_ENGINE_ERROR_ << err_msg;
return;
}

View File

@ -54,6 +54,16 @@ Context::IsConnectionBroken() const {
return context_->IsConnectionBroken();
}
BaseRequest::RequestType
Context::GetRequestType() const {
return request_type_;
}
void
Context::SetRequestType(BaseRequest::RequestType type) {
request_type_ = type;
}
/////////////////////////////////////////////////////////////////////////////////////////////////
ContextChild::ContextChild(const ContextPtr& context, const std::string& operation_name) {
if (context) {

View File

@ -18,6 +18,7 @@
#include <grpcpp/server_context.h>
#include "server/context/ConnectionContext.h"
#include "server/delivery/request/BaseRequest.h"
#include "tracing/TraceContext.h"
namespace milvus {
@ -50,8 +51,15 @@ class Context {
bool
IsConnectionBroken() const;
BaseRequest::RequestType
GetRequestType() const;
void
SetRequestType(BaseRequest::RequestType type);
private:
std::string request_id_;
BaseRequest::RequestType request_type_;
std::shared_ptr<tracing::TraceContext> trace_context_;
ConnectionContextPtr context_;
};

View File

@ -10,12 +10,14 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "server/delivery/request/BaseRequest.h"
#include <map>
#include "server/context/Context.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include <map>
namespace milvus {
namespace server {
@ -81,6 +83,9 @@ BaseRequest::BaseRequest(const std::shared_ptr<milvus::server::Context>& context
bool async)
: context_(context), type_(type), async_(async), done_(false) {
request_group_ = milvus::server::RequestGroup(type);
if (nullptr != context_) {
context_->SetRequestType(type_);
}
}
BaseRequest::~BaseRequest() {

View File

@ -17,7 +17,6 @@
#include "grpc/gen-status/status.grpc.pb.h"
#include "grpc/gen-status/status.pb.h"
#include "query/GeneralQuery.h"
#include "server/context/Context.h"
#include "utils/Json.h"
#include "utils/Status.h"
@ -103,6 +102,8 @@ struct PartitionParam {
}
};
class Context;
class BaseRequest {
public:
enum RequestType {

View File

@ -73,6 +73,25 @@ ErrorMap(ErrorCode code) {
}
}
std::string
RequestMap(BaseRequest::RequestType request_type) {
static const std::unordered_map<BaseRequest::RequestType, std::string> request_map = {
{BaseRequest::kInsert, "Insert"},
{BaseRequest::kCreateIndex, "CreateIndex"},
{BaseRequest::kSearch, "Search"},
{BaseRequest::kSearchByID, "SearchByID"},
{BaseRequest::kHybridSearch, "HybridSearch"},
{BaseRequest::kFlush, "Flush"},
{BaseRequest::kCompact, "Compact"},
};
if (request_map.find(request_type) != request_map.end()) {
return request_map.at(request_type);
} else {
return "OtherRequest";
}
}
namespace {
void
CopyRowRecords(const google::protobuf::RepeatedPtrField<::milvus::grpc::RowRecord>& grpc_records,
@ -670,8 +689,30 @@ GrpcRequestHandler::Cmd(::grpc::ServerContext* context, const ::milvus::grpc::Co
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->RequestID().c_str(), __func__);
std::string reply;
Status status = request_handler_.Cmd(GetContext(context), request->cmd(), reply);
response->set_string_reply(reply);
Status status;
std::string cmd = request->cmd();
std::vector<std::string> requests;
if (cmd == "requests") {
std::lock_guard<std::mutex> lock(context_map_mutex_);
for (auto& iter : context_map_) {
if (nullptr == iter.second) {
continue;
}
if (iter.second->RequestID() == get_request_id(context)) {
continue;
}
auto request_str = RequestMap(iter.second->GetRequestType()) + "-" + iter.second->RequestID();
requests.emplace_back(request_str);
}
nlohmann::json reply_json;
reply_json["requests"] = requests;
reply = reply_json.dump();
response->set_string_reply(reply);
} else {
status = request_handler_.Cmd(GetContext(context), cmd, reply);
response->set_string_reply(reply);
}
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->RequestID().c_str(), __func__);
SET_RESPONSE(response->mutable_status(), status, context);

View File

@ -20,6 +20,9 @@ include_directories(${MILVUS_ENGINE_SRC})
include_directories(${MILVUS_THIRDPARTY_SRC})
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-status)
include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-milvus)
aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/config/handler config_handler_files)
@ -149,6 +152,7 @@ set(common_files
${helper_files}
${server_init_files}
${server_context_files}
${grpc_service_files}
${tracing_files}
${codecs_files}
${codecs_default_files}
@ -157,6 +161,14 @@ set(common_files
${query_files}
)
set(grpc_lib
grpcpp_channelz
grpc++
grpc
grpc_protobuf
grpc_protoc
)
set(unittest_libs
sqlite
libboost_system.a
@ -172,6 +184,7 @@ set(unittest_libs
opentracing
opentracing_mocktracer
fiu
${grpc_lib}
)
if (MILVUS_WITH_AWS)

View File

@ -23,27 +23,16 @@ set(test_files
include_directories("${CUDA_TOOLKIT_ROOT_DIR}/include")
link_directories("${CUDA_TOOLKIT_ROOT_DIR}/lib64")
include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-status)
include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-milvus)
set(util_files
${MILVUS_ENGINE_SRC}/utils/StringHelpFunctions.cpp
${MILVUS_ENGINE_SRC}/utils/LogUtil.cpp
${MILVUS_ENGINE_SRC}/utils/SignalUtil.cpp)
set(grpc_service_files
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/milvus.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/milvus.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-status/status.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/gen-status/status.pb.cc
)
set(server_test_files
${common_files}
${server_files}
${server_init_files}
${grpc_server_files}
${grpc_service_files}
${server_delivery_files}
${web_server_files}
${util_files}
@ -53,19 +42,13 @@ set(server_test_files
add_executable(test_server ${server_test_files})
set(grpc_lib
grpcpp_channelz
grpc++
grpc
grpc_protobuf
grpc_protoc
)
target_link_libraries(test_server
knowhere
metrics
stdc++
${grpc_lib}
# ${grpc_lib}
${unittest_libs}
oatpp
)

View File

@ -934,6 +934,10 @@ TEST_F(RpcHandlerTest, CMD_TEST) {
handler->Cmd(&context, &command, &reply);
ASSERT_EQ(reply.string_reply(), MILVUS_VERSION);
command.set_cmd("requests");
handler->Cmd(&context, &command, &reply);
ASSERT_EQ(reply.status().error_code(), ::grpc::Status::OK.error_code());
command.set_cmd("tasktable");
handler->Cmd(&context, &command, &reply);
ASSERT_EQ(reply.status().error_code(), ::grpc::Status::OK.error_code());