Add Opentracing (#729)

* add TracerUtil

* Interceptor ok

* add handler

* add context

* minor update

* keep span in trace context

* add span in search okay

* Update Context.cpp

* refactor

* refactor

* refactor

* format

* add context in SearchJob

* trace search okay

* add back finish span in interceptor

* add namespace

* add tracing config in server config

* add random id

* debug mode okay

* update CMakeLists

* add opentracing to cmake

* update unittest

* add tracing namespace

* remove std::run_time error

* add lock when erasing context_map

* update tracing config

* lint

* update CHANGELOG

* small fix

* fix server unit test

* fix scheduler unit test

* fix db unit test

* lint

* fix db unit test gpu version

* rename to tracing_config

* fix

* update

* trigger ci
This commit is contained in:
Zhiru Zhu 2019-12-10 15:24:54 +08:00 committed by Jin Hai
parent 79181aa1fb
commit e99896ac62
86 changed files with 1513 additions and 389 deletions

View File

@ -1,6 +1,18 @@
# Changelog
Please mark all change in change log and use the ticket from JIRA.
Please mark all change in change log and use the issue from GitHub
# Milvus 0.7.0 (TBD)
## Bug
## Feature
- \#343 - Add Opentracing
## Improvement
## Task
# Milvus 0.6.0 (2019-12-07)
## Bug

View File

@ -85,6 +85,8 @@ define_option(MILVUS_WITH_GRPC "Build with GRPC" ON)
define_option(MILVUS_WITH_ZLIB "Build with zlib compression" ON)
define_option(MILVUS_WITH_OPENTRACING "Build with Opentracing" ON)
#----------------------------------------------------------------------
set_option_category("Test and benchmark")

View File

@ -25,7 +25,8 @@ set(MILVUS_THIRDPARTY_DEPENDENCIES
libunwind
gperftools
GRPC
ZLIB)
ZLIB
Opentracing)
message(STATUS "Using ${MILVUS_DEPENDENCY_SOURCE} approach to find dependencies")
@ -57,6 +58,8 @@ macro(build_dependency DEPENDENCY_NAME)
build_grpc()
elseif ("${DEPENDENCY_NAME}" STREQUAL "ZLIB")
build_zlib()
elseif ("${DEPENDENCY_NAME}" STREQUAL "Opentracing")
build_opentracing()
else ()
message(FATAL_ERROR "Unknown thirdparty dependency to build: ${DEPENDENCY_NAME}")
endif ()
@ -165,9 +168,9 @@ endif ()
macro(resolve_dependency DEPENDENCY_NAME)
if (${DEPENDENCY_NAME}_SOURCE STREQUAL "AUTO")
find_package(${DEPENDENCY_NAME} MODULE)
if(NOT ${${DEPENDENCY_NAME}_FOUND})
build_dependency(${DEPENDENCY_NAME})
endif()
if (NOT ${${DEPENDENCY_NAME}_FOUND})
build_dependency(${DEPENDENCY_NAME})
endif ()
elseif (${DEPENDENCY_NAME}_SOURCE STREQUAL "BUNDLED")
build_dependency(${DEPENDENCY_NAME})
elseif (${DEPENDENCY_NAME}_SOURCE STREQUAL "SYSTEM")
@ -339,6 +342,13 @@ else ()
endif ()
set(ZLIB_MD5 "0095d2d2d1f3442ce1318336637b695f")
if (DEFINED ENV{MILVUS_OPENTRACING_URL})
set(OPENTRACING_SOURCE_URL "$ENV{MILVUS_OPENTRACING_URL}")
else ()
set(OPENTRACING_SOURCE_URL "https://github.com/opentracing/opentracing-cpp/archive/${OPENTRACING_VERSION}.tar.gz")
endif ()
# ----------------------------------------------------------------------
# Google gtest
@ -1208,3 +1218,53 @@ if (MILVUS_WITH_ZLIB)
get_target_property(ZLIB_INCLUDE_DIR zlib INTERFACE_INCLUDE_DIRECTORIES)
include_directories(SYSTEM ${ZLIB_INCLUDE_DIR})
endif ()
# ----------------------------------------------------------------------
# opentracing
macro(build_opentracing)
message(STATUS "Building OPENTRACING-${OPENTRACING_VERSION} from source")
set(OPENTRACING_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/opentracing_ep-prefix/src/opentracing_ep")
set(OPENTRACING_STATIC_LIB "${OPENTRACING_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}opentracing${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(OPENTRACING_MOCK_TRACER_STATIC_LIB "${OPENTRACING_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}opentracing_mocktracer${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(OPENTRACING_INCLUDE_DIR "${OPENTRACING_PREFIX}/include")
set(OPENTRACING_CMAKE_ARGS
${EP_COMMON_CMAKE_ARGS}
"-DCMAKE_INSTALL_PREFIX=${OPENTRACING_PREFIX}"
-DBUILD_SHARED_LIBS=OFF)
externalproject_add(opentracing_ep
URL
${OPENTRACING_SOURCE_URL}
${EP_LOG_OPTIONS}
CMAKE_ARGS
${OPENTRACING_CMAKE_ARGS}
BUILD_COMMAND
${MAKE}
${MAKE_BUILD_ARGS}
BUILD_BYPRODUCTS
${OPENTRACING_STATIC_LIB}
${OPENTRACING_MOCK_TRACER_STATIC_LIB}
)
file(MAKE_DIRECTORY "${OPENTRACING_INCLUDE_DIR}")
add_library(opentracing STATIC IMPORTED)
set_target_properties(opentracing
PROPERTIES IMPORTED_LOCATION "${OPENTRACING_STATIC_LIB}"
INTERFACE_INCLUDE_DIRECTORIES "${OPENTRACING_INCLUDE_DIR}")
add_library(opentracing_mocktracer STATIC IMPORTED)
set_target_properties(opentracing_mocktracer
PROPERTIES IMPORTED_LOCATION "${OPENTRACING_MOCK_TRACER_STATIC_LIB}"
INTERFACE_INCLUDE_DIRECTORIES "${OPENTRACING_INCLUDE_DIR}")
add_dependencies(opentracing opentracing_ep)
add_dependencies(opentracing_mocktracer opentracing_ep)
endmacro()
if (MILVUS_WITH_OPENTRACING)
resolve_dependency(Opentracing)
get_target_property(OPENTRACING_INCLUDE_DIR opentracing INTERFACE_INCLUDE_DIRECTORIES)
include_directories(SYSTEM ${OPENTRACING_INCLUDE_DIR})
endif ()

View File

@ -44,3 +44,6 @@ gpu_resource_config:
- gpu0
build_index_resources: # define the GPU devices used for index building, must be in format gpux
- gpu0
tracing_config:
json_config_path: # Absolute path to tracing config file. Creates a no-op tracer if empty

View File

@ -44,3 +44,6 @@ gpu_resource_config:
- gpu0
build_index_resources: # define the GPU devices used for index building, must be in format gpux
- gpu0
tracing_config:
json_config_path: # Absolute path to tracing config file. Creates a no-op tracer if empty

View File

@ -0,0 +1,26 @@
{
"host": "127.0.0.1",
"port": "5666",
"tracer_library": "/path/to/shared_tracing_library",
"tracer_configuration": {
"service_name": "milvus_server",
"sampler": {
"type": "const",
"param": "1"
},
"disabled": false,
"reporter": {
"localAgentHostPort": "127.0.0.1:6831"
},
"headers": {
"jaegerDebugHeader": "jaeger_debug_header",
"jaegerBaggageHeader": "jarger_baggage_header",
"TraceContextHeaderName": "trace_context_header_name",
"traceBaggageHeaderPrefix": "trace_baggage_header_prefix"
},
"baggage_restrictions": {
"denyBaggageOnInitializationFailure": false,
"hostPort": ""
}
}
}

View File

@ -77,14 +77,19 @@ set(thirdparty_files
aux_source_directory(${MILVUS_ENGINE_SRC}/server server_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl/request grpc_request_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl grpc_impl_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl/interceptor grpc_interceptor_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/context server_context_files)
set(grpc_server_files
${grpc_request_files}
${grpc_impl_files}
${grpc_interceptor_files}
)
aux_source_directory(${MILVUS_ENGINE_SRC}/utils utils_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/tracing tracing_files)
set(engine_files
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
${cache_files}
@ -103,7 +108,7 @@ if (MILVUS_WITH_PROMETHEUS)
${metrics_prometheus_files})
endif ()
set(client_grpc_lib
set(grpc_lib
grpcpp_channelz
grpc++
grpc
@ -125,7 +130,7 @@ set(boost_lib
set(third_party_libs
sqlite
${client_grpc_lib}
${grpc_lib}
yaml-cpp
mysqlpp
zlib
@ -201,9 +206,20 @@ endif ()
target_link_libraries(metrics ${metrics_lib})
add_library(tracing STATIC ${tracing_files} ${thirdparty_files})
set(tracing_lib
opentracing
opentracing_mocktracer
${grpc_lib}
pthread
z
)
target_link_libraries(tracing ${tracing_lib})
set(server_libs
milvus_engine
metrics
tracing
)
add_executable(milvus_server
@ -213,7 +229,9 @@ add_executable(milvus_server
${server_files}
${grpc_server_files}
${grpc_service_files}
${server_context_files}
${utils_files}
${tracing_files}
)
target_link_libraries(milvus_server

View File

@ -17,15 +17,16 @@
#pragma once
#include "Options.h"
#include "Types.h"
#include "meta/Meta.h"
#include "utils/Status.h"
#include <memory>
#include <string>
#include <vector>
#include "Options.h"
#include "Types.h"
#include "meta/Meta.h"
#include "server/context/Context.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
@ -87,17 +88,20 @@ class DB {
IDNumbers& vector_ids_) = 0;
virtual Status
Query(const std::string& table_id, const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq,
uint64_t nprobe, const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) = 0;
Query(const std::shared_ptr<server::Context>& context, const std::string& table_id,
const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq, uint64_t nprobe,
const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) = 0;
virtual Status
Query(const std::string& table_id, const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq,
uint64_t nprobe, const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
Query(const std::shared_ptr<server::Context>& context, const std::string& table_id,
const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq, uint64_t nprobe,
const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
ResultDistances& result_distances) = 0;
virtual Status
QueryByFileID(const std::string& table_id, const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq,
uint64_t nprobe, const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
QueryByFileID(const std::shared_ptr<server::Context>& context, const std::string& table_id,
const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq, uint64_t nprobe,
const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
ResultDistances& result_distances) = 0;
virtual Status

View File

@ -16,6 +16,18 @@
// under the License.
#include "db/DBImpl.h"
#include <assert.h>
#include <algorithm>
#include <boost/filesystem.hpp>
#include <chrono>
#include <cstring>
#include <iostream>
#include <set>
#include <thread>
#include <utility>
#include "Utils.h"
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
@ -33,16 +45,6 @@
#include "utils/StringHelpFunctions.h"
#include "utils/TimeRecorder.h"
#include <assert.h>
#include <algorithm>
#include <boost/filesystem.hpp>
#include <chrono>
#include <cstring>
#include <iostream>
#include <set>
#include <thread>
#include <utility>
namespace milvus {
namespace engine {
@ -394,21 +396,26 @@ DBImpl::DropIndex(const std::string& table_id) {
}
Status
DBImpl::Query(const std::string& table_id, const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq,
uint64_t nprobe, const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) {
DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& table_id,
const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq, uint64_t nprobe,
const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) {
if (shutting_down_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}
meta::DatesT dates = {utils::GetDate()};
Status result = Query(table_id, partition_tags, k, nq, nprobe, vectors, dates, result_ids, result_distances);
Status result =
Query(context, table_id, partition_tags, k, nq, nprobe, vectors, dates, result_ids, result_distances);
return result;
}
Status
DBImpl::Query(const std::string& table_id, const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq,
uint64_t nprobe, const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& table_id,
const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq, uint64_t nprobe,
const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
ResultDistances& result_distances) {
auto query_ctx = context->Child("Query");
if (shutting_down_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}
@ -443,15 +450,21 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& parti
}
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query
status = QueryAsync(table_id, files_array, k, nq, nprobe, vectors, result_ids, result_distances);
status = QueryAsync(query_ctx, table_id, files_array, k, nq, nprobe, vectors, result_ids, result_distances);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query
query_ctx->GetTraceContext()->GetSpan()->Finish();
return status;
}
Status
DBImpl::QueryByFileID(const std::string& table_id, const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq,
uint64_t nprobe, const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std::string& table_id,
const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq, uint64_t nprobe,
const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
ResultDistances& result_distances) {
auto query_ctx = context->Child("Query by file id");
if (shutting_down_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}
@ -478,8 +491,11 @@ DBImpl::QueryByFileID(const std::string& table_id, const std::vector<std::string
}
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query
status = QueryAsync(table_id, files_array, k, nq, nprobe, vectors, result_ids, result_distances);
status = QueryAsync(query_ctx, table_id, files_array, k, nq, nprobe, vectors, result_ids, result_distances);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query
query_ctx->GetTraceContext()->GetSpan()->Finish();
return status;
}
@ -496,8 +512,11 @@ DBImpl::Size(uint64_t& result) {
// internal methods
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Status
DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
uint64_t nprobe, const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) {
DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const std::string& table_id,
const meta::TableFilesSchema& files, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
ResultIds& result_ids, ResultDistances& result_distances) {
auto query_async_ctx = context->Child("Query Async");
server::CollectQueryMetrics metrics(nq);
TimeRecorder rc("");
@ -506,7 +525,7 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi
auto status = ongoing_files_checker_.MarkOngoingFiles(files);
ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(k, nq, nprobe, vectors);
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(query_async_ctx, k, nq, nprobe, vectors);
for (auto& file : files) {
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
job->AddIndexFile(file_ptr);
@ -526,6 +545,8 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi
result_distances = job->GetResultDistances();
rc.ElapseFromBegin("Engine query totally cost");
query_async_ctx->GetTraceContext()->GetSpan()->Finish();
return Status::OK();
}

View File

@ -17,13 +17,6 @@
#pragma once
#include "DB.h"
#include "db/IndexFailedChecker.h"
#include "db/OngoingFileChecker.h"
#include "db/Types.h"
#include "db/insert/MemManager.h"
#include "utils/ThreadPool.h"
#include <atomic>
#include <condition_variable>
#include <list>
@ -35,6 +28,13 @@
#include <thread>
#include <vector>
#include "DB.h"
#include "db/IndexFailedChecker.h"
#include "db/OngoingFileChecker.h"
#include "db/Types.h"
#include "db/insert/MemManager.h"
#include "utils/ThreadPool.h"
namespace milvus {
namespace engine {
@ -105,17 +105,20 @@ class DBImpl : public DB {
DropIndex(const std::string& table_id) override;
Status
Query(const std::string& table_id, const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq,
uint64_t nprobe, const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) override;
Query(const std::shared_ptr<server::Context>& context, const std::string& table_id,
const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq, uint64_t nprobe,
const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) override;
Status
Query(const std::string& table_id, const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq,
uint64_t nprobe, const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
Query(const std::shared_ptr<server::Context>& context, const std::string& table_id,
const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq, uint64_t nprobe,
const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
ResultDistances& result_distances) override;
Status
QueryByFileID(const std::string& table_id, const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq,
uint64_t nprobe, const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
QueryByFileID(const std::shared_ptr<server::Context>& context, const std::string& table_id,
const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq, uint64_t nprobe,
const float* vectors, const meta::DatesT& dates, ResultIds& result_ids,
ResultDistances& result_distances) override;
Status
@ -123,8 +126,9 @@ class DBImpl : public DB {
private:
Status
QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
uint64_t nprobe, const float* vectors, ResultIds& result_ids, ResultDistances& result_distances);
QueryAsync(const std::shared_ptr<server::Context>& context, const std::string& table_id,
const meta::TableFilesSchema& files, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
ResultIds& result_ids, ResultDistances& result_distances);
void
BackgroundTimerTask();

View File

@ -46,7 +46,7 @@ std::vector<TaskPtr>
TaskCreator::Create(const SearchJobPtr& job) {
std::vector<TaskPtr> tasks;
for (auto& index_file : job->index_files()) {
auto task = std::make_shared<XSearchTask>(index_file.second, nullptr);
auto task = std::make_shared<XSearchTask>(job->GetContext(), index_file.second, nullptr);
task->job_ = job;
tasks.emplace_back(task);
}

View File

@ -29,6 +29,8 @@
#include "scheduler/interface/interfaces.h"
#include "server/context/Context.h"
namespace milvus {
namespace scheduler {

View File

@ -16,13 +16,15 @@
// under the License.
#include "scheduler/job/SearchJob.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
SearchJob::SearchJob(uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors)
: Job(JobType::SEARCH), topk_(topk), nq_(nq), nprobe_(nprobe), vectors_(vectors) {
SearchJob::SearchJob(const std::shared_ptr<server::Context>& context, uint64_t topk, uint64_t nq, uint64_t nprobe,
const float* vectors)
: Job(JobType::SEARCH), context_(context), topk_(topk), nq_(nq), nprobe_(nprobe), vectors_(vectors) {
}
bool
@ -83,5 +85,10 @@ SearchJob::Dump() const {
return ret;
}
const std::shared_ptr<server::Context>&
SearchJob::GetContext() const {
return context_;
}
} // namespace scheduler
} // namespace milvus

View File

@ -32,6 +32,8 @@
#include "db/Types.h"
#include "db/meta/MetaTypes.h"
#include "server/context/Context.h"
namespace milvus {
namespace scheduler {
@ -44,7 +46,8 @@ using ResultDistances = engine::ResultDistances;
class SearchJob : public Job {
public:
SearchJob(uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors);
SearchJob(const std::shared_ptr<server::Context>& context, uint64_t topk, uint64_t nq, uint64_t nprobe,
const float* vectors);
public:
bool
@ -69,6 +72,9 @@ class SearchJob : public Job {
Dump() const override;
public:
const std::shared_ptr<server::Context>&
GetContext() const;
uint64_t
topk() const {
return topk_;
@ -100,6 +106,8 @@ class SearchJob : public Job {
}
private:
const std::shared_ptr<server::Context> context_;
uint64_t topk_ = 0;
uint64_t nq_ = 0;
uint64_t nprobe_ = 0;

View File

@ -16,18 +16,21 @@
// under the License.
#include "scheduler/task/SearchTask.h"
#include <src/scheduler/SchedInst.h>
#include <algorithm>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include "db/engine/EngineFactory.h"
#include "metrics/Metrics.h"
#include "scheduler/job/SearchJob.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include <src/scheduler/SchedInst.h>
#include <algorithm>
#include <string>
#include <thread>
#include <utility>
namespace milvus {
namespace scheduler {
@ -97,8 +100,8 @@ CollectFileMetrics(int file_type, size_t file_size) {
}
}
XSearchTask::XSearchTask(TableFileSchemaPtr file, TaskLabelPtr label)
: Task(TaskType::SearchTask, std::move(label)), file_(file) {
XSearchTask::XSearchTask(const std::shared_ptr<server::Context>& context, TableFileSchemaPtr file, TaskLabelPtr label)
: Task(TaskType::SearchTask, std::move(label)), context_(context), file_(file) {
if (file_) {
if (file_->metric_type_ != static_cast<int>(MetricType::L2)) {
metric_l2 = false;
@ -110,6 +113,8 @@ XSearchTask::XSearchTask(TableFileSchemaPtr file, TaskLabelPtr label)
void
XSearchTask::Load(LoadType type, uint8_t device_id) {
auto load_ctx = context_->Follower("XSearchTask::Load " + std::to_string(file_->id_));
TimeRecorder rc("");
Status stat = Status::OK();
std::string error_msg;
@ -174,10 +179,14 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
index_id_ = file_->id_;
index_type_ = file_->file_type_;
// search_contexts_.swap(search_contexts_);
load_ctx->GetTraceContext()->GetSpan()->Finish();
}
void
XSearchTask::Execute() {
auto execute_ctx = context_->Follower("XSearchTask::Execute " + std::to_string(index_id_));
if (index_engine_ == nullptr) {
return;
}
@ -246,6 +255,8 @@ XSearchTask::Execute() {
// release index in resource
index_engine_ = nullptr;
execute_ctx->GetTraceContext()->GetSpan()->Finish();
}
void

View File

@ -17,19 +17,20 @@
#pragma once
#include <memory>
#include <vector>
#include "Task.h"
#include "scheduler/Definition.h"
#include "scheduler/job/SearchJob.h"
#include <vector>
namespace milvus {
namespace scheduler {
// TODO(wxyu): rewrite
class XSearchTask : public Task {
public:
explicit XSearchTask(TableFileSchemaPtr file, TaskLabelPtr label);
explicit XSearchTask(const std::shared_ptr<server::Context>& context, TableFileSchemaPtr file, TaskLabelPtr label);
void
Load(LoadType type, uint8_t device_id) override;
@ -49,6 +50,8 @@ class XSearchTask : public Task {
// src_input_k, uint64_t nq, uint64_t topk, bool ascending);
public:
const std::shared_ptr<server::Context> context_;
TableFileSchemaPtr file_;
size_t index_id_ = 0;

View File

@ -16,14 +16,16 @@
// under the License.
#include "scheduler/task/TestTask.h"
#include "cache/GpuCacheMgr.h"
#include <utility>
#include "cache/GpuCacheMgr.h"
namespace milvus {
namespace scheduler {
TestTask::TestTask(TableFileSchemaPtr& file, TaskLabelPtr label) : XSearchTask(file, std::move(label)) {
TestTask::TestTask(const std::shared_ptr<server::Context>& context, TableFileSchemaPtr& file, TaskLabelPtr label)
: XSearchTask(context, file, std::move(label)) {
}
void

View File

@ -17,6 +17,8 @@
#pragma once
#include <memory>
#include "SearchTask.h"
namespace milvus {
@ -24,7 +26,7 @@ namespace scheduler {
class TestTask : public XSearchTask {
public:
explicit TestTask(TableFileSchemaPtr& file, TaskLabelPtr label);
explicit TestTask(const std::shared_ptr<server::Context>& context, TableFileSchemaPtr& file, TaskLabelPtr label);
public:
void

View File

@ -29,7 +29,7 @@ add_library(milvus_sdk SHARED
)
target_link_libraries(milvus_sdk
${client_grpc_lib}
${grpc_lib}
zlib
)

View File

@ -17,6 +17,7 @@
#include <sys/stat.h>
#include <algorithm>
#include <fstream>
#include <iostream>
#include <regex>
#include <string>
@ -233,6 +234,13 @@ Config::ValidateConfig() {
}
#endif
/* tracing config */
std::string tracing_config_path;
s = GetTracingConfigJsonConfigPath(tracing_config_path);
if (!s.ok()) {
return s;
}
return Status::OK();
}
@ -1132,6 +1140,21 @@ Config::GetGpuResourceConfigBuildIndexResources(std::vector<int64_t>& value) {
}
#endif
/* tracing config */
Status
Config::GetTracingConfigJsonConfigPath(std::string& value) {
value = GetConfigStr(CONFIG_TRACING, CONFIG_TRACING_JSON_CONFIG_PATH, "");
if (!value.empty()) {
std::ifstream tracer_config(value);
Status s = tracer_config.good() ? Status::OK()
: Status(SERVER_INVALID_ARGUMENT, "Failed to open tracer config file " + value +
": " + std::strerror(errno));
tracer_config.close();
return s;
}
return Status::OK();
}
///////////////////////////////////////////////////////////////////////////////
/* server config */
Status

View File

@ -103,6 +103,11 @@ static const char* CONFIG_GPU_RESOURCE_SEARCH_RESOURCES_DEFAULT = "gpu0";
static const char* CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES = "build_index_resources";
static const char* CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES_DEFAULT = "gpu0";
// TODO:
/* tracing config */
static const char* CONFIG_TRACING = "tracing_config";
static const char* CONFIG_TRACING_JSON_CONFIG_PATH = "json_config_path";
class Config {
public:
static Config&
@ -269,6 +274,10 @@ class Config {
GetGpuResourceConfigBuildIndexResources(std::vector<int64_t>& value);
#endif
/* tracing config */
Status
GetTracingConfigJsonConfigPath(std::string& value);
public:
/* server config */
Status

View File

@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
#include "server/Server.h"
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
@ -23,9 +25,9 @@
#include "scheduler/SchedInst.h"
#include "server/Config.h"
#include "server/DBWrapper.h"
#include "server/Server.h"
#include "server/grpc_impl/GrpcServer.h"
#include "src/version.h"
#include "tracing/TracerUtil.h"
#include "utils/Log.h"
#include "utils/LogUtil.h"
#include "utils/SignalUtil.h"
@ -152,8 +154,15 @@ Server::Start() {
return s;
}
/* log path is defined in Config file, so InitLog must be called after LoadConfig */
Config& config = Config::GetInstance();
/* Init opentracing tracer from config */
std::string tracing_config_path;
s = config.GetTracingConfigJsonConfigPath(tracing_config_path);
tracing_config_path.empty() ? tracing::TracerUtil::InitGlobal()
: tracing::TracerUtil::InitGlobal(tracing_config_path);
/* log path is defined in Config file, so InitLog must be called after LoadConfig */
std::string time_zone;
s = config.GetServerConfigTimeZone(time_zone);
if (!s.ok()) {

View File

@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/context/Context.h"
namespace milvus {
namespace server {
Context::Context(const std::string& request_id) : request_id_(request_id) {
}
const std::shared_ptr<tracing::TraceContext>&
Context::GetTraceContext() const {
return trace_context_;
}
void
Context::SetTraceContext(const std::shared_ptr<tracing::TraceContext>& trace_context) {
trace_context_ = trace_context;
}
std::shared_ptr<Context>
Context::Child(const std::string& operation_name) const {
auto new_context = std::make_shared<Context>(request_id_);
new_context->SetTraceContext(trace_context_->Child(operation_name));
return new_context;
}
std::shared_ptr<Context>
Context::Follower(const std::string& operation_name) const {
auto new_context = std::make_shared<Context>(request_id_);
new_context->SetTraceContext(trace_context_->Follower(operation_name));
return new_context;
}
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
#include <string>
#include <unordered_map>
#include "tracing/TraceContext.h"
namespace milvus {
namespace server {
class Context {
public:
explicit Context(const std::string& request_id);
std::shared_ptr<Context>
Child(const std::string& operation_name) const;
std::shared_ptr<Context>
Follower(const std::string& operation_name) const;
void
SetTraceContext(const std::shared_ptr<tracing::TraceContext>& trace_context);
const std::shared_ptr<tracing::TraceContext>&
GetTraceContext() const;
private:
std::string request_id_;
std::shared_ptr<tracing::TraceContext> trace_context_;
};
} // namespace server
} // namespace milvus

View File

@ -16,6 +16,13 @@
// under the License.
#include "server/grpc_impl/GrpcRequestHandler.h"
#include <src/server/Config.h>
#include <memory>
#include <unordered_map>
#include <vector>
#include "server/grpc_impl/GrpcRequestScheduler.h"
#include "server/grpc_impl/request/CmdRequest.h"
#include "server/grpc_impl/request/CountTableRequest.h"
@ -34,19 +41,98 @@
#include "server/grpc_impl/request/SearchRequest.h"
#include "server/grpc_impl/request/ShowPartitionsRequest.h"
#include "server/grpc_impl/request/ShowTablesRequest.h"
#include "tracing/TextMapCarrier.h"
#include "tracing/TracerUtil.h"
#include "utils/TimeRecorder.h"
#include <vector>
namespace milvus {
namespace server {
namespace grpc {
GrpcRequestHandler::GrpcRequestHandler(const std::shared_ptr<opentracing::Tracer>& tracer)
: tracer_(tracer), random_num_generator_() {
std::random_device random_device;
random_num_generator_.seed(random_device());
}
void
GrpcRequestHandler::OnPostRecvInitialMetaData(
::grpc::experimental::ServerRpcInfo* server_rpc_info,
::grpc::experimental::InterceptorBatchMethods* interceptor_batch_methods) {
std::unordered_map<std::string, std::string> text_map;
auto* metadata_map = interceptor_batch_methods->GetRecvInitialMetadata();
auto context_kv = metadata_map->find(tracing::TracerUtil::GetTraceContextHeaderName());
if (context_kv != metadata_map->end()) {
text_map[std::string(context_kv->first.data(), context_kv->first.length())] =
std::string(context_kv->second.data(), context_kv->second.length());
}
// test debug mode
// if (std::string(server_rpc_info->method()).find("Search") != std::string::npos) {
// text_map["demo-debug-id"] = "debug-id";
// }
tracing::TextMapCarrier carrier{text_map};
auto span_context_maybe = tracer_->Extract(carrier);
if (!span_context_maybe) {
std::cerr << span_context_maybe.error().message() << std::endl;
return;
}
auto span = tracer_->StartSpan(server_rpc_info->method(), {opentracing::ChildOf(span_context_maybe->get())});
auto server_context = server_rpc_info->server_context();
auto client_metadata = server_context->client_metadata();
// TODO: request id
std::string request_id;
auto request_id_kv = client_metadata.find("request_id");
if (request_id_kv != client_metadata.end()) {
request_id = request_id_kv->second.data();
} else {
request_id = std::to_string(random_id()) + std::to_string(random_id());
}
auto trace_context = std::make_shared<tracing::TraceContext>(span);
auto context = std::make_shared<Context>(request_id);
context->SetTraceContext(trace_context);
context_map_[server_rpc_info->server_context()] = context;
}
void
GrpcRequestHandler::OnPreSendMessage(::grpc::experimental::ServerRpcInfo* server_rpc_info,
::grpc::experimental::InterceptorBatchMethods* interceptor_batch_methods) {
context_map_[server_rpc_info->server_context()]->GetTraceContext()->GetSpan()->Finish();
auto search = context_map_.find(server_rpc_info->server_context());
if (search != context_map_.end()) {
std::lock_guard<std::mutex> lock(context_map_mutex_);
context_map_.erase(search);
}
}
const std::shared_ptr<Context>&
GrpcRequestHandler::GetContext(::grpc::ServerContext* server_context) {
return context_map_[server_context];
}
void
GrpcRequestHandler::SetContext(::grpc::ServerContext* server_context, const std::shared_ptr<Context>& context) {
context_map_[server_context] = context;
}
uint64_t
GrpcRequestHandler::random_id() const {
std::lock_guard<std::mutex> lock(random_mutex_);
auto value = random_num_generator_();
while (value == 0) {
value = random_num_generator_();
}
return value;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
::grpc::Status
GrpcRequestHandler::CreateTable(::grpc::ServerContext* context, const ::milvus::grpc::TableSchema* request,
::milvus::grpc::Status* response) {
BaseRequestPtr request_ptr = CreateTableRequest::Create(request);
BaseRequestPtr request_ptr = CreateTableRequest::Create(context_map_[context], request);
GrpcRequestScheduler::ExecRequest(request_ptr, response);
SET_TRACING_TAG(*response, context);
return ::grpc::Status::OK;
}
@ -54,39 +140,39 @@ GrpcRequestHandler::CreateTable(::grpc::ServerContext* context, const ::milvus::
GrpcRequestHandler::HasTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::BoolReply* response) {
bool has_table = false;
BaseRequestPtr request_ptr = HasTableRequest::Create(request->table_name(), has_table);
BaseRequestPtr request_ptr = HasTableRequest::Create(context_map_[context], request->table_name(), has_table);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_bool_reply(has_table);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
SET_RESPONSE(response, grpc_status, context);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::DropTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::Status* response) {
BaseRequestPtr request_ptr = DropTableRequest::Create(request->table_name());
BaseRequestPtr request_ptr = DropTableRequest::Create(context_map_[context], request->table_name());
GrpcRequestScheduler::ExecRequest(request_ptr, response);
SET_TRACING_TAG(*response, context);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::CreateIndex(::grpc::ServerContext* context, const ::milvus::grpc::IndexParam* request,
::milvus::grpc::Status* response) {
BaseRequestPtr request_ptr = CreateIndexRequest::Create(request);
BaseRequestPtr request_ptr = CreateIndexRequest::Create(context_map_[context], request);
GrpcRequestScheduler::ExecRequest(request_ptr, response);
SET_TRACING_TAG(*response, context);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::Insert(::grpc::ServerContext* context, const ::milvus::grpc::InsertParam* request,
::milvus::grpc::VectorIds* response) {
BaseRequestPtr request_ptr = InsertRequest::Create(request, response);
BaseRequestPtr request_ptr = InsertRequest::Create(context_map_[context], request, response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
SET_RESPONSE(response, grpc_status, context);
return ::grpc::Status::OK;
}
@ -94,11 +180,10 @@ GrpcRequestHandler::Insert(::grpc::ServerContext* context, const ::milvus::grpc:
GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc::SearchParam* request,
::milvus::grpc::TopKQueryResult* response) {
std::vector<std::string> file_id_array;
BaseRequestPtr request_ptr = SearchRequest::Create(request, file_id_array, response);
BaseRequestPtr request_ptr = SearchRequest::Create(context_map_[context], request, file_id_array, response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_status()->set_reason(grpc_status.reason());
SET_RESPONSE(response, grpc_status, context);
return ::grpc::Status::OK;
}
@ -111,22 +196,20 @@ GrpcRequestHandler::SearchInFiles(::grpc::ServerContext* context, const ::milvus
}
::milvus::grpc::SearchInFilesParam* request_mutable = const_cast<::milvus::grpc::SearchInFilesParam*>(request);
BaseRequestPtr request_ptr =
SearchRequest::Create(request_mutable->mutable_search_param(), file_id_array, response);
SearchRequest::Create(context_map_[context], request_mutable->mutable_search_param(), file_id_array, response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_status()->set_reason(grpc_status.reason());
SET_RESPONSE(response, grpc_status, context);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::DescribeTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::TableSchema* response) {
BaseRequestPtr request_ptr = DescribeTableRequest::Create(request->table_name(), response);
BaseRequestPtr request_ptr = DescribeTableRequest::Create(context_map_[context], request->table_name(), response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_status()->set_reason(grpc_status.reason());
SET_RESPONSE(response, grpc_status, context);
return ::grpc::Status::OK;
}
@ -134,23 +217,21 @@ GrpcRequestHandler::DescribeTable(::grpc::ServerContext* context, const ::milvus
GrpcRequestHandler::CountTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::TableRowCount* response) {
int64_t row_count = 0;
BaseRequestPtr request_ptr = CountTableRequest::Create(request->table_name(), row_count);
BaseRequestPtr request_ptr = CountTableRequest::Create(context_map_[context], request->table_name(), row_count);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_table_row_count(row_count);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
SET_RESPONSE(response, grpc_status, context);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::ShowTables(::grpc::ServerContext* context, const ::milvus::grpc::Command* request,
::milvus::grpc::TableNameList* response) {
BaseRequestPtr request_ptr = ShowTablesRequest::Create(response);
BaseRequestPtr request_ptr = ShowTablesRequest::Create(context_map_[context], response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_status()->set_reason(grpc_status.reason());
SET_RESPONSE(response, grpc_status, context);
return ::grpc::Status::OK;
}
@ -158,86 +239,76 @@ GrpcRequestHandler::ShowTables(::grpc::ServerContext* context, const ::milvus::g
GrpcRequestHandler::Cmd(::grpc::ServerContext* context, const ::milvus::grpc::Command* request,
::milvus::grpc::StringReply* response) {
std::string result;
BaseRequestPtr request_ptr = CmdRequest::Create(request->cmd(), result);
BaseRequestPtr request_ptr = CmdRequest::Create(context_map_[context], request->cmd(), result);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_string_reply(result);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
SET_RESPONSE(response, grpc_status, context);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::DeleteByDate(::grpc::ServerContext* context, const ::milvus::grpc::DeleteByDateParam* request,
::milvus::grpc::Status* response) {
BaseRequestPtr request_ptr = DeleteByDateRequest::Create(request);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_error_code(grpc_status.error_code());
response->set_reason(grpc_status.reason());
BaseRequestPtr request_ptr = DeleteByDateRequest::Create(context_map_[context], request);
GrpcRequestScheduler::ExecRequest(request_ptr, response);
SET_TRACING_TAG(*response, context);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::PreloadTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::Status* response) {
BaseRequestPtr request_ptr = PreloadTableRequest::Create(request->table_name());
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_reason(grpc_status.reason());
response->set_error_code(grpc_status.error_code());
BaseRequestPtr request_ptr = PreloadTableRequest::Create(context_map_[context], request->table_name());
GrpcRequestScheduler::ExecRequest(request_ptr, response);
SET_TRACING_TAG(*response, context);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::DescribeIndex(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::IndexParam* response) {
BaseRequestPtr request_ptr = DescribeIndexRequest::Create(request->table_name(), response);
BaseRequestPtr request_ptr = DescribeIndexRequest::Create(context_map_[context], request->table_name(), response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
SET_RESPONSE(response, grpc_status, context);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::DropIndex(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::Status* response) {
BaseRequestPtr request_ptr = DropIndexRequest::Create(request->table_name());
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_reason(grpc_status.reason());
response->set_error_code(grpc_status.error_code());
BaseRequestPtr request_ptr = DropIndexRequest::Create(context_map_[context], request->table_name());
GrpcRequestScheduler::ExecRequest(request_ptr, response);
SET_TRACING_TAG(*response, context);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::CreatePartition(::grpc::ServerContext* context, const ::milvus::grpc::PartitionParam* request,
::milvus::grpc::Status* response) {
BaseRequestPtr request_ptr = CreatePartitionRequest::Create(request);
BaseRequestPtr request_ptr = CreatePartitionRequest::Create(context_map_[context], request);
GrpcRequestScheduler::ExecRequest(request_ptr, response);
SET_TRACING_TAG(*response, context);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::ShowPartitions(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::PartitionList* response) {
BaseRequestPtr request_ptr = ShowPartitionsRequest::Create(request->table_name(), response);
BaseRequestPtr request_ptr = ShowPartitionsRequest::Create(context_map_[context], request->table_name(), response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
SET_RESPONSE(response, grpc_status, context);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::DropPartition(::grpc::ServerContext* context, const ::milvus::grpc::PartitionParam* request,
::milvus::grpc::Status* response) {
BaseRequestPtr request_ptr = DropPartitionRequest::Create(request);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_reason(grpc_status.reason());
response->set_error_code(grpc_status.error_code());
BaseRequestPtr request_ptr = DropPartitionRequest::Create(context_map_[context], request);
GrpcRequestScheduler::ExecRequest(request_ptr, response);
SET_TRACING_TAG(*response, context);
return ::grpc::Status::OK;
}

View File

@ -17,17 +17,55 @@
#pragma once
#include <server/context/Context.h>
#include <cstdint>
#include <memory>
#include <random>
#include <string>
#include <unordered_map>
#include "grpc/gen-milvus/milvus.grpc.pb.h"
#include "grpc/gen-status/status.pb.h"
#include "opentracing/tracer.h"
#include "server/grpc_impl/interceptor/GrpcInterceptorHookHandler.h"
namespace milvus {
namespace server {
namespace grpc {
class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service {
#define SET_TRACING_TAG(GRPC_STATUS, SERVER_CONTEXT) \
if ((GRPC_STATUS).error_code() != ::milvus::grpc::ErrorCode::SUCCESS) { \
GetContext((SERVER_CONTEXT))->GetTraceContext()->GetSpan()->SetTag("error", true); \
GetContext((SERVER_CONTEXT))->GetTraceContext()->GetSpan()->SetTag("error_message", (GRPC_STATUS).reason()); \
}
#define SET_RESPONSE(RESPONSE, GRPC_STATUS, SERVER_CONTEXT) \
(RESPONSE)->mutable_status()->set_error_code((GRPC_STATUS).error_code()); \
(RESPONSE)->mutable_status()->set_reason((GRPC_STATUS).reason()); \
SET_TRACING_TAG(GRPC_STATUS, SERVER_CONTEXT)
class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service, public GrpcInterceptorHookHandler {
public:
explicit GrpcRequestHandler(const std::shared_ptr<opentracing::Tracer>& tracer);
void
OnPostRecvInitialMetaData(::grpc::experimental::ServerRpcInfo* server_rpc_info,
::grpc::experimental::InterceptorBatchMethods* interceptor_batch_methods) override;
void
OnPreSendMessage(::grpc::experimental::ServerRpcInfo* server_rpc_info,
::grpc::experimental::InterceptorBatchMethods* interceptor_batch_methods) override;
const std::shared_ptr<Context>&
GetContext(::grpc::ServerContext* server_context);
void
SetContext(::grpc::ServerContext* server_context, const std::shared_ptr<Context>& context);
uint64_t
random_id() const;
// *
// @brief This method is used to create table
//
@ -192,6 +230,15 @@ class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service {
::grpc::Status
PreloadTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::Status* response) override;
private:
std::unordered_map<::grpc::ServerContext*, std::shared_ptr<Context>> context_map_;
std::shared_ptr<opentracing::Tracer> tracer_;
// std::unordered_map<::grpc::ServerContext*, std::unique_ptr<opentracing::Span>> span_map_;
mutable std::mt19937_64 random_num_generator_;
mutable std::mutex random_mutex_;
mutable std::mutex context_map_mutex_;
};
} // namespace grpc

View File

@ -16,11 +16,16 @@
// under the License.
#include "server/grpc_impl/GrpcServer.h"
#include "GrpcRequestHandler.h"
#include "grpc/gen-milvus/milvus.grpc.pb.h"
#include "server/Config.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include <grpc++/grpc++.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <chrono>
#include <iostream>
@ -28,13 +33,15 @@
#include <random>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <grpc/grpc.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include "GrpcRequestHandler.h"
#include "grpc/gen-milvus/milvus.grpc.pb.h"
#include "server/Config.h"
#include "server/DBWrapper.h"
#include "server/grpc_impl/interceptor/SpanInterceptor.h"
#include "utils/Log.h"
namespace milvus {
namespace server {
@ -95,11 +102,21 @@ GrpcServer::StartService() {
builder.SetDefaultCompressionAlgorithm(GRPC_COMPRESS_STREAM_GZIP);
builder.SetDefaultCompressionLevel(GRPC_COMPRESS_LEVEL_NONE);
GrpcRequestHandler service;
GrpcRequestHandler service(opentracing::Tracer::Global());
builder.AddListeningPort(server_address, ::grpc::InsecureServerCredentials());
builder.RegisterService(&service);
// Add gRPC interceptor
using InterceptorI = ::grpc::experimental::ServerInterceptorFactoryInterface;
using InterceptorIPtr = std::unique_ptr<InterceptorI>;
std::vector<InterceptorIPtr> creators;
creators.push_back(
std::unique_ptr<::grpc::experimental::ServerInterceptorFactoryInterface>(new SpanInterceptorFactory(&service)));
builder.experimental().SetInterceptorCreators(std::move(creators));
server_ptr_ = builder.BuildAndStart();
server_ptr_->Wait();

View File

@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/interceptor/GrpcInterceptorHookHandler.h"
namespace milvus {
namespace server {
namespace grpc {
void
GrpcInterceptorHookHandler::OnPostRecvInitialMetaData(
::grpc::experimental::ServerRpcInfo* server_rpc_info,
::grpc::experimental::InterceptorBatchMethods* interceptor_batch_methods) {
}
void
GrpcInterceptorHookHandler::OnPreSendMessage(::grpc::experimental::ServerRpcInfo* server_rpc_info,
::grpc::experimental::InterceptorBatchMethods* interceptor_batch_methods) {
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <grpcpp/impl/codegen/interceptor.h>
#include <grpcpp/impl/codegen/server_interceptor.h>
namespace milvus {
namespace server {
namespace grpc {
class GrpcInterceptorHookHandler {
public:
virtual void
OnPostRecvInitialMetaData(::grpc::experimental::ServerRpcInfo* server_rpc_info,
::grpc::experimental::InterceptorBatchMethods* interceptor_batch_methods);
virtual void
OnPreSendMessage(::grpc::experimental::ServerRpcInfo* server_rpc_info,
::grpc::experimental::InterceptorBatchMethods* interceptor_batch_methods);
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/interceptor/SpanInterceptor.h"
#include "tracing/TracerUtil.h"
namespace milvus {
namespace server {
namespace grpc {
SpanInterceptor::SpanInterceptor(::grpc::experimental::ServerRpcInfo* info, GrpcInterceptorHookHandler* hook_handler)
: info_(info), hook_handler_(hook_handler) {
}
void
SpanInterceptor::Intercept(::grpc::experimental::InterceptorBatchMethods* methods) {
if (methods->QueryInterceptionHookPoint(::grpc::experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA)) {
hook_handler_->OnPostRecvInitialMetaData(info_, methods);
} else if (methods->QueryInterceptionHookPoint(::grpc::experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
hook_handler_->OnPreSendMessage(info_, methods);
}
methods->Proceed();
}
::grpc::experimental::Interceptor*
SpanInterceptorFactory::CreateServerInterceptor(::grpc::experimental::ServerRpcInfo* info) {
return new SpanInterceptor(info, hook_handler_);
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <grpcpp/impl/codegen/interceptor.h>
#include <grpcpp/impl/codegen/server_interceptor.h>
#include <opentracing/tracer.h>
#include <memory>
#include "GrpcInterceptorHookHandler.h"
namespace milvus {
namespace server {
namespace grpc {
class SpanInterceptor : public ::grpc::experimental::Interceptor {
public:
SpanInterceptor(::grpc::experimental::ServerRpcInfo* info, GrpcInterceptorHookHandler* hook_handler);
void
Intercept(::grpc::experimental::InterceptorBatchMethods* methods) override;
private:
::grpc::experimental::ServerRpcInfo* info_;
GrpcInterceptorHookHandler* hook_handler_;
// std::shared_ptr<opentracing::Tracer> tracer_;
// std::unique_ptr<opentracing::Span> span_;
};
class SpanInterceptorFactory : public ::grpc::experimental::ServerInterceptorFactoryInterface {
public:
explicit SpanInterceptorFactory(GrpcInterceptorHookHandler* hook_handler) : hook_handler_(hook_handler) {
}
::grpc::experimental::Interceptor*
CreateServerInterceptor(::grpc::experimental::ServerRpcInfo* info) override;
private:
GrpcInterceptorHookHandler* hook_handler_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -26,13 +26,13 @@ namespace milvus {
namespace server {
namespace grpc {
CmdRequest::CmdRequest(const std::string& cmd, std::string& result)
: GrpcBaseRequest(INFO_REQUEST_GROUP), cmd_(cmd), result_(result) {
CmdRequest::CmdRequest(const std::shared_ptr<Context>& context, const std::string& cmd, std::string& result)
: GrpcBaseRequest(context, INFO_REQUEST_GROUP), cmd_(cmd), result_(result) {
}
BaseRequestPtr
CmdRequest::Create(const std::string& cmd, std::string& result) {
return std::shared_ptr<GrpcBaseRequest>(new CmdRequest(cmd, result));
CmdRequest::Create(const std::shared_ptr<Context>& context, const std::string& cmd, std::string& result) {
return std::shared_ptr<GrpcBaseRequest>(new CmdRequest(context, cmd, result));
}
Status

View File

@ -17,10 +17,11 @@
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <memory>
#include <string>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
@ -28,10 +29,10 @@ namespace grpc {
class CmdRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& cmd, std::string& result);
Create(const std::shared_ptr<Context>& context, const std::string& cmd, std::string& result);
protected:
CmdRequest(const std::string& cmd, std::string& result);
CmdRequest(const std::shared_ptr<Context>& context, const std::string& cmd, std::string& result);
Status
OnExecute() override;

View File

@ -27,13 +27,14 @@ namespace milvus {
namespace server {
namespace grpc {
CountTableRequest::CountTableRequest(const std::string& table_name, int64_t& row_count)
: GrpcBaseRequest(INFO_REQUEST_GROUP), table_name_(table_name), row_count_(row_count) {
CountTableRequest::CountTableRequest(const std::shared_ptr<Context>& context, const std::string& table_name,
int64_t& row_count)
: GrpcBaseRequest(context, INFO_REQUEST_GROUP), table_name_(table_name), row_count_(row_count) {
}
BaseRequestPtr
CountTableRequest::Create(const std::string& table_name, int64_t& row_count) {
return std::shared_ptr<GrpcBaseRequest>(new CountTableRequest(table_name, row_count));
CountTableRequest::Create(const std::shared_ptr<Context>& context, const std::string& table_name, int64_t& row_count) {
return std::shared_ptr<GrpcBaseRequest>(new CountTableRequest(context, table_name, row_count));
}
Status

View File

@ -17,10 +17,11 @@
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <memory>
#include <string>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
@ -28,10 +29,10 @@ namespace grpc {
class CountTableRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name, int64_t& row_count);
Create(const std::shared_ptr<Context>& context, const std::string& table_name, int64_t& row_count);
protected:
CountTableRequest(const std::string& table_name, int64_t& row_count);
CountTableRequest(const std::shared_ptr<Context>& context, const std::string& table_name, int64_t& row_count);
Status
OnExecute() override;

View File

@ -29,17 +29,18 @@ namespace milvus {
namespace server {
namespace grpc {
CreateIndexRequest::CreateIndexRequest(const ::milvus::grpc::IndexParam* index_param)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), index_param_(index_param) {
CreateIndexRequest::CreateIndexRequest(const std::shared_ptr<Context>& context,
const ::milvus::grpc::IndexParam* index_param)
: GrpcBaseRequest(context, DDL_DML_REQUEST_GROUP), index_param_(index_param) {
}
BaseRequestPtr
CreateIndexRequest::Create(const ::milvus::grpc::IndexParam* index_param) {
CreateIndexRequest::Create(const std::shared_ptr<Context>& context, const ::milvus::grpc::IndexParam* index_param) {
if (index_param == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!";
return nullptr;
}
return std::shared_ptr<GrpcBaseRequest>(new CreateIndexRequest(index_param));
return std::shared_ptr<GrpcBaseRequest>(new CreateIndexRequest(context, index_param));
}
Status

View File

@ -17,6 +17,8 @@
#pragma once
#include <memory>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
@ -26,10 +28,10 @@ namespace grpc {
class CreateIndexRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const ::milvus::grpc::IndexParam* index_param);
Create(const std::shared_ptr<Context>& context, const ::milvus::grpc::IndexParam* index_param);
protected:
explicit CreateIndexRequest(const ::milvus::grpc::IndexParam* index_param);
explicit CreateIndexRequest(const std::shared_ptr<Context>& context, const ::milvus::grpc::IndexParam* index_param);
Status
OnExecute() override;

View File

@ -28,17 +28,19 @@ namespace milvus {
namespace server {
namespace grpc {
CreatePartitionRequest::CreatePartitionRequest(const ::milvus::grpc::PartitionParam* partition_param)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), partition_param_(partition_param) {
CreatePartitionRequest::CreatePartitionRequest(const std::shared_ptr<Context>& context,
const ::milvus::grpc::PartitionParam* partition_param)
: GrpcBaseRequest(context, DDL_DML_REQUEST_GROUP), partition_param_(partition_param) {
}
BaseRequestPtr
CreatePartitionRequest::Create(const ::milvus::grpc::PartitionParam* partition_param) {
CreatePartitionRequest::Create(const std::shared_ptr<Context>& context,
const ::milvus::grpc::PartitionParam* partition_param) {
if (partition_param == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!";
return nullptr;
}
return std::shared_ptr<GrpcBaseRequest>(new CreatePartitionRequest(partition_param));
return std::shared_ptr<GrpcBaseRequest>(new CreatePartitionRequest(context, partition_param));
}
Status

View File

@ -17,6 +17,8 @@
#pragma once
#include <memory>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
@ -26,10 +28,11 @@ namespace grpc {
class CreatePartitionRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const ::milvus::grpc::PartitionParam* partition_param);
Create(const std::shared_ptr<Context>& context, const ::milvus::grpc::PartitionParam* partition_param);
protected:
explicit CreatePartitionRequest(const ::milvus::grpc::PartitionParam* partition_param);
explicit CreatePartitionRequest(const std::shared_ptr<Context>& context,
const ::milvus::grpc::PartitionParam* partition_param);
Status
OnExecute() override;

View File

@ -28,17 +28,18 @@ namespace milvus {
namespace server {
namespace grpc {
CreateTableRequest::CreateTableRequest(const ::milvus::grpc::TableSchema* schema)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), schema_(schema) {
CreateTableRequest::CreateTableRequest(const std::shared_ptr<Context>& context,
const ::milvus::grpc::TableSchema* schema)
: GrpcBaseRequest(context, DDL_DML_REQUEST_GROUP), schema_(schema) {
}
BaseRequestPtr
CreateTableRequest::Create(const ::milvus::grpc::TableSchema* schema) {
CreateTableRequest::Create(const std::shared_ptr<Context>& context, const ::milvus::grpc::TableSchema* schema) {
if (schema == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!";
return nullptr;
}
return std::shared_ptr<GrpcBaseRequest>(new CreateTableRequest(schema));
return std::shared_ptr<GrpcBaseRequest>(new CreateTableRequest(context, schema));
}
Status

View File

@ -17,6 +17,8 @@
#pragma once
#include <memory>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
@ -26,10 +28,10 @@ namespace grpc {
class CreateTableRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const ::milvus::grpc::TableSchema* schema);
Create(const std::shared_ptr<Context>& context, const ::milvus::grpc::TableSchema* schema);
protected:
explicit CreateTableRequest(const ::milvus::grpc::TableSchema* schema);
explicit CreateTableRequest(const std::shared_ptr<Context>& context, const ::milvus::grpc::TableSchema* schema);
Status
OnExecute() override;

View File

@ -29,18 +29,20 @@ namespace milvus {
namespace server {
namespace grpc {
DeleteByDateRequest::DeleteByDateRequest(const ::milvus::grpc::DeleteByDateParam* delete_by_range_param)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), delete_by_range_param_(delete_by_range_param) {
DeleteByDateRequest::DeleteByDateRequest(const std::shared_ptr<Context>& context,
const ::milvus::grpc::DeleteByDateParam* delete_by_range_param)
: GrpcBaseRequest(context, DDL_DML_REQUEST_GROUP), delete_by_range_param_(delete_by_range_param) {
}
BaseRequestPtr
DeleteByDateRequest::Create(const ::milvus::grpc::DeleteByDateParam* delete_by_range_param) {
DeleteByDateRequest::Create(const std::shared_ptr<Context>& context,
const ::milvus::grpc::DeleteByDateParam* delete_by_range_param) {
if (delete_by_range_param == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!";
return nullptr;
}
return std::shared_ptr<GrpcBaseRequest>(new DeleteByDateRequest(delete_by_range_param));
return std::shared_ptr<GrpcBaseRequest>(new DeleteByDateRequest(context, delete_by_range_param));
}
Status

View File

@ -17,6 +17,8 @@
#pragma once
#include <memory>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
@ -26,10 +28,11 @@ namespace grpc {
class DeleteByDateRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const ::milvus::grpc::DeleteByDateParam* delete_by_range_param);
Create(const std::shared_ptr<Context>& context, const ::milvus::grpc::DeleteByDateParam* delete_by_range_param);
protected:
explicit DeleteByDateRequest(const ::milvus::grpc::DeleteByDateParam* delete_by_range_param);
explicit DeleteByDateRequest(const std::shared_ptr<Context>& context,
const ::milvus::grpc::DeleteByDateParam* delete_by_range_param);
Status
OnExecute() override;

View File

@ -27,13 +27,15 @@ namespace milvus {
namespace server {
namespace grpc {
DescribeIndexRequest::DescribeIndexRequest(const std::string& table_name, ::milvus::grpc::IndexParam* index_param)
: GrpcBaseRequest(INFO_REQUEST_GROUP), table_name_(table_name), index_param_(index_param) {
DescribeIndexRequest::DescribeIndexRequest(const std::shared_ptr<Context>& context, const std::string& table_name,
::milvus::grpc::IndexParam* index_param)
: GrpcBaseRequest(context, INFO_REQUEST_GROUP), table_name_(table_name), index_param_(index_param) {
}
BaseRequestPtr
DescribeIndexRequest::Create(const std::string& table_name, ::milvus::grpc::IndexParam* index_param) {
return std::shared_ptr<GrpcBaseRequest>(new DescribeIndexRequest(table_name, index_param));
DescribeIndexRequest::Create(const std::shared_ptr<Context>& context, const std::string& table_name,
::milvus::grpc::IndexParam* index_param) {
return std::shared_ptr<GrpcBaseRequest>(new DescribeIndexRequest(context, table_name, index_param));
}
Status

View File

@ -17,10 +17,11 @@
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <memory>
#include <string>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
@ -28,10 +29,12 @@ namespace grpc {
class DescribeIndexRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name, ::milvus::grpc::IndexParam* index_param);
Create(const std::shared_ptr<Context>& context, const std::string& table_name,
::milvus::grpc::IndexParam* index_param);
protected:
DescribeIndexRequest(const std::string& table_name, ::milvus::grpc::IndexParam* index_param);
DescribeIndexRequest(const std::shared_ptr<Context>& context, const std::string& table_name,
::milvus::grpc::IndexParam* index_param);
Status
OnExecute() override;

View File

@ -27,13 +27,15 @@ namespace milvus {
namespace server {
namespace grpc {
DescribeTableRequest::DescribeTableRequest(const std::string& table_name, ::milvus::grpc::TableSchema* schema)
: GrpcBaseRequest(INFO_REQUEST_GROUP), table_name_(table_name), schema_(schema) {
DescribeTableRequest::DescribeTableRequest(const std::shared_ptr<Context>& context, const std::string& table_name,
::milvus::grpc::TableSchema* schema)
: GrpcBaseRequest(context, INFO_REQUEST_GROUP), table_name_(table_name), schema_(schema) {
}
BaseRequestPtr
DescribeTableRequest::Create(const std::string& table_name, ::milvus::grpc::TableSchema* schema) {
return std::shared_ptr<GrpcBaseRequest>(new DescribeTableRequest(table_name, schema));
DescribeTableRequest::Create(const std::shared_ptr<Context>& context, const std::string& table_name,
::milvus::grpc::TableSchema* schema) {
return std::shared_ptr<GrpcBaseRequest>(new DescribeTableRequest(context, table_name, schema));
}
Status

View File

@ -17,10 +17,11 @@
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <memory>
#include <string>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
@ -28,10 +29,11 @@ namespace grpc {
class DescribeTableRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name, ::milvus::grpc::TableSchema* schema);
Create(const std::shared_ptr<Context>& context, const std::string& table_name, ::milvus::grpc::TableSchema* schema);
protected:
DescribeTableRequest(const std::string& table_name, ::milvus::grpc::TableSchema* schema);
DescribeTableRequest(const std::shared_ptr<Context>& context, const std::string& table_name,
::milvus::grpc::TableSchema* schema);
Status
OnExecute() override;

View File

@ -27,13 +27,13 @@ namespace milvus {
namespace server {
namespace grpc {
DropIndexRequest::DropIndexRequest(const std::string& table_name)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), table_name_(table_name) {
DropIndexRequest::DropIndexRequest(const std::shared_ptr<Context>& context, const std::string& table_name)
: GrpcBaseRequest(context, DDL_DML_REQUEST_GROUP), table_name_(table_name) {
}
BaseRequestPtr
DropIndexRequest::Create(const std::string& table_name) {
return std::shared_ptr<GrpcBaseRequest>(new DropIndexRequest(table_name));
DropIndexRequest::Create(const std::shared_ptr<Context>& context, const std::string& table_name) {
return std::shared_ptr<GrpcBaseRequest>(new DropIndexRequest(context, table_name));
}
Status

View File

@ -17,10 +17,11 @@
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <memory>
#include <string>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
@ -28,10 +29,10 @@ namespace grpc {
class DropIndexRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name);
Create(const std::shared_ptr<Context>& context, const std::string& table_name);
protected:
explicit DropIndexRequest(const std::string& table_name);
explicit DropIndexRequest(const std::shared_ptr<Context>& context, const std::string& table_name);
Status
OnExecute() override;

View File

@ -28,13 +28,15 @@ namespace milvus {
namespace server {
namespace grpc {
DropPartitionRequest::DropPartitionRequest(const ::milvus::grpc::PartitionParam* partition_param)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), partition_param_(partition_param) {
DropPartitionRequest::DropPartitionRequest(const std::shared_ptr<Context>& context,
const ::milvus::grpc::PartitionParam* partition_param)
: GrpcBaseRequest(context, DDL_DML_REQUEST_GROUP), partition_param_(partition_param) {
}
BaseRequestPtr
DropPartitionRequest::Create(const ::milvus::grpc::PartitionParam* partition_param) {
return std::shared_ptr<GrpcBaseRequest>(new DropPartitionRequest(partition_param));
DropPartitionRequest::Create(const std::shared_ptr<Context>& context,
const ::milvus::grpc::PartitionParam* partition_param) {
return std::shared_ptr<GrpcBaseRequest>(new DropPartitionRequest(context, partition_param));
}
Status

View File

@ -17,6 +17,8 @@
#pragma once
#include <memory>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
@ -26,10 +28,11 @@ namespace grpc {
class DropPartitionRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const ::milvus::grpc::PartitionParam* partition_param);
Create(const std::shared_ptr<Context>& context, const ::milvus::grpc::PartitionParam* partition_param);
protected:
explicit DropPartitionRequest(const ::milvus::grpc::PartitionParam* partition_param);
explicit DropPartitionRequest(const std::shared_ptr<Context>& context,
const ::milvus::grpc::PartitionParam* partition_param);
Status
OnExecute() override;

View File

@ -28,13 +28,13 @@ namespace milvus {
namespace server {
namespace grpc {
DropTableRequest::DropTableRequest(const std::string& table_name)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), table_name_(table_name) {
DropTableRequest::DropTableRequest(const std::shared_ptr<Context>& context, const std::string& table_name)
: GrpcBaseRequest(context, DDL_DML_REQUEST_GROUP), table_name_(table_name) {
}
BaseRequestPtr
DropTableRequest::Create(const std::string& table_name) {
return std::shared_ptr<GrpcBaseRequest>(new DropTableRequest(table_name));
DropTableRequest::Create(const std::shared_ptr<Context>& context, const std::string& table_name) {
return std::shared_ptr<GrpcBaseRequest>(new DropTableRequest(context, table_name));
}
Status

View File

@ -17,10 +17,11 @@
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <memory>
#include <string>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
@ -28,10 +29,10 @@ namespace grpc {
class DropTableRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name);
Create(const std::shared_ptr<Context>& context, const std::string& table_name);
protected:
explicit DropTableRequest(const std::string& table_name);
explicit DropTableRequest(const std::shared_ptr<Context>& context, const std::string& table_name);
Status
OnExecute() override;

View File

@ -59,8 +59,8 @@ ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range>& range_array,
return Status::OK();
}
GrpcBaseRequest::GrpcBaseRequest(const std::string& request_group, bool async)
: request_group_(request_group), async_(async), done_(false) {
GrpcBaseRequest::GrpcBaseRequest(const std::shared_ptr<Context>& context, const std::string& request_group, bool async)
: context_(context), request_group_(request_group), async_(async), done_(false) {
}
GrpcBaseRequest::~GrpcBaseRequest() {

View File

@ -17,13 +17,14 @@
#pragma once
#include <condition_variable>
#include "db/meta/MetaTypes.h"
#include "grpc/gen-milvus/milvus.grpc.pb.h"
#include "grpc/gen-status/status.grpc.pb.h"
#include "grpc/gen-status/status.pb.h"
#include "server/context/Context.h"
#include "utils/Status.h"
#include <condition_variable>
//#include <gperftools/profiler.h>
#include <memory>
#include <string>
@ -45,7 +46,8 @@ ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range>& range_array,
class GrpcBaseRequest {
protected:
explicit GrpcBaseRequest(const std::string& request_group, bool async = false);
explicit GrpcBaseRequest(const std::shared_ptr<Context>& context, const std::string& request_group,
bool async = false);
virtual ~GrpcBaseRequest();
@ -85,6 +87,8 @@ class GrpcBaseRequest {
TableNotExistMsg(const std::string& table_name);
protected:
const std::shared_ptr<Context>& context_;
mutable std::mutex finish_mtx_;
std::condition_variable finish_cond_;

View File

@ -27,13 +27,14 @@ namespace milvus {
namespace server {
namespace grpc {
HasTableRequest::HasTableRequest(const std::string& table_name, bool& has_table)
: GrpcBaseRequest(INFO_REQUEST_GROUP), table_name_(table_name), has_table_(has_table) {
HasTableRequest::HasTableRequest(const std::shared_ptr<Context>& context, const std::string& table_name,
bool& has_table)
: GrpcBaseRequest(context, INFO_REQUEST_GROUP), table_name_(table_name), has_table_(has_table) {
}
BaseRequestPtr
HasTableRequest::Create(const std::string& table_name, bool& has_table) {
return std::shared_ptr<GrpcBaseRequest>(new HasTableRequest(table_name, has_table));
HasTableRequest::Create(const std::shared_ptr<Context>& context, const std::string& table_name, bool& has_table) {
return std::shared_ptr<GrpcBaseRequest>(new HasTableRequest(context, table_name, has_table));
}
Status

View File

@ -17,10 +17,11 @@
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <memory>
#include <string>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
@ -28,10 +29,10 @@ namespace grpc {
class HasTableRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name, bool& has_table);
Create(const std::shared_ptr<Context>& context, const std::string& table_name, bool& has_table);
protected:
HasTableRequest(const std::string& table_name, bool& has_table);
HasTableRequest(const std::shared_ptr<Context>& context, const std::string& table_name, bool& has_table);
Status
OnExecute() override;

View File

@ -29,17 +29,19 @@ namespace milvus {
namespace server {
namespace grpc {
InsertRequest::InsertRequest(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), insert_param_(insert_param), record_ids_(record_ids) {
InsertRequest::InsertRequest(const std::shared_ptr<Context>& context, const ::milvus::grpc::InsertParam* insert_param,
::milvus::grpc::VectorIds* record_ids)
: GrpcBaseRequest(context, DDL_DML_REQUEST_GROUP), insert_param_(insert_param), record_ids_(record_ids) {
}
BaseRequestPtr
InsertRequest::Create(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids) {
InsertRequest::Create(const std::shared_ptr<Context>& context, const ::milvus::grpc::InsertParam* insert_param,
::milvus::grpc::VectorIds* record_ids) {
if (insert_param == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!";
return nullptr;
}
return std::shared_ptr<GrpcBaseRequest>(new InsertRequest(insert_param, record_ids));
return std::shared_ptr<GrpcBaseRequest>(new InsertRequest(context, insert_param, record_ids));
}
Status

View File

@ -17,6 +17,8 @@
#pragma once
#include <memory>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
@ -26,10 +28,12 @@ namespace grpc {
class InsertRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids);
Create(const std::shared_ptr<Context>& context, const ::milvus::grpc::InsertParam* insert_param,
::milvus::grpc::VectorIds* record_ids);
protected:
InsertRequest(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids);
InsertRequest(const std::shared_ptr<Context>& context, const ::milvus::grpc::InsertParam* insert_param,
::milvus::grpc::VectorIds* record_ids);
Status
OnExecute() override;

View File

@ -27,13 +27,13 @@ namespace milvus {
namespace server {
namespace grpc {
PreloadTableRequest::PreloadTableRequest(const std::string& table_name)
: GrpcBaseRequest(DQL_REQUEST_GROUP), table_name_(table_name) {
PreloadTableRequest::PreloadTableRequest(const std::shared_ptr<Context>& context, const std::string& table_name)
: GrpcBaseRequest(context, DQL_REQUEST_GROUP), table_name_(table_name) {
}
BaseRequestPtr
PreloadTableRequest::Create(const std::string& table_name) {
return std::shared_ptr<GrpcBaseRequest>(new PreloadTableRequest(table_name));
PreloadTableRequest::Create(const std::shared_ptr<Context>& context, const std::string& table_name) {
return std::shared_ptr<GrpcBaseRequest>(new PreloadTableRequest(context, table_name));
}
Status

View File

@ -17,10 +17,11 @@
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <memory>
#include <string>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
@ -28,10 +29,10 @@ namespace grpc {
class PreloadTableRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name);
Create(const std::shared_ptr<Context>& context, const std::string& table_name);
protected:
explicit PreloadTableRequest(const std::string& table_name);
explicit PreloadTableRequest(const std::shared_ptr<Context>& context, const std::string& table_name);
Status
OnExecute() override;

View File

@ -27,27 +27,30 @@ namespace milvus {
namespace server {
namespace grpc {
SearchRequest::SearchRequest(const ::milvus::grpc::SearchParam* search_vector_infos,
SearchRequest::SearchRequest(const std::shared_ptr<Context>& context,
const ::milvus::grpc::SearchParam* search_vector_infos,
const std::vector<std::string>& file_id_array, ::milvus::grpc::TopKQueryResult* response)
: GrpcBaseRequest(DQL_REQUEST_GROUP),
: GrpcBaseRequest(context, DQL_REQUEST_GROUP),
search_param_(search_vector_infos),
file_id_array_(file_id_array),
topk_result_(response) {
}
BaseRequestPtr
SearchRequest::Create(const ::milvus::grpc::SearchParam* search_vector_infos,
SearchRequest::Create(const std::shared_ptr<Context>& context, const ::milvus::grpc::SearchParam* search_vector_infos,
const std::vector<std::string>& file_id_array, ::milvus::grpc::TopKQueryResult* response) {
if (search_vector_infos == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!";
return nullptr;
}
return std::shared_ptr<GrpcBaseRequest>(new SearchRequest(search_vector_infos, file_id_array, response));
return std::shared_ptr<GrpcBaseRequest>(new SearchRequest(context, search_vector_infos, file_id_array, response));
}
Status
SearchRequest::OnExecute() {
try {
auto pre_query_ctx = context_->Child("Pre query");
int64_t top_k = search_param_->topk();
int64_t nprobe = search_param_->nprobe();
@ -136,6 +139,8 @@ SearchRequest::OnExecute() {
ProfilerStart(fname.c_str());
#endif
pre_query_ctx->GetTraceContext()->GetSpan()->Finish();
if (file_id_array_.empty()) {
std::vector<std::string> partition_tags;
for (size_t i = 0; i < search_param_->partition_tag_array_size(); i++) {
@ -147,11 +152,11 @@ SearchRequest::OnExecute() {
return status;
}
status = DBWrapper::DB()->Query(table_name_, partition_tags, (size_t)top_k, record_count, nprobe,
status = DBWrapper::DB()->Query(context_, table_name_, partition_tags, (size_t)top_k, record_count, nprobe,
vec_f.data(), dates, result_ids, result_distances);
} else {
status = DBWrapper::DB()->QueryByFileID(table_name_, file_id_array_, (size_t)top_k, record_count, nprobe,
vec_f.data(), dates, result_ids, result_distances);
status = DBWrapper::DB()->QueryByFileID(context_, table_name_, file_id_array_, (size_t)top_k, record_count,
nprobe, vec_f.data(), dates, result_ids, result_distances);
}
#ifdef MILVUS_ENABLE_PROFILING
@ -167,6 +172,8 @@ SearchRequest::OnExecute() {
return Status::OK(); // empty table
}
auto post_query_ctx = context_->Child("Constructing result");
// step 7: construct result array
topk_result_->set_row_num(record_count);
topk_result_->mutable_ids()->Resize(static_cast<int>(result_ids.size()), -1);
@ -175,6 +182,8 @@ SearchRequest::OnExecute() {
memcpy(topk_result_->mutable_distances()->mutable_data(), result_distances.data(),
result_distances.size() * sizeof(float));
post_query_ctx->GetTraceContext()->GetSpan()->Finish();
// step 8: print time cost percent
rc.RecordSection("construct result and send");
rc.ElapseFromBegin("totally cost");

View File

@ -17,11 +17,13 @@
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <memory>
#include <string>
#include <vector>
#include "server/context/Context.h"
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
@ -29,12 +31,12 @@ namespace grpc {
class SearchRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const ::milvus::grpc::SearchParam* search_param, const std::vector<std::string>& file_id_array,
::milvus::grpc::TopKQueryResult* response);
Create(const std::shared_ptr<Context>& context, const ::milvus::grpc::SearchParam* search_param,
const std::vector<std::string>& file_id_array, ::milvus::grpc::TopKQueryResult* response);
protected:
SearchRequest(const ::milvus::grpc::SearchParam* search_param, const std::vector<std::string>& file_id_array,
::milvus::grpc::TopKQueryResult* response);
SearchRequest(const std::shared_ptr<Context>& context, const ::milvus::grpc::SearchParam* search_param,
const std::vector<std::string>& file_id_array, ::milvus::grpc::TopKQueryResult* response);
Status
OnExecute() override;

View File

@ -28,14 +28,15 @@ namespace milvus {
namespace server {
namespace grpc {
ShowPartitionsRequest::ShowPartitionsRequest(const std::string& table_name,
ShowPartitionsRequest::ShowPartitionsRequest(const std::shared_ptr<Context>& context, const std::string& table_name,
::milvus::grpc::PartitionList* partition_list)
: GrpcBaseRequest(INFO_REQUEST_GROUP), table_name_(table_name), partition_list_(partition_list) {
: GrpcBaseRequest(context, INFO_REQUEST_GROUP), table_name_(table_name), partition_list_(partition_list) {
}
BaseRequestPtr
ShowPartitionsRequest::Create(const std::string& table_name, ::milvus::grpc::PartitionList* partition_list) {
return std::shared_ptr<GrpcBaseRequest>(new ShowPartitionsRequest(table_name, partition_list));
ShowPartitionsRequest::Create(const std::shared_ptr<Context>& context, const std::string& table_name,
::milvus::grpc::PartitionList* partition_list) {
return std::shared_ptr<GrpcBaseRequest>(new ShowPartitionsRequest(context, table_name, partition_list));
}
Status

View File

@ -17,10 +17,11 @@
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <memory>
#include <string>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
@ -28,10 +29,12 @@ namespace grpc {
class ShowPartitionsRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name, ::milvus::grpc::PartitionList* partition_list);
Create(const std::shared_ptr<Context>& context, const std::string& table_name,
::milvus::grpc::PartitionList* partition_list);
protected:
ShowPartitionsRequest(const std::string& table_name, ::milvus::grpc::PartitionList* partition_list);
ShowPartitionsRequest(const std::shared_ptr<Context>& context, const std::string& table_name,
::milvus::grpc::PartitionList* partition_list);
Status
OnExecute() override;

View File

@ -27,13 +27,14 @@ namespace milvus {
namespace server {
namespace grpc {
ShowTablesRequest::ShowTablesRequest(::milvus::grpc::TableNameList* table_name_list)
: GrpcBaseRequest(INFO_REQUEST_GROUP), table_name_list_(table_name_list) {
ShowTablesRequest::ShowTablesRequest(const std::shared_ptr<Context>& context,
::milvus::grpc::TableNameList* table_name_list)
: GrpcBaseRequest(context, INFO_REQUEST_GROUP), table_name_list_(table_name_list) {
}
BaseRequestPtr
ShowTablesRequest::Create(::milvus::grpc::TableNameList* table_name_list) {
return std::shared_ptr<GrpcBaseRequest>(new ShowTablesRequest(table_name_list));
ShowTablesRequest::Create(const std::shared_ptr<Context>& context, ::milvus::grpc::TableNameList* table_name_list) {
return std::shared_ptr<GrpcBaseRequest>(new ShowTablesRequest(context, table_name_list));
}
Status

View File

@ -17,6 +17,8 @@
#pragma once
#include <memory>
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
@ -26,10 +28,10 @@ namespace grpc {
class ShowTablesRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(::milvus::grpc::TableNameList* table_name_list);
Create(const std::shared_ptr<Context>& context, ::milvus::grpc::TableNameList* table_name_list);
protected:
explicit ShowTablesRequest(::milvus::grpc::TableNameList* table_name_list);
explicit ShowTablesRequest(const std::shared_ptr<Context>& context, ::milvus::grpc::TableNameList* table_name_list);
Status
OnExecute() override;

View File

@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "tracing/TextMapCarrier.h"
namespace milvus {
namespace tracing {
TextMapCarrier::TextMapCarrier(std::unordered_map<std::string, std::string>& text_map) : text_map_(text_map) {
}
opentracing::expected<void>
TextMapCarrier::Set(opentracing::string_view key, opentracing::string_view value) const {
// text_map_[key] = value;
// return {};
opentracing::expected<void> result;
auto was_successful = text_map_.emplace(key, value);
if (was_successful.second) {
// Use a default constructed opentracing::expected<void> to indicate
// success.
return result;
} else {
// `key` clashes with existing data, so the span context can't be encoded
// successfully; set opentracing::expected<void> to an std::error_code.
return opentracing::make_unexpected(std::make_error_code(std::errc::not_supported));
}
}
opentracing::expected<void>
TextMapCarrier::ForeachKey(F f) const {
// Iterate through all key-value pairs, the tracer will use the relevant keys
// to extract a span context.
for (auto& key_value : text_map_) {
auto was_successful = f(key_value.first, key_value.second);
if (!was_successful) {
// If the callback returns and unexpected value, bail out of the loop.
return was_successful;
}
}
// Indicate successful iteration.
return {};
}
// Optional, define TextMapReader::LookupKey to allow for faster extraction.
opentracing::expected<opentracing::string_view>
TextMapCarrier::LookupKey(opentracing::string_view key) const {
auto iter = text_map_.find(key);
if (iter != text_map_.end()) {
return opentracing::make_unexpected(opentracing::key_not_found_error);
}
return opentracing::string_view{iter->second};
}
} // namespace tracing
} // namespace milvus

View File

@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <opentracing/propagation.h>
#include <string>
#include <unordered_map>
namespace milvus {
namespace tracing {
class TextMapCarrier : public opentracing::TextMapReader, public opentracing::TextMapWriter {
public:
explicit TextMapCarrier(std::unordered_map<std::string, std::string>& text_map);
opentracing::expected<void>
Set(opentracing::string_view key, opentracing::string_view value) const override;
using F = std::function<opentracing::expected<void>(opentracing::string_view, opentracing::string_view)>;
opentracing::expected<void>
ForeachKey(F f) const override;
// Optional, define TextMapReader::LookupKey to allow for faster extraction.
opentracing::expected<opentracing::string_view>
LookupKey(opentracing::string_view key) const override;
private:
std::unordered_map<std::string, std::string>& text_map_;
};
} // namespace tracing
} // namespace milvus

View File

@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "tracing/TraceContext.h"
#include <utility>
namespace milvus {
namespace tracing {
TraceContext::TraceContext(std::unique_ptr<opentracing::Span>& span) : span_(std::move(span)) {
}
std::unique_ptr<TraceContext>
TraceContext::Child(const std::string& operation_name) const {
auto child_span = span_->tracer().StartSpan(operation_name, {opentracing::ChildOf(&(span_->context()))});
return std::make_unique<TraceContext>(child_span);
}
std::unique_ptr<TraceContext>
TraceContext::Follower(const std::string& operation_name) const {
auto follower_span = span_->tracer().StartSpan(operation_name, {opentracing::FollowsFrom(&(span_->context()))});
return std::make_unique<TraceContext>(follower_span);
}
const std::unique_ptr<opentracing::Span>&
TraceContext::GetSpan() const {
return span_;
}
} // namespace tracing
} // namespace milvus

View File

@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <opentracing/tracer.h>
#include <memory>
#include <string>
namespace milvus {
namespace tracing {
class TraceContext {
public:
explicit TraceContext(std::unique_ptr<opentracing::Span>& span);
std::unique_ptr<TraceContext>
Child(const std::string& operation_name) const;
std::unique_ptr<TraceContext>
Follower(const std::string& operation_name) const;
const std::unique_ptr<opentracing::Span>&
GetSpan() const;
private:
// std::unique_ptr<opentracing::SpanContext> span_context_;
std::unique_ptr<opentracing::Span> span_;
};
} // namespace tracing
} // namespace milvus

View File

@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "tracing/TracerUtil.h"
#include <opentracing/dynamic_load.h>
#include <opentracing/tracer.h>
#include <fstream>
#include <iostream>
#include "thirdparty/nlohmann/json.hpp"
namespace milvus {
namespace tracing {
const char* TracerUtil::tracer_context_header_name_;
void
TracerUtil::InitGlobal(const std::string& config_path) {
if (!config_path.empty()) {
LoadConfig(config_path);
} else {
tracer_context_header_name_ = "";
}
}
void
TracerUtil::LoadConfig(const std::string& config_path) {
// Parse JSON config
std::ifstream tracer_config(config_path);
if (!tracer_config.good()) {
std::cerr << "Failed to open tracer config file " << config_path << ": " << std::strerror(errno) << std::endl;
return;
}
using json = nlohmann::json;
json tracer_config_json;
tracer_config >> tracer_config_json;
std::string tracing_shared_lib = tracer_config_json[TRACER_LIBRARY_CONFIG_NAME];
std::string tracer_config_str = tracer_config_json[TRACER_CONFIGURATION_CONFIG_NAME].dump();
tracer_context_header_name_ = tracer_config_json[TRACE_CONTEXT_HEADER_CONFIG_NAME].dump().c_str();
// Load the tracer library.
std::string error_message;
auto handle_maybe = opentracing::DynamicallyLoadTracingLibrary(tracing_shared_lib.c_str(), error_message);
if (!handle_maybe) {
std::cerr << "Failed to load tracer library: " << error_message << std::endl;
return;
}
// Construct a tracer.
auto& tracer_factory = handle_maybe->tracer_factory();
auto tracer_maybe = tracer_factory.MakeTracer(tracer_config_str.c_str(), error_message);
if (!tracer_maybe) {
std::cerr << "Failed to create tracer: " << error_message << std::endl;
return;
}
auto& tracer = *tracer_maybe;
opentracing::Tracer::InitGlobal(tracer);
}
std::string
TracerUtil::GetTraceContextHeaderName() {
return tracer_context_header_name_;
}
} // namespace tracing
} // namespace milvus

View File

@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <string>
namespace milvus {
namespace tracing {
static const char* TRACER_LIBRARY_CONFIG_NAME = "tracer_library";
static const char* TRACER_CONFIGURATION_CONFIG_NAME = "tracer_configuration";
static const char* TRACE_CONTEXT_HEADER_CONFIG_NAME = "TraceContextHeaderName";
class TracerUtil {
public:
static void
InitGlobal(const std::string& config_path = "");
static std::string
GetTraceContextHeaderName();
private:
static void
LoadConfig(const std::string& config_path);
static const char* tracer_context_header_name_;
};
} // namespace tracing
} // namespace milvus

View File

@ -9,5 +9,6 @@ LIBUNWIND_VERSION=1.3.1
GPERFTOOLS_VERSION=2.7
GRPC_VERSION=master
ZLIB_VERSION=v1.2.11
OPENTRACING_VERSION=v1.5.1
# vim: set filetype=sh:

View File

@ -68,13 +68,17 @@ set(thirdparty_files
aux_source_directory(${MILVUS_ENGINE_SRC}/server server_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl/request grpc_request_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl grpc_impl_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl/interceptor grpc_interceptor_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/context server_context_files)
set(grpc_server_files
${grpc_request_files}
${grpc_impl_files}
${grpc_interceptor_files}
)
aux_source_directory(${MILVUS_ENGINE_SRC}/utils utils_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/tracing tracing_files)
set(entry_file
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp)
@ -101,6 +105,8 @@ set(common_files
${scheduler_files}
${wrapper_files}
${helper_files}
${server_context_files}
${tracing_files}
)
set(unittest_libs
@ -116,6 +122,8 @@ set(unittest_libs
pthread
metrics
gfortran
opentracing
opentracing_mocktracer
)
if (MILVUS_WITH_PROMETHEUS)
set(unittest_libs ${unittest_libs}

View File

@ -15,6 +15,12 @@
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <boost/filesystem.hpp>
#include <random>
#include <thread>
#include "cache/CpuCacheMgr.h"
#include "db/Constants.h"
#include "db/DB.h"
@ -25,11 +31,6 @@
#include "server/Config.h"
#include "utils/CommonUtil.h"
#include <gtest/gtest.h>
#include <boost/filesystem.hpp>
#include <random>
#include <thread>
namespace {
static const char* TABLE_NAME = "test_group";
@ -189,13 +190,13 @@ TEST_F(DBTest, DB_TEST) {
START_TIMER;
std::vector<std::string> tags;
stat = db_->Query(TABLE_NAME, tags, k, qb, 10, qxb.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, k, qb, 10, qxb.data(), result_ids, result_distances);
ss << "Search " << j << " With Size " << count / milvus::engine::M << " M";
STOP_TIMER(ss.str());
ASSERT_TRUE(stat.ok());
for (auto i = 0; i < qb; ++i) {
ASSERT_EQ(result_ids[i*k], target_ids[i]);
ASSERT_EQ(result_ids[i * k], target_ids[i]);
ss.str("");
ss << "Result [" << i << "]:";
for (auto t = 0; t < k; t++) {
@ -289,9 +290,9 @@ TEST_F(DBTest, SEARCH_TEST) {
std::vector<std::string> tags;
milvus::engine::ResultIds result_ids;
milvus::engine::ResultDistances result_distances;
stat = db_->Query(TABLE_NAME, tags, k, nq, 10, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, k, nq, 10, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
stat = db_->Query(TABLE_NAME, tags, k, 1100, 10, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, k, 1100, 10, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
}
@ -302,9 +303,9 @@ TEST_F(DBTest, SEARCH_TEST) {
std::vector<std::string> tags;
milvus::engine::ResultIds result_ids;
milvus::engine::ResultDistances result_distances;
stat = db_->Query(TABLE_NAME, tags, k, nq, 10, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, k, nq, 10, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
stat = db_->Query(TABLE_NAME, tags, k, 1100, 10, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, k, 1100, 10, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
}
@ -315,9 +316,9 @@ TEST_F(DBTest, SEARCH_TEST) {
std::vector<std::string> tags;
milvus::engine::ResultIds result_ids;
milvus::engine::ResultDistances result_distances;
stat = db_->Query(TABLE_NAME, tags, k, nq, 10, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, k, nq, 10, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
stat = db_->Query(TABLE_NAME, tags, k, 1100, 10, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, k, 1100, 10, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
}
@ -329,9 +330,9 @@ TEST_F(DBTest, SEARCH_TEST) {
std::vector<std::string> tags;
milvus::engine::ResultIds result_ids;
milvus::engine::ResultDistances result_distances;
stat = db_->Query(TABLE_NAME, tags, k, nq, 10, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, k, nq, 10, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
stat = db_->Query(TABLE_NAME, tags, k, 1100, 10, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, k, 1100, 10, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
}
#endif
@ -347,7 +348,8 @@ TEST_F(DBTest, SEARCH_TEST) {
}
milvus::engine::ResultIds result_ids;
milvus::engine::ResultDistances result_distances;
stat = db_->QueryByFileID(TABLE_NAME, file_ids, k, nq, 10, xq.data(), dates, result_ids, result_distances);
stat = db_->QueryByFileID(dummy_context_, TABLE_NAME, file_ids, k, nq, 10, xq.data(), dates, result_ids,
result_distances);
ASSERT_TRUE(stat.ok());
}
@ -358,9 +360,9 @@ TEST_F(DBTest, SEARCH_TEST) {
std::vector<std::string> tags;
milvus::engine::ResultIds result_ids;
milvus::engine::ResultDistances result_distances;
stat = db_->Query(TABLE_NAME, tags, k, nq, 10, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, k, nq, 10, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
stat = db_->Query(TABLE_NAME, tags, k, 1100, 10, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, k, 1100, 10, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
}
@ -375,14 +377,14 @@ TEST_F(DBTest, SEARCH_TEST) {
{
result_ids.clear();
result_dists.clear();
stat = db_->Query(TABLE_NAME, partition_tag, k, nq, 10, xq.data(), result_ids, result_dists);
stat = db_->Query(dummy_context_, TABLE_NAME, partition_tag, k, nq, 10, xq.data(), result_ids, result_dists);
ASSERT_TRUE(stat.ok());
}
{
result_ids.clear();
result_dists.clear();
stat = db_->Query(TABLE_NAME, partition_tag, k, 200, 10, xq.data(), result_ids, result_dists);
stat = db_->Query(dummy_context_, TABLE_NAME, partition_tag, k, 200, 10, xq.data(), result_ids, result_dists);
ASSERT_TRUE(stat.ok());
}
@ -397,7 +399,8 @@ TEST_F(DBTest, SEARCH_TEST) {
}
result_ids.clear();
result_dists.clear();
stat = db_->QueryByFileID(TABLE_NAME, file_ids, k, nq, 10, xq.data(), dates, result_ids, result_dists);
stat = db_->QueryByFileID(dummy_context_, TABLE_NAME, file_ids, k, nq, 10, xq.data(), dates, result_ids,
result_dists);
ASSERT_TRUE(stat.ok());
}
@ -472,10 +475,12 @@ TEST_F(DBTest, SHUTDOWN_TEST) {
milvus::engine::meta::DatesT dates;
milvus::engine::ResultIds result_ids;
milvus::engine::ResultDistances result_distances;
stat = db_->Query(table_info.table_id_, tags, 1, 1, 1, nullptr, dates, result_ids, result_distances);
stat =
db_->Query(dummy_context_, table_info.table_id_, tags, 1, 1, 1, nullptr, dates, result_ids, result_distances);
ASSERT_FALSE(stat.ok());
std::vector<std::string> file_ids;
stat = db_->QueryByFileID(table_info.table_id_, file_ids, 1, 1, 1, nullptr, dates, result_ids, result_distances);
stat = db_->QueryByFileID(dummy_context_, table_info.table_id_, file_ids, 1, 1, 1, nullptr, dates, result_ids,
result_distances);
ASSERT_FALSE(stat.ok());
stat = db_->DropTable(table_info.table_id_, dates);
@ -550,14 +555,14 @@ TEST_F(DBTest, PARTITION_TEST) {
milvus::engine::IDNumbers vector_ids;
vector_ids.resize(INSERT_BATCH);
for (int64_t k = 0; k < INSERT_BATCH; k++) {
vector_ids[k] = i*INSERT_BATCH + k;
vector_ids[k] = i * INSERT_BATCH + k;
}
db_->InsertVectors(table_name, partition_tag, INSERT_BATCH, xb.data(), vector_ids);
ASSERT_EQ(vector_ids.size(), INSERT_BATCH);
}
//duplicated partition is not allowed
// duplicated partition is not allowed
stat = db_->CreatePartition(table_name, "", "0");
ASSERT_FALSE(stat.ok());
@ -569,20 +574,20 @@ TEST_F(DBTest, PARTITION_TEST) {
ASSERT_EQ(partition_schema_array[i].table_id_, table_name + "_" + std::to_string(i));
}
{ // build index
{ // build index
milvus::engine::TableIndex index;
index.engine_type_ = (int) milvus::engine::EngineType::FAISS_IVFFLAT;
index.metric_type_ = (int) milvus::engine::MetricType::L2;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT;
index.metric_type_ = (int)milvus::engine::MetricType::L2;
stat = db_->CreateIndex(table_info.table_id_, index);
ASSERT_TRUE(stat.ok());
uint64_t row_count = 0;
stat = db_->GetTableRowCount(TABLE_NAME, row_count);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(row_count, INSERT_BATCH*PARTITION_COUNT);
ASSERT_EQ(row_count, INSERT_BATCH * PARTITION_COUNT);
}
{ // search
{ // search
const int64_t nq = 5;
const int64_t topk = 10;
const int64_t nprobe = 10;
@ -593,25 +598,25 @@ TEST_F(DBTest, PARTITION_TEST) {
std::vector<std::string> tags = {"0", std::to_string(PARTITION_COUNT - 1)};
milvus::engine::ResultIds result_ids;
milvus::engine::ResultDistances result_distances;
stat = db_->Query(TABLE_NAME, tags, topk, nq, nprobe, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, topk, nq, nprobe, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(result_ids.size()/topk, nq);
ASSERT_EQ(result_ids.size() / topk, nq);
// search in whole table
tags.clear();
result_ids.clear();
result_distances.clear();
stat = db_->Query(TABLE_NAME, tags, topk, nq, nprobe, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, topk, nq, nprobe, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(result_ids.size()/topk, nq);
ASSERT_EQ(result_ids.size() / topk, nq);
// search in all partitions(tag regex match)
tags.push_back("\\d");
result_ids.clear();
result_distances.clear();
stat = db_->Query(TABLE_NAME, tags, topk, nq, nprobe, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, topk, nq, nprobe, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(result_ids.size()/topk, nq);
ASSERT_EQ(result_ids.size() / topk, nq);
}
stat = db_->DropPartition(table_name + "_0");

View File

@ -15,18 +15,18 @@
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <boost/filesystem.hpp>
#include <random>
#include <thread>
#include "db/Constants.h"
#include "db/DB.h"
#include "db/DBImpl.h"
#include "db/meta/MetaConsts.h"
#include "db/utils.h"
#include <gtest/gtest.h>
#include <boost/filesystem.hpp>
#include <random>
#include <thread>
namespace {
static const char* TABLE_NAME = "test_group";
@ -98,14 +98,14 @@ TEST_F(MySqlDBTest, DB_TEST) {
START_TIMER;
std::vector<std::string> tags;
stat = db_->Query(TABLE_NAME, tags, k, qb, 10, qxb.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, k, qb, 10, qxb.data(), result_ids, result_distances);
ss << "Search " << j << " With Size " << count / milvus::engine::M << " M";
STOP_TIMER(ss.str());
ASSERT_TRUE(stat.ok());
for (auto i = 0; i < qb; ++i) {
// std::cout << results[k][0].first << " " << target_ids[k] << std::endl;
// ASSERT_EQ(results[k][0].first, target_ids[k]);
// std::cout << results[k][0].first << " " << target_ids[k] << std::endl;
// ASSERT_EQ(results[k][0].first, target_ids[k]);
bool exists = false;
for (auto t = 0; t < k; t++) {
if (result_ids[i * k + t] == target_ids[i]) {
@ -130,12 +130,12 @@ TEST_F(MySqlDBTest, DB_TEST) {
int loop = INSERT_LOOP;
for (auto i = 0; i < loop; ++i) {
// if (i==10) {
// db_->InsertVectors(TABLE_NAME, "", qb, qxb.data(), target_ids);
// ASSERT_EQ(target_ids.size(), qb);
// } else {
// db_->InsertVectors(TABLE_NAME, "", nb, xb.data(), vector_ids);
// }
// if (i==10) {
// db_->InsertVectors(TABLE_NAME, "", qb, qxb.data(), target_ids);
// ASSERT_EQ(target_ids.size(), qb);
// } else {
// db_->InsertVectors(TABLE_NAME, "", nb, xb.data(), vector_ids);
// }
db_->InsertVectors(TABLE_NAME, "", nb, xb.data(), vector_ids);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
@ -184,7 +184,9 @@ TEST_F(MySqlDBTest, SEARCH_TEST) {
const int batch_size = 100;
for (int j = 0; j < nb / batch_size; ++j) {
stat = db_->InsertVectors(TABLE_NAME, "", batch_size, xb.data() + batch_size * j * TABLE_DIM, ids);
if (j == 200) { sleep(1); }
if (j == 200) {
sleep(1);
}
ASSERT_TRUE(stat.ok());
}
@ -193,7 +195,7 @@ TEST_F(MySqlDBTest, SEARCH_TEST) {
std::vector<std::string> tags;
milvus::engine::ResultIds result_ids;
milvus::engine::ResultDistances result_distances;
stat = db_->Query(TABLE_NAME, tags, k, nq, 10, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, k, nq, 10, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
}
@ -271,15 +273,15 @@ TEST_F(MySqlDBTest, DELETE_TEST) {
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
// std::vector<engine::meta::DateT> dates;
// stat = db_->DropTable(TABLE_NAME, dates);
//// std::cout << "5 sec start" << std::endl;
// std::this_thread::sleep_for(std::chrono::seconds(5));
//// std::cout << "5 sec finish" << std::endl;
// ASSERT_TRUE(stat.ok());
//
// db_->HasTable(TABLE_NAME, has_table);
// ASSERT_FALSE(has_table);
// std::vector<engine::meta::DateT> dates;
// stat = db_->DropTable(TABLE_NAME, dates);
//// std::cout << "5 sec start" << std::endl;
// std::this_thread::sleep_for(std::chrono::seconds(5));
//// std::cout << "5 sec finish" << std::endl;
// ASSERT_TRUE(stat.ok());
//
// db_->HasTable(TABLE_NAME, has_table);
// ASSERT_FALSE(has_table);
}
TEST_F(MySqlDBTest, PARTITION_TEST) {
@ -305,21 +307,20 @@ TEST_F(MySqlDBTest, PARTITION_TEST) {
stat = db_->CreatePartition(table_name, partition_name, partition_tag);
ASSERT_FALSE(stat.ok());
std::vector<float> xb;
BuildVectors(INSERT_BATCH, xb);
milvus::engine::IDNumbers vector_ids;
vector_ids.resize(INSERT_BATCH);
for (int64_t k = 0; k < INSERT_BATCH; k++) {
vector_ids[k] = i*INSERT_BATCH + k;
vector_ids[k] = i * INSERT_BATCH + k;
}
db_->InsertVectors(table_name, partition_tag, INSERT_BATCH, xb.data(), vector_ids);
ASSERT_EQ(vector_ids.size(), INSERT_BATCH);
}
//duplicated partition is not allowed
// duplicated partition is not allowed
stat = db_->CreatePartition(table_name, "", "0");
ASSERT_FALSE(stat.ok());
@ -331,20 +332,20 @@ TEST_F(MySqlDBTest, PARTITION_TEST) {
ASSERT_EQ(partition_schema_array[i].table_id_, table_name + "_" + std::to_string(i));
}
{ // build index
{ // build index
milvus::engine::TableIndex index;
index.engine_type_ = (int) milvus::engine::EngineType::FAISS_IVFFLAT;
index.metric_type_ = (int) milvus::engine::MetricType::L2;
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT;
index.metric_type_ = (int)milvus::engine::MetricType::L2;
stat = db_->CreateIndex(table_info.table_id_, index);
ASSERT_TRUE(stat.ok());
uint64_t row_count = 0;
stat = db_->GetTableRowCount(TABLE_NAME, row_count);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(row_count, INSERT_BATCH*PARTITION_COUNT);
ASSERT_EQ(row_count, INSERT_BATCH * PARTITION_COUNT);
}
{ // search
{ // search
const int64_t nq = 5;
const int64_t topk = 10;
const int64_t nprobe = 10;
@ -355,25 +356,25 @@ TEST_F(MySqlDBTest, PARTITION_TEST) {
std::vector<std::string> tags = {"0", std::to_string(PARTITION_COUNT - 1)};
milvus::engine::ResultIds result_ids;
milvus::engine::ResultDistances result_distances;
stat = db_->Query(TABLE_NAME, tags, 10, nq, 10, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, 10, nq, 10, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(result_ids.size()/topk, nq);
ASSERT_EQ(result_ids.size() / topk, nq);
// search in whole table
tags.clear();
result_ids.clear();
result_distances.clear();
stat = db_->Query(TABLE_NAME, tags, 10, nq, 10, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, 10, nq, 10, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(result_ids.size()/topk, nq);
ASSERT_EQ(result_ids.size() / topk, nq);
// search in all partitions(tag regex match)
tags.push_back("\\d");
result_ids.clear();
result_distances.clear();
stat = db_->Query(TABLE_NAME, tags, 10, nq, 10, xq.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, TABLE_NAME, tags, 10, nq, 10, xq.data(), result_ids, result_distances);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(result_ids.size()/topk, nq);
ASSERT_EQ(result_ids.size() / topk, nq);
}
stat = db_->DropPartition(table_name + "_0");

View File

@ -15,7 +15,13 @@
// specific language governing permissions and limitations
// under the License.
#include "gtest/gtest.h"
#include <boost/filesystem.hpp>
#include <chrono>
#include <cmath>
#include <fstream>
#include <iostream>
#include <random>
#include <thread>
#include "db/Constants.h"
#include "db/engine/EngineFactory.h"
@ -24,16 +30,9 @@
#include "db/insert/VectorSource.h"
#include "db/meta/MetaConsts.h"
#include "db/utils.h"
#include "gtest/gtest.h"
#include "metrics/Metrics.h"
#include <boost/filesystem.hpp>
#include <chrono>
#include <cmath>
#include <fstream>
#include <iostream>
#include <random>
#include <thread>
namespace {
static constexpr int64_t TABLE_DIM = 256;
@ -258,7 +257,8 @@ TEST_F(MemManagerTest2, SERIAL_INSERT_SEARCH_TEST) {
std::vector<std::string> tags;
milvus::engine::ResultIds result_ids;
milvus::engine::ResultDistances result_distances;
stat = db_->Query(GetTableName(), tags, topk, 1, nprobe, search.data(), result_ids, result_distances);
stat = db_->Query(dummy_context_, GetTableName(), tags, topk, 1, nprobe, search.data(), result_ids,
result_distances);
ASSERT_EQ(result_ids[0], pair.first);
ASSERT_LT(result_distances[0], 1e-4);
}
@ -330,7 +330,8 @@ TEST_F(MemManagerTest2, CONCURRENT_INSERT_SEARCH_TEST) {
START_TIMER;
std::vector<std::string> tags;
stat = db_->Query(GetTableName(), tags, k, qb, 10, qxb.data(), result_ids, result_distances);
stat =
db_->Query(dummy_context_, GetTableName(), tags, k, qb, 10, qxb.data(), result_ids, result_distances);
ss << "Search " << j << " With Size " << count / milvus::engine::M << " M";
STOP_TIMER(ss.str());

View File

@ -15,17 +15,21 @@
// specific language governing permissions and limitations
// under the License.
#include "db/utils.h"
#include <opentracing/mocktracer/tracer.h>
#include <boost/filesystem.hpp>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
#include "db/DBFactory.h"
#include "db/Options.h"
#include "db/utils.h"
#ifdef MILVUS_GPU_VERSION
#include "knowhere/index/vector_index/helpers/FaissGpuResourceMgr.h"
#endif
@ -123,6 +127,13 @@ BaseTest::InitLog() {
void
BaseTest::SetUp() {
InitLog();
dummy_context_ = std::make_shared<milvus::server::Context>("dummy_request_id");
opentracing::mocktracer::MockTracerOptions tracer_options;
auto mock_tracer =
std::shared_ptr<opentracing::Tracer>{new opentracing::mocktracer::MockTracer{std::move(tracer_options)}};
auto mock_span = mock_tracer->StartSpan("mock_span");
auto trace_context = std::make_shared<milvus::tracing::TraceContext>(mock_span);
dummy_context_->SetTraceContext(trace_context);
#ifdef MILVUS_GPU_VERSION
knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(0, 1024 * 1024 * 200, 1024 * 1024 * 300, 2);
#endif

View File

@ -58,6 +58,8 @@ class BaseTest : public ::testing::Test {
TearDown() override;
virtual milvus::engine::DBOptions
GetOptions();
std::shared_ptr<milvus::server::Context> dummy_context_;
};
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////

View File

@ -16,6 +16,7 @@
// under the License.
#include <gtest/gtest.h>
#include "scheduler/ResourceFactory.h"
#include "scheduler/resource/CpuResource.h"
#include "scheduler/resource/DiskResource.h"
@ -158,7 +159,7 @@ TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) {
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<SpecResLabel>(disk_resource_);
auto task = std::make_shared<TestTask>(dummy, label);
auto task = std::make_shared<TestTask>(std::make_shared<server::Context>("dummy_request_id"), dummy, label);
std::vector<std::string> path{disk_resource_->name()};
task->path() = Path(path, 0);
tasks.push_back(task);
@ -186,7 +187,7 @@ TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) {
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<SpecResLabel>(cpu_resource_);
auto task = std::make_shared<TestTask>(dummy, label);
auto task = std::make_shared<TestTask>(std::make_shared<server::Context>("dummy_request_id"), dummy, label);
std::vector<std::string> path{cpu_resource_->name()};
task->path() = Path(path, 0);
tasks.push_back(task);
@ -214,7 +215,7 @@ TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) {
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<SpecResLabel>(gpu_resource_);
auto task = std::make_shared<TestTask>(dummy, label);
auto task = std::make_shared<TestTask>(std::make_shared<server::Context>("dummy_request_id"), dummy, label);
std::vector<std::string> path{gpu_resource_->name()};
task->path() = Path(path, 0);
tasks.push_back(task);
@ -242,7 +243,7 @@ TEST_F(ResourceAdvanceTest, TEST_RESOURCE_TEST) {
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto label = std::make_shared<SpecResLabel>(test_resource_);
auto task = std::make_shared<TestTask>(dummy, label);
auto task = std::make_shared<TestTask>(std::make_shared<server::Context>("dummy_request_id"), dummy, label);
std::vector<std::string> path{test_resource_->name()};
task->path() = Path(path, 0);
tasks.push_back(task);

View File

@ -16,6 +16,7 @@
// under the License.
#include <gtest/gtest.h>
#include "scheduler/ResourceMgr.h"
#include "scheduler/resource/CpuResource.h"
#include "scheduler/resource/DiskResource.h"
@ -186,7 +187,8 @@ TEST_F(ResourceMgrAdvanceTest, REGISTER_SUBSCRIBER) {
auto callback = [&](EventPtr event) { flag = true; };
mgr1_->RegisterSubscriber(callback);
TableFileSchemaPtr dummy = nullptr;
disk_res->task_table().Put(std::make_shared<TestTask>(dummy, nullptr));
disk_res->task_table().Put(
std::make_shared<TestTask>(std::make_shared<server::Context>("dummy_request_id"), dummy, nullptr));
sleep(1);
ASSERT_TRUE(flag);
}

View File

@ -16,6 +16,8 @@
// under the License.
#include <gtest/gtest.h>
#include <opentracing/mocktracer/tracer.h>
#include "scheduler/task/BuildIndexTask.h"
#include "scheduler/task/SearchTask.h"
@ -23,7 +25,17 @@ namespace milvus {
namespace scheduler {
TEST(TaskTest, INVALID_INDEX) {
auto search_task = std::make_shared<XSearchTask>(nullptr, nullptr);
auto dummy_context = std::make_shared<milvus::server::Context>("dummy_request_id");
opentracing::mocktracer::MockTracerOptions tracer_options;
auto mock_tracer =
std::shared_ptr<opentracing::Tracer>{new opentracing::mocktracer::MockTracer{std::move(tracer_options)}};
auto mock_span = mock_tracer->StartSpan("mock_span");
auto trace_context = std::make_shared<milvus::tracing::TraceContext>(mock_span);
dummy_context->SetTraceContext(trace_context);
TableFileSchemaPtr dummy_file = std::make_shared<engine::meta::TableFileSchema>();
auto search_task =
std::make_shared<XSearchTask>(dummy_context, dummy_file, nullptr);
search_task->Load(LoadType::TEST, 10);
auto build_task = std::make_shared<XBuildIndexTask>(nullptr, nullptr);

View File

@ -16,6 +16,7 @@
// under the License.
#include <gtest/gtest.h>
#include "scheduler/TaskTable.h"
#include "scheduler/task/TestTask.h"
@ -161,8 +162,10 @@ class TaskTableBaseTest : public ::testing::Test {
SetUp() override {
milvus::scheduler::TableFileSchemaPtr dummy = nullptr;
invalid_task_ = nullptr;
task1_ = std::make_shared<milvus::scheduler::TestTask>(dummy, nullptr);
task2_ = std::make_shared<milvus::scheduler::TestTask>(dummy, nullptr);
task1_ = std::make_shared<milvus::scheduler::TestTask>(
std::make_shared<milvus::server::Context>("dummy_request_id"), dummy, nullptr);
task2_ = std::make_shared<milvus::scheduler::TestTask>(
std::make_shared<milvus::server::Context>("dummy_request_id"), dummy, nullptr);
}
milvus::scheduler::TaskPtr invalid_task_;
@ -318,7 +321,8 @@ class TaskTableAdvanceTest : public ::testing::Test {
SetUp() override {
milvus::scheduler::TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < 8; ++i) {
auto task = std::make_shared<milvus::scheduler::TestTask>(dummy, nullptr);
auto task = std::make_shared<milvus::scheduler::TestTask>(
std::make_shared<milvus::server::Context>("dummy_request_id"), dummy, nullptr);
table1_.Put(task);
}

View File

@ -54,7 +54,7 @@ set(server_test_files
add_executable(test_server ${server_test_files})
set(client_grpc_lib
set(grpc_lib
grpcpp_channelz
grpc++
grpc
@ -64,7 +64,7 @@ set(client_grpc_lib
target_link_libraries(test_server
knowhere
stdc++
${client_grpc_lib}
${grpc_lib}
${unittest_libs}
)

View File

@ -16,22 +16,22 @@
// under the License.
#include <gtest/gtest.h>
#include <opentracing/mocktracer/tracer.h>
#include <boost/filesystem.hpp>
#include <thread>
#include "grpc/gen-milvus/milvus.grpc.pb.h"
#include "grpc/gen-status/status.pb.h"
#include "scheduler/ResourceFactory.h"
#include "scheduler/SchedInst.h"
#include "server/Config.h"
#include "server/DBWrapper.h"
#include "server/Server.h"
#include "server/grpc_impl/GrpcRequestHandler.h"
#include "server/grpc_impl/GrpcRequestScheduler.h"
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include "src/version.h"
#include "grpc/gen-milvus/milvus.grpc.pb.h"
#include "grpc/gen-status/status.pb.h"
#include "scheduler/ResourceFactory.h"
#include "scheduler/SchedInst.h"
#include "server/Config.h"
#include "server/DBWrapper.h"
#include "utils/CommonUtil.h"
namespace {
@ -43,10 +43,11 @@ static constexpr int64_t VECTOR_COUNT = 1000;
static constexpr int64_t INSERT_LOOP = 10;
constexpr int64_t SECONDS_EACH_HOUR = 3600;
void CopyRowRecord(::milvus::grpc::RowRecord* target, const std::vector<float>& src) {
void
CopyRowRecord(::milvus::grpc::RowRecord* target, const std::vector<float>& src) {
auto vector_data = target->mutable_vector_data();
vector_data->Resize(static_cast<int>(src.size()), 0.0);
memcpy(vector_data->mutable_data(), src.data(), src.size()* sizeof(float));
memcpy(vector_data->mutable_data(), src.data(), src.size() * sizeof(float));
}
class RpcHandlerTest : public testing::Test {
@ -88,14 +89,23 @@ class RpcHandlerTest : public testing::Test {
milvus::server::DBWrapper::GetInstance().StartService();
// initialize handler, create table
handler = std::make_shared<milvus::server::grpc::GrpcRequestHandler>();
handler = std::make_shared<milvus::server::grpc::GrpcRequestHandler>(opentracing::Tracer::Global());
dummy_context = std::make_shared<milvus::server::Context>("dummy_request_id");
opentracing::mocktracer::MockTracerOptions tracer_options;
auto mock_tracer =
std::shared_ptr<opentracing::Tracer>{new opentracing::mocktracer::MockTracer{std::move(tracer_options)}};
auto mock_span = mock_tracer->StartSpan("mock_span");
auto trace_context = std::make_shared<milvus::tracing::TraceContext>(mock_span);
dummy_context->SetTraceContext(trace_context);
::grpc::ServerContext context;
handler->SetContext(&context, dummy_context);
::milvus::grpc::TableSchema request;
::milvus::grpc::Status status;
request.set_table_name(TABLE_NAME);
request.set_dimension(TABLE_DIM);
request.set_index_file_size(INDEX_FILE_SIZE);
request.set_metric_type(1);
handler->SetContext(&context, dummy_context);
::grpc::Status grpc_status = handler->CreateTable(&context, &request, &status);
}
@ -110,6 +120,7 @@ class RpcHandlerTest : public testing::Test {
protected:
std::shared_ptr<milvus::server::grpc::GrpcRequestHandler> handler;
std::shared_ptr<milvus::server::Context> dummy_context;
};
void
@ -149,6 +160,7 @@ CurrentTmDate(int64_t offset_day = 0) {
TEST_F(RpcHandlerTest, HAS_TABLE_TEST) {
::grpc::ServerContext context;
handler->SetContext(&context, dummy_context);
::milvus::grpc::TableName request;
::milvus::grpc::BoolReply reply;
::grpc::Status status = handler->HasTable(&context, &request, &reply);
@ -161,6 +173,7 @@ TEST_F(RpcHandlerTest, HAS_TABLE_TEST) {
TEST_F(RpcHandlerTest, INDEX_TEST) {
::grpc::ServerContext context;
handler->SetContext(&context, dummy_context);
::milvus::grpc::IndexParam request;
::milvus::grpc::Status response;
::grpc::Status grpc_status = handler->CreateIndex(&context, &request, &response);
@ -197,6 +210,7 @@ TEST_F(RpcHandlerTest, INDEX_TEST) {
TEST_F(RpcHandlerTest, INSERT_TEST) {
::grpc::ServerContext context;
handler->SetContext(&context, dummy_context);
::milvus::grpc::InsertParam request;
::milvus::grpc::Status response;
@ -214,9 +228,10 @@ TEST_F(RpcHandlerTest, INSERT_TEST) {
TEST_F(RpcHandlerTest, SEARCH_TEST) {
::grpc::ServerContext context;
handler->SetContext(&context, dummy_context);
::milvus::grpc::SearchParam request;
::milvus::grpc::TopKQueryResult response;
//test null input
// test null input
handler->Search(&context, nullptr, &response);
// test invalid table name
@ -277,6 +292,7 @@ TEST_F(RpcHandlerTest, SEARCH_TEST) {
TEST_F(RpcHandlerTest, TABLES_TEST) {
::grpc::ServerContext context;
handler->SetContext(&context, dummy_context);
::milvus::grpc::TableSchema tableschema;
::milvus::grpc::Status response;
std::string tablename = "tbl";
@ -313,7 +329,7 @@ TEST_F(RpcHandlerTest, TABLES_TEST) {
std::vector<std::vector<float>> record_array;
BuildVectors(0, VECTOR_COUNT, record_array);
::milvus::grpc::VectorIds vector_ids;
for (int64_t i = 0; i < VECTOR_COUNT; i++) {
for (int64_t i = 0; i < VECTOR_COUNT; i++) {
vector_ids.add_vector_id_array(i);
}
// Insert vectors
@ -379,6 +395,7 @@ TEST_F(RpcHandlerTest, TABLES_TEST) {
TEST_F(RpcHandlerTest, PARTITION_TEST) {
::grpc::ServerContext context;
handler->SetContext(&context, dummy_context);
::milvus::grpc::TableSchema table_schema;
::milvus::grpc::Status response;
std::string str_table_name = "tbl_partition";
@ -417,6 +434,7 @@ TEST_F(RpcHandlerTest, PARTITION_TEST) {
TEST_F(RpcHandlerTest, CMD_TEST) {
::grpc::ServerContext context;
handler->SetContext(&context, dummy_context);
::milvus::grpc::Command command;
command.set_cmd("version");
::milvus::grpc::StringReply reply;
@ -431,6 +449,7 @@ TEST_F(RpcHandlerTest, CMD_TEST) {
TEST_F(RpcHandlerTest, DELETE_BY_RANGE_TEST) {
::grpc::ServerContext context;
handler->SetContext(&context, dummy_context);
::milvus::grpc::DeleteByDateParam request;
::milvus::grpc::Status status;
handler->DeleteByDate(&context, nullptr, &status);
@ -468,7 +487,8 @@ class DummyRequest : public milvus::server::grpc::GrpcBaseRequest {
}
public:
explicit DummyRequest(std::string& dummy) : GrpcBaseRequest(dummy) {
explicit DummyRequest(std::string& dummy)
: GrpcBaseRequest(std::make_shared<milvus::server::Context>("dummy_request_id"), dummy) {
}
};