merge from remote source

Former-commit-id: fc7be69bc0d039576469f0adb81e027a8f00272c
This commit is contained in:
groot 2019-05-29 09:15:12 +08:00
commit 61fb59d23b
24 changed files with 1173 additions and 214 deletions

View File

@ -11,7 +11,7 @@ Please mark all change in change log and use the ticket from JIRA.
### New Feature
- MS-5 - Implement Auto Archive Feature
- MS-16 - Implement metrics without prometheus
### Task
- MS-1 - Add CHANGELOG.md

View File

@ -4,32 +4,10 @@
# Proprietary and confidential.
#-------------------------------------------------------------------------------
cmake_minimum_required(VERSION 3.14)
message(STATUS "Building using CMake version: ${CMAKE_VERSION}")
cmake_minimum_required(VERSION 3.12)
set(MEGASEARCH_VERSION "0.1.0")
string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" MEGASEARCH_BASE_VERSION "${MEGASEARCH_VERSION}")
project(megasearch VERSION "${MEGASEARCH_BASE_VERSION}")
project(vecwise_engine LANGUAGES CUDA CXX)
set(MEGASEARCH_VERSION_MAJOR "${megasearch_VERSION_MAJOR}")
set(MEGASEARCH_VERSION_MINOR "${megasearch_VERSION_MINOR}")
set(MEGASEARCH_VERSION_PATCH "${megasearch_VERSION_PATCH}")
if(MEGASEARCH_VERSION_MAJOR STREQUAL ""
OR MEGASEARCH_VERSION_MINOR STREQUAL ""
OR MEGASEARCH_VERSION_PATCH STREQUAL "")
message(FATAL_ERROR "Failed to determine MegaSearch version from '${MEGASEARCH_VERSION}'")
endif()
message(STATUS "MegaSearch version: "
"${MEGASEARCH_VERSION_MAJOR}.${MEGASEARCH_VERSION_MINOR}.${MEGASEARCH_VERSION_PATCH} "
"(full: '${MEGASEARCH_VERSION}')")
set(MEGASEARCH_SOURCE_DIR ${PROJECT_SOURCE_DIR})
set(MEGASEARCH_BINARY_DIR ${PROJECT_BINARY_DIR})
find_package(CUDA)
set(CUDA_NVCC_FLAGS "${CUDA_NVCC_FLAGS} -Xcompiler -fPIC -std=c++11 -D_FORCE_INLINES -arch sm_60 --expt-extended-lambda")
set(CUDA_NVCC_FLAGS "${CUDA_NVCC_FLAGS} -O0 -g")
@ -54,31 +32,28 @@ else()
set(VECWISE_BUILD_ARCH unknown)
endif()
if(DEFINED UNIX)
message("building vecwise on Unix")
set(VECWISE_BUILD_SYSTEM macos)
elseif(DEFINED APPLE)
message("building vecwise on MacOS")
set(VECWISE_BUILD_SYSTEM unix)
else()
message("unknown OS")
set(VECWISE_BUILD_SYSTEM unknown)
endif ()
if(CMAKE_BUILD_TYPE STREQUAL "Release")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3 -fPIC -DELPP_THREAD_SAFE")
if (GPU_VERSION STREQUAL "ON")
set(ENABLE_LICENSE "ON")
add_definitions("-DENABLE_LICENSE")
endif ()
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O0 -g -fPIC -DELPP_THREAD_SAFE")
endif()
if (GPU_VERSION STREQUAL "ON")
set(ENABLE_LICENSE "ON")
add_definitions("-DENABLE_LICENSE")
endif ()
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
if (BUILD_UNIT_TEST)
option(MEGASEARCH_BUILD_TESTS "Build the megasearch test suite" ON)
endif(BUILD_UNIT_TEST)
include(ExternalProject)
include(ThirdPartyPackages)
include_directories(${MEGASEARCH_SOURCE_DIR})
link_directories(${MEGASEARCH_BINARY_DIR})
## Following should be check
set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules/" ${CMAKE_MODULE_PATH})
set(VECWISE_ENGINE_INCLUDE ${PROJECT_SOURCE_DIR}/include)
set(VECWISE_ENGINE_SRC ${PROJECT_SOURCE_DIR}/src)
@ -97,7 +72,6 @@ link_directories(${VECWISE_THIRD_PARTY_BUILD}/lib64)
#execute_process(COMMAND bash build.sh
# WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/third_party)
add_subdirectory(src)
if (BUILD_UNIT_TEST)

View File

@ -11,6 +11,14 @@ db_config:
db_flush_interval: 5 #unit: second
idmapper_max_open_file: 128
metric_config:
is_startup: true # true is on, false is off
collector: prometheus # prometheus, now we only have prometheus
prometheus_config:
collect_type: pull # pull means prometheus pull the message from megasearch, push means megasearch push metric to push gateway
push_gateway_ip_address: 127.0.0.1
push_gateway_port: 9091
license_config:
license_path: "/tmp/system.license"

View File

@ -4,19 +4,27 @@
# Proprietary and confidential.
#-------------------------------------------------------------------------------
aux_source_directory(cache cache_files)
aux_source_directory(config config_files)
aux_source_directory(server server_files)
aux_source_directory(utils utils_files)
aux_source_directory(db db_files)
aux_source_directory(wrapper wrapper_files)
#aux_source_directory(metrics metrics_files)
set(metrics_files
metrics/Metrics.cpp
metrics/MetricBase.h
)
set(license_check_files
license/LicenseLibrary.cpp
license/LicenseCheck.cpp
)
set(license_generator_src
set(license_generator_files
license/LicenseGenerator.cpp
license/LicenseLibrary.cpp
)
@ -27,16 +35,19 @@ set(service_files
thrift/gen-cpp/megasearch_types.cpp
)
set(vecwise_engine_src
set(vecwise_engine_files
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
${cache_files}
${db_files}
${wrapper_files}
# metrics/Metrics.cpp
${metrics_files}
)
set(get_sys_info_src
set(get_sys_info_files
license/GetSysInfo.cpp)
include_directories(/usr/include)
include_directories(/usr/local/cuda/include)
include_directories(thrift/gen-cpp)
@ -53,6 +64,9 @@ if (GPU_VERSION STREQUAL "ON")
cudart
cublas
libsqlite3.a
# libprometheus-cpp-push.a
# libprometheus-cpp-pull.a
# libprometheus-cpp-core.a
)
else()
set(engine_libs
@ -63,6 +77,9 @@ else()
libgfortran.a
libquadmath.a
libsqlite3.a
# libprometheus-cpp-push.a
# libprometheus-cpp-pull.a
# libprometheus-cpp-core.a
)
endif ()
@ -81,22 +98,37 @@ if (ENABLE_LICENSE STREQUAL "ON")
endif ()
cuda_add_library(vecwise_engine STATIC ${vecwise_engine_src})
cuda_add_library(vecwise_engine STATIC ${vecwise_engine_files})
target_link_libraries(vecwise_engine ${engine_libs})
add_library(metrics STATIC ${metrics_files})
if (ENABLE_LICENSE STREQUAL "ON")
add_library(vecwise_license STATIC ${license_check_files})
target_link_libraries(vecwise_license ${license_libs})
endif ()
#set(metrics_lib
# libprometheus-cpp-push.a
# libprometheus-cpp-pull.a
# libprometheus-cpp-core.a
# )
#add_library(vecwise_engine STATIC ${metrics_files} )
#target_link_libraries(metrics ${metrics_lib})
add_executable(vecwise_server
${config_files}
${server_files}
${utils_files}
${service_files}
${metrics_files}
${VECWISE_THIRD_PARTY_BUILD}/include/easylogging++.cc
)
set(server_libs
vecwise_engine
librocksdb.a
@ -120,13 +152,19 @@ else ()
endif()
if (ENABLE_LICENSE STREQUAL "ON")
add_executable(license_generator ${license_generator_src})
add_executable(get_sys_info ${get_sys_info_src})
add_executable(license_generator ${license_generator_files})
target_link_libraries(get_sys_info ${license_libs} vecwise_license)
target_link_libraries(license_generator ${license_libs})
install(TARGETS get_sys_info DESTINATION bin)
endif ()
install(TARGETS vecwise_server DESTINATION bin)
add_subdirectory(sdk)
add_subdirectory(sdk)
#target_link_libraries(
# libprometheus-cpp-push.a
# libprometheus-cpp-pull.a
# libprometheus-cpp-core.a
# pthread
# z
# ${CURL_LIBRARIES})

View File

@ -5,6 +5,7 @@
////////////////////////////////////////////////////////////////////////////////
#include "CacheMgr.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace vecwise {
@ -37,7 +38,7 @@ DataObjPtr CacheMgr::GetItem(const std::string& key) {
if(cache_ == nullptr) {
return nullptr;
}
server::Metrics::GetInstance().CacheAccessTotalIncrement();
return cache_->get(key);
}
@ -56,6 +57,7 @@ void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
}
cache_->insert(key, data);
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
void CacheMgr::InsertItem(const std::string& key, const engine::Index_ptr& index) {
@ -65,6 +67,7 @@ void CacheMgr::InsertItem(const std::string& key, const engine::Index_ptr& index
DataObjPtr obj = std::make_shared<DataObj>(index);
cache_->insert(key, obj);
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
void CacheMgr::EraseItem(const std::string& key) {
@ -73,6 +76,7 @@ void CacheMgr::EraseItem(const std::string& key) {
}
cache_->erase(key);
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
void CacheMgr::PrintInfo() {

View File

@ -16,11 +16,14 @@
#include <cstring>
#include <easylogging++.h>
#include <cache/CpuCacheMgr.h>
#include "../utils/Log.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace vecwise {
namespace engine {
template<typename EngineT>
DBImpl<EngineT>::DBImpl(const Options& options)
: env_(options.env),
@ -51,15 +54,32 @@ Status DBImpl<EngineT>::HasTable(const std::string& table_id, bool& has_or_not)
template<typename EngineT>
Status DBImpl<EngineT>::InsertVectors(const std::string& table_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) {
auto start_time = METRICS_NOW_TIME;
Status status = pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
auto end_time = METRICS_NOW_TIME;
// std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
// double average_time = double(time_span.count()) / n;
double total_time = METRICS_MICROSECONDS(start_time,end_time);
double avg_time = total_time / n;
for (int i = 0; i < n; ++i) {
server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (!status.ok()) {
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
return status;
}
server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
}
template<typename EngineT>
Status DBImpl<EngineT>::Query(const std::string &table_id, size_t k, size_t nq,
const float *vectors, QueryResults &results) {
meta::DatesT dates = {meta::Meta::GetDate()};
return Query(table_id, k, nq, vectors, dates, results);
}
@ -123,11 +143,34 @@ Status DBImpl<EngineT>::Query(const std::string& table_id, size_t k, size_t nq,
index.Load();
auto file_size = index.PhysicalSize()/(1024*1024);
search_set_size += file_size;
LOG(DEBUG) << "Search file_type " << file.file_type << " Of Size: "
<< file_size << " M";
int inner_k = index.Count() < k ? index.Count() : k;
auto start_time = METRICS_NOW_TIME;
index.Search(nq, vectors, inner_k, output_distence, output_ids);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
if(file.file_type == meta::TableFileSchema::RAW) {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size*1024*1024);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size*1024*1024);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size*1024*1024);
} else if(file.file_type == meta::TableFileSchema::TO_INDEX) {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size*1024*1024);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size*1024*1024);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size*1024*1024);
} else {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size*1024*1024);
server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size*1024*1024);
server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size*1024*1024);
}
cluster(output_ids, output_distence, inner_k); // cluster to each query
memset(output_distence, 0, k * nq * sizeof(float));
memset(output_ids, 0, k * nq * sizeof(long));
@ -260,8 +303,14 @@ Status DBImpl<EngineT>::MergeFiles(const std::string& table_id, const meta::Date
long index_size = 0;
for (auto& file : files) {
auto start_time = METRICS_NOW_TIME;
index.Merge(file.location);
auto file_schema = file;
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
file_schema.file_type = meta::TableFileSchema::TO_DELETE;
updated.push_back(file_schema);
LOG(DEBUG) << "Merging file " << file_schema.file_id;
@ -270,6 +319,7 @@ Status DBImpl<EngineT>::MergeFiles(const std::string& table_id, const meta::Date
if (index_size >= options_.index_trigger_size) break;
}
index.Serialize();
if (index_size >= options_.index_trigger_size) {
@ -329,7 +379,11 @@ Status DBImpl<EngineT>::BuildIndex(const meta::TableFileSchema& file) {
EngineT to_index(file.dimension, file.location);
to_index.Load();
auto start_time = METRICS_NOW_TIME;
auto index = to_index.BuildIndex(table_file.location);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
table_file.file_type = meta::TableFileSchema::INDEX;
table_file.size = index->Size();

View File

@ -7,6 +7,7 @@
#include "IDGenerator.h"
#include "Utils.h"
#include "MetaConsts.h"
#include "metrics/Metrics.h"
#include <unistd.h>
#include <sstream>
@ -17,6 +18,7 @@
#include <sqlite_orm.h>
#include <easylogging++.h>
namespace zilliz {
namespace vecwise {
namespace engine {
@ -24,41 +26,41 @@ namespace meta {
using namespace sqlite_orm;
inline auto StoragePrototype(const std::string& path) {
inline auto StoragePrototype(const std::string &path) {
return make_storage(path,
make_table("Table",
make_column("id", &TableSchema::id, primary_key()),
make_column("table_id", &TableSchema::table_id, unique()),
make_column("dimension", &TableSchema::dimension),
make_column("created_on", &TableSchema::created_on),
make_column("files_cnt", &TableSchema::files_cnt, default_value(0))),
make_table("TableFile",
make_column("id", &TableFileSchema::id, primary_key()),
make_column("table_id", &TableFileSchema::table_id),
make_column("file_id", &TableFileSchema::file_id),
make_column("file_type", &TableFileSchema::file_type),
make_column("size", &TableFileSchema::size, default_value(0)),
make_column("updated_time", &TableFileSchema::updated_time),
make_column("created_on", &TableFileSchema::created_on),
make_column("date", &TableFileSchema::date))
);
make_table("Table",
make_column("id", &TableSchema::id, primary_key()),
make_column("table_id", &TableSchema::table_id, unique()),
make_column("dimension", &TableSchema::dimension),
make_column("created_on", &TableSchema::created_on),
make_column("files_cnt", &TableSchema::files_cnt, default_value(0))),
make_table("TableFile",
make_column("id", &TableFileSchema::id, primary_key()),
make_column("table_id", &TableFileSchema::table_id),
make_column("file_id", &TableFileSchema::file_id),
make_column("file_type", &TableFileSchema::file_type),
make_column("size", &TableFileSchema::size, default_value(0)),
make_column("updated_time", &TableFileSchema::updated_time),
make_column("created_on", &TableFileSchema::created_on),
make_column("date", &TableFileSchema::date))
);
}
using ConnectorT = decltype(StoragePrototype(""));
static std::unique_ptr<ConnectorT> ConnectorPtr;
std::string DBMetaImpl::GetTablePath(const std::string& table_id) {
std::string DBMetaImpl::GetTablePath(const std::string &table_id) {
return options_.path + "/tables/" + table_id;
}
std::string DBMetaImpl::GetTableDatePartitionPath(const std::string& table_id, DateT& date) {
std::string DBMetaImpl::GetTableDatePartitionPath(const std::string &table_id, DateT &date) {
std::stringstream ss;
ss << GetTablePath(table_id) << "/" << date;
return ss.str();
}
void DBMetaImpl::GetTableFilePath(TableFileSchema& group_file) {
void DBMetaImpl::GetTableFilePath(TableFileSchema &group_file) {
if (group_file.date == EmptyDate) {
group_file.date = Meta::GetDate();
}
@ -68,7 +70,7 @@ void DBMetaImpl::GetTableFilePath(TableFileSchema& group_file) {
group_file.location = ss.str();
}
Status DBMetaImpl::NextTableId(std::string& table_id) {
Status DBMetaImpl::NextTableId(std::string &table_id) {
std::stringstream ss;
SimpleIDGenerator g;
ss << g.GetNextIDNumber();
@ -76,7 +78,7 @@ Status DBMetaImpl::NextTableId(std::string& table_id) {
return Status::OK();
}
Status DBMetaImpl::NextFileId(std::string& file_id) {
Status DBMetaImpl::NextFileId(std::string &file_id) {
std::stringstream ss;
SimpleIDGenerator g;
ss << g.GetNextIDNumber();
@ -84,7 +86,7 @@ Status DBMetaImpl::NextFileId(std::string& file_id) {
return Status::OK();
}
DBMetaImpl::DBMetaImpl(const DBMetaOptions& options_)
DBMetaImpl::DBMetaImpl(const DBMetaOptions &options_)
: options_(options_) {
Initialize();
}
@ -98,7 +100,7 @@ Status DBMetaImpl::Initialize() {
assert(ret);
}
ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path+"/meta.sqlite"));
ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path + "/meta.sqlite"));
ConnectorPtr->sync_schema();
ConnectorPtr->open_forever(); // thread safe option
@ -110,8 +112,8 @@ Status DBMetaImpl::Initialize() {
}
// PXU TODO: Temp solution. Will fix later
Status DBMetaImpl::DropPartitionsByDates(const std::string& table_id,
const DatesT& dates) {
Status DBMetaImpl::DropPartitionsByDates(const std::string &table_id,
const DatesT &dates) {
if (dates.size() == 0) {
return Status::OK();
}
@ -125,7 +127,7 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string& table_id,
auto yesterday = GetDateWithDelta(-1);
for (auto& date : dates) {
for (auto &date : dates) {
if (date >= yesterday) {
return Status::Error("Could not delete partitions with 2 days");
}
@ -133,28 +135,29 @@ Status DBMetaImpl::DropPartitionsByDates(const std::string& table_id,
try {
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type) = (int)TableFileSchema::TO_DELETE
),
where(
c(&TableFileSchema::table_id) == table_id and
in(&TableFileSchema::date, dates)
));
} catch (std::exception & e) {
set(
c(&TableFileSchema::file_type) = (int) TableFileSchema::TO_DELETE
),
where(
c(&TableFileSchema::table_id) == table_id and
in(&TableFileSchema::date, dates)
));
} catch (std::exception &e) {
LOG(DEBUG) << e.what();
throw e;
}
return Status::OK();
}
Status DBMetaImpl::CreateTable(TableSchema& table_schema) {
Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
if (table_schema.table_id == "") {
NextTableId(table_schema.table_id);
}
table_schema.files_cnt = 0;
table_schema.id = -1;
table_schema.created_on = utils::GetMicroSecTimeStamp();
auto start_time = METRICS_NOW_TIME;
{
try {
auto id = ConnectorPtr->insert(table_schema);
@ -163,6 +166,9 @@ Status DBMetaImpl::CreateTable(TableSchema& table_schema) {
return Status::DBTransactionError("Add Table Error");
}
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
auto group_path = GetTablePath(table_schema.table_id);
@ -177,13 +183,18 @@ Status DBMetaImpl::CreateTable(TableSchema& table_schema) {
return Status::OK();
}
Status DBMetaImpl::DescribeTable(TableSchema& table_schema) {
Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto groups = ConnectorPtr->select(columns(&TableSchema::id,
&TableSchema::table_id,
&TableSchema::files_cnt,
&TableSchema::dimension),
where(c(&TableSchema::table_id) == table_schema.table_id));
&TableSchema::table_id,
&TableSchema::files_cnt,
&TableSchema::dimension),
where(c(&TableSchema::table_id) == table_schema.table_id));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
assert(groups.size() <= 1);
if (groups.size() == 1) {
table_schema.id = std::get<0>(groups[0]);
@ -200,10 +211,16 @@ Status DBMetaImpl::DescribeTable(TableSchema& table_schema) {
return Status::OK();
}
Status DBMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
Status DBMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto tables = ConnectorPtr->select(columns(&TableSchema::id),
where(c(&TableSchema::table_id) == table_id));
where(c(&TableSchema::table_id) == table_id));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
assert(tables.size() <= 1);
if (tables.size() == 1) {
has_or_not = true;
@ -217,7 +234,7 @@ Status DBMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
return Status::OK();
}
Status DBMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
if (file_schema.date == EmptyDate) {
file_schema.date = Meta::GetDate();
}
@ -238,8 +255,13 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
{
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto id = ConnectorPtr->insert(file_schema);
file_schema.id = id;
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
} catch (...) {
return Status::DBTransactionError("Add file Error");
}
@ -258,22 +280,28 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
return Status::OK();
}
Status DBMetaImpl::FilesToIndex(TableFilesSchema& files) {
Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.clear();
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
where(c(&TableFileSchema::file_type) == (int)TableFileSchema::TO_INDEX));
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
where(c(&TableFileSchema::file_type)
== (int) TableFileSchema::TO_INDEX));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
std::map<std::string, TableSchema> groups;
TableFileSchema table_file;
for (auto& file : selected) {
for (auto &file : selected) {
table_file.id = std::get<0>(file);
table_file.table_id = std::get<1>(file);
table_file.file_id = std::get<2>(file);
@ -294,7 +322,7 @@ Status DBMetaImpl::FilesToIndex(TableFilesSchema& files) {
table_file.dimension = groups[table_file.table_id].dimension;
files.push_back(table_file);
}
} catch (std::exception & e) {
} catch (std::exception &e) {
LOG(DEBUG) << e.what();
throw e;
}
@ -303,13 +331,15 @@ Status DBMetaImpl::FilesToIndex(TableFilesSchema& files) {
}
Status DBMetaImpl::FilesToSearch(const std::string &table_id,
const DatesT& partition,
DatePartionedTableFilesSchema &files) {
const DatesT &partition,
DatePartionedTableFilesSchema &files) {
files.clear();
DatesT today = {Meta::GetDate()};
const DatesT& dates = (partition.empty() == true) ? today : partition;
const DatesT &dates = (partition.empty() == true) ? today : partition;
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
&TableFileSchema::table_id,
&TableFileSchema::file_id,
@ -319,9 +349,13 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
where(c(&TableFileSchema::table_id) == table_id and
in(&TableFileSchema::date, dates) and
(c(&TableFileSchema::file_type) == (int) TableFileSchema::RAW or
c(&TableFileSchema::file_type) == (int) TableFileSchema::TO_INDEX or
c(&TableFileSchema::file_type) == (int) TableFileSchema::INDEX)));
c(&TableFileSchema::file_type)
== (int) TableFileSchema::TO_INDEX or
c(&TableFileSchema::file_type)
== (int) TableFileSchema::INDEX)));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
TableSchema table_schema;
table_schema.table_id = table_id;
auto status = DescribeTable(table_schema);
@ -331,7 +365,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
TableFileSchema table_file;
for (auto& file : selected) {
for (auto &file : selected) {
table_file.id = std::get<0>(file);
table_file.table_id = std::get<1>(file);
table_file.file_id = std::get<2>(file);
@ -346,7 +380,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
}
files[table_file.date].push_back(table_file);
}
} catch (std::exception & e) {
} catch (std::exception &e) {
LOG(DEBUG) << e.what();
throw e;
}
@ -354,29 +388,34 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
return Status::OK();
}
Status DBMetaImpl::FilesToMerge(const std::string& table_id,
DatePartionedTableFilesSchema& files) {
Status DBMetaImpl::FilesToMerge(const std::string &table_id,
DatePartionedTableFilesSchema &files) {
files.clear();
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
where(c(&TableFileSchema::file_type) == (int)TableFileSchema::RAW and
c(&TableFileSchema::table_id) == table_id));
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
where(c(&TableFileSchema::file_type) == (int) TableFileSchema::RAW and
c(&TableFileSchema::table_id) == table_id));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
TableSchema table_schema;
table_schema.table_id = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
TableFileSchema table_file;
for (auto& file : selected) {
for (auto &file : selected) {
table_file.id = std::get<0>(file);
table_file.table_id = std::get<1>(file);
table_file.file_id = std::get<2>(file);
@ -391,7 +430,7 @@ Status DBMetaImpl::FilesToMerge(const std::string& table_id,
}
files[table_file.date].push_back(table_file);
}
} catch (std::exception & e) {
} catch (std::exception &e) {
LOG(DEBUG) << e.what();
throw e;
}
@ -399,17 +438,17 @@ Status DBMetaImpl::FilesToMerge(const std::string& table_id,
return Status::OK();
}
Status DBMetaImpl::GetTableFile(TableFileSchema& file_schema) {
Status DBMetaImpl::GetTableFile(TableFileSchema &file_schema) {
try {
auto files = ConnectorPtr->select(columns(&TableFileSchema::id,
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
where(c(&TableFileSchema::file_id) == file_schema.file_id and
c(&TableFileSchema::table_id) == file_schema.table_id
c(&TableFileSchema::table_id) == file_schema.table_id
));
assert(files.size() <= 1);
if (files.size() == 1) {
@ -421,7 +460,7 @@ Status DBMetaImpl::GetTableFile(TableFileSchema& file_schema) {
file_schema.date = std::get<5>(files[0]);
} else {
return Status::NotFound("Table:" + file_schema.table_id +
" File:" + file_schema.file_id + " not found");
" File:" + file_schema.file_id + " not found");
}
} catch (std::exception &e) {
LOG(DEBUG) << e.what();
@ -433,28 +472,27 @@ Status DBMetaImpl::GetTableFile(TableFileSchema& file_schema) {
// PXU TODO: Support Swap
Status DBMetaImpl::Archive() {
auto& criterias = options_.archive_conf.GetCriterias();
auto &criterias = options_.archive_conf.GetCriterias();
if (criterias.size() == 0) {
return Status::OK();
}
for (auto kv : criterias) {
auto& criteria = kv.first;
auto& limit = kv.second;
auto &criteria = kv.first;
auto &limit = kv.second;
if (criteria == "days") {
long usecs = limit * D_SEC * US_PS;
long now = utils::GetMicroSecTimeStamp();
try
{
try {
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type) = (int)TableFileSchema::TO_DELETE
),
where(
c(&TableFileSchema::created_on) < (long)(now - usecs) and
c(&TableFileSchema::file_type) != (int)TableFileSchema::TO_DELETE
));
} catch (std::exception & e) {
set(
c(&TableFileSchema::file_type) = (int) TableFileSchema::TO_DELETE
),
where(
c(&TableFileSchema::created_on) < (long) (now - usecs) and
c(&TableFileSchema::file_type) != (int) TableFileSchema::TO_DELETE
));
} catch (std::exception &e) {
LOG(DEBUG) << e.what();
throw e;
}
@ -463,7 +501,7 @@ Status DBMetaImpl::Archive() {
long sum = 0;
Size(sum);
auto to_delete = (sum - limit*G);
auto to_delete = (sum - limit * G);
DiscardFiles(to_delete);
}
}
@ -471,21 +509,21 @@ Status DBMetaImpl::Archive() {
return Status::OK();
}
Status DBMetaImpl::Size(long& result) {
Status DBMetaImpl::Size(long &result) {
result = 0;
try {
auto selected = ConnectorPtr->select(columns(sum(&TableFileSchema::size)),
where(
c(&TableFileSchema::file_type) != (int)TableFileSchema::TO_DELETE
));
where(
c(&TableFileSchema::file_type) != (int) TableFileSchema::TO_DELETE
));
for (auto& sub_query : selected) {
if(!std::get<0>(sub_query)) {
for (auto &sub_query : selected) {
if (!std::get<0>(sub_query)) {
continue;
}
result += (long)(*std::get<0>(sub_query));
result += (long) (*std::get<0>(sub_query));
}
} catch (std::exception & e) {
} catch (std::exception &e) {
LOG(DEBUG) << e.what();
throw e;
}
@ -500,15 +538,16 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
}
try {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
&TableFileSchema::size),
where(c(&TableFileSchema::file_type) != (int)TableFileSchema::TO_DELETE),
order_by(&TableFileSchema::id),
limit(10));
&TableFileSchema::size),
where(c(&TableFileSchema::file_type)
!= (int) TableFileSchema::TO_DELETE),
order_by(&TableFileSchema::id),
limit(10));
std::vector<int> ids;
TableFileSchema table_file;
for (auto& file : selected) {
for (auto &file : selected) {
if (to_discard_size <= 0) break;
table_file.id = std::get<0>(file);
table_file.size = std::get<1>(file);
@ -522,14 +561,14 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
}
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type) = (int)TableFileSchema::TO_DELETE
),
where(
in(&TableFileSchema::id, ids)
));
set(
c(&TableFileSchema::file_type) = (int) TableFileSchema::TO_DELETE
),
where(
in(&TableFileSchema::id, ids)
));
} catch (std::exception & e) {
} catch (std::exception &e) {
LOG(DEBUG) << e.what();
throw e;
}
@ -538,11 +577,16 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
return DiscardFiles(to_discard_size);
}
Status DBMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
file_schema.updated_time = utils::GetMicroSecTimeStamp();
try {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
ConnectorPtr->update(file_schema);
} catch (std::exception & e) {
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
} catch (std::exception &e) {
LOG(DEBUG) << e.what();
LOG(DEBUG) << "table_id= " << file_schema.table_id << " file_id=" << file_schema.file_id;
throw e;
@ -550,19 +594,24 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
return Status::OK();
}
Status DBMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
auto commited = ConnectorPtr->transaction([&] () mutable {
for (auto& file : files) {
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto commited = ConnectorPtr->transaction([&]() mutable {
for (auto &file : files) {
file.updated_time = utils::GetMicroSecTimeStamp();
ConnectorPtr->update(file);
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
return true;
});
if (!commited) {
return Status::DBTransactionError("Update files Error");
}
} catch (std::exception & e) {
} catch (std::exception &e) {
LOG(DEBUG) << e.what();
throw e;
}
@ -573,18 +622,21 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
auto now = utils::GetMicroSecTimeStamp();
try {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
where(c(&TableFileSchema::file_type) == (int)TableFileSchema::TO_DELETE and
c(&TableFileSchema::updated_time) > now - seconds*US_PS));
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
where(
c(&TableFileSchema::file_type) == (int) TableFileSchema::TO_DELETE
and
c(&TableFileSchema::updated_time)
> now - seconds * US_PS));
TableFilesSchema updated;
TableFileSchema table_file;
for (auto& file : selected) {
for (auto &file : selected) {
table_file.id = std::get<0>(file);
table_file.table_id = std::get<1>(file);
table_file.file_id = std::get<2>(file);
@ -598,7 +650,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
ConnectorPtr->remove<TableFileSchema>(table_file.id);
/* LOG(DEBUG) << "Removing deleted id=" << table_file.id << " location=" << table_file.location << std::endl; */
}
} catch (std::exception & e) {
} catch (std::exception &e) {
LOG(DEBUG) << e.what();
throw e;
}
@ -609,18 +661,21 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
Status DBMetaImpl::CleanUp() {
try {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id,
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
where(c(&TableFileSchema::file_type) == (int)TableFileSchema::TO_DELETE or
c(&TableFileSchema::file_type) == (int)TableFileSchema::NEW));
&TableFileSchema::table_id,
&TableFileSchema::file_id,
&TableFileSchema::file_type,
&TableFileSchema::size,
&TableFileSchema::date),
where(
c(&TableFileSchema::file_type) == (int) TableFileSchema::TO_DELETE
or
c(&TableFileSchema::file_type)
== (int) TableFileSchema::NEW));
TableFilesSchema updated;
TableFileSchema table_file;
for (auto& file : selected) {
for (auto &file : selected) {
table_file.id = std::get<0>(file);
table_file.table_id = std::get<1>(file);
table_file.file_id = std::get<2>(file);
@ -634,7 +689,7 @@ Status DBMetaImpl::CleanUp() {
ConnectorPtr->remove<TableFileSchema>(table_file.id);
/* LOG(DEBUG) << "Removing id=" << table_file.id << " location=" << table_file.location << std::endl; */
}
} catch (std::exception & e) {
} catch (std::exception &e) {
LOG(DEBUG) << e.what();
throw e;
}
@ -642,31 +697,39 @@ Status DBMetaImpl::CleanUp() {
return Status::OK();
}
Status DBMetaImpl::Count(const std::string& table_id, long& result) {
Status DBMetaImpl::Count(const std::string &table_id, long &result) {
try {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::size,
&TableFileSchema::date),
where((c(&TableFileSchema::file_type) == (int)TableFileSchema::RAW or
c(&TableFileSchema::file_type) == (int)TableFileSchema::TO_INDEX or
c(&TableFileSchema::file_type) == (int)TableFileSchema::INDEX) and
c(&TableFileSchema::table_id) == table_id));
server::Metrics::GetInstance().MetaAccessTotalIncrement();
auto start_time = METRICS_NOW_TIME;
auto selected = ConnectorPtr->select(columns(&TableFileSchema::size,
&TableFileSchema::date),
where((c(&TableFileSchema::file_type) == (int) TableFileSchema::RAW or
c(&TableFileSchema::file_type) == (int) TableFileSchema::TO_INDEX
or
c(&TableFileSchema::file_type) == (int) TableFileSchema::INDEX)
and
c(&TableFileSchema::table_id) == table_id));
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time);
TableSchema table_schema;
table_schema.table_id = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
result = 0;
for (auto& file : selected) {
for (auto &file : selected) {
result += std::get<0>(file);
}
result /= table_schema.dimension;
} catch (std::exception & e) {
} catch (std::exception &e) {
LOG(DEBUG) << e.what();
throw e;
}

View File

@ -15,6 +15,8 @@
#include <wrapper/Index.h>
#include <wrapper/IndexBuilder.h>
#include <cache/CpuCacheMgr.h>
#include "metrics/Metrics.h"
namespace zilliz {
namespace vecwise {
@ -64,6 +66,7 @@ template<class IndexTrait>
Status FaissExecutionEngine<IndexTrait>::Load() {
auto index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool to_cache = false;
auto start_time = METRICS_NOW_TIME;
if (!index) {
index = read_index(location_);
to_cache = true;
@ -73,6 +76,16 @@ Status FaissExecutionEngine<IndexTrait>::Load() {
pIndex_ = index->data();
if (to_cache) {
Cache();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
double total_size = (pIndex_->d) * (pIndex_->ntotal) * 4;
server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(total_size);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedHistogramObserve(total_size/double(total_time));
}
return Status::OK();
}

View File

@ -74,7 +74,7 @@ LicenseCheck::AlterFile(const std::string &license_file_path,
{
exit(1);
}
// printf("---runing---\n");
printf("---runing---\n");
pt->expires_at(pt->expires_at() + boost::posix_time::hours(1));
pt->async_wait(boost::bind(AlterFile, license_file_path, boost::asio::placeholders::error, pt));
return SERVER_SUCCESS;
@ -83,7 +83,8 @@ LicenseCheck::AlterFile(const std::string &license_file_path,
ServerError
LicenseCheck::StartCountingDown(const std::string &license_file_path) {
if (!LicenseLibrary::IsFileExistent(license_file_path)) exit(1);
if (!LicenseLibrary::IsFileExistent(license_file_path)) return SERVER_LICENSE_FILE_NOT_EXIST;
boost::asio::io_service io;
boost::asio::deadline_timer t(io, boost::posix_time::hours(1));
t.async_wait(boost::bind(AlterFile, license_file_path, boost::asio::placeholders::error, &t));

View File

@ -36,6 +36,7 @@ class LicenseCheck {
static ServerError
StartCountingDown(const std::string &license_file_path);
private:
};

View File

@ -0,0 +1,76 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "utils/Error.h"
#include "server/ServerConfig.h"
namespace zilliz {
namespace vecwise {
namespace server {
class MetricsBase{
public:
static MetricsBase&
GetInstance(){
static MetricsBase instance;
return instance;
}
virtual ServerError Init() {};
virtual void AddGroupSuccessTotalIncrement(double value = 1) {};
virtual void AddGroupFailTotalIncrement(double value = 1) {};
virtual void HasGroupSuccessTotalIncrement(double value = 1) {};
virtual void HasGroupFailTotalIncrement(double value = 1) {};
virtual void GetGroupSuccessTotalIncrement(double value = 1) {};
virtual void GetGroupFailTotalIncrement(double value = 1) {};
virtual void GetGroupFilesSuccessTotalIncrement(double value = 1) {};
virtual void GetGroupFilesFailTotalIncrement(double value = 1) {};
virtual void AddVectorsSuccessTotalIncrement(double value = 1) {};
virtual void AddVectorsFailTotalIncrement(double value = 1) {};
virtual void AddVectorsDurationHistogramOberve(double value) {};
virtual void SearchSuccessTotalIncrement(double value = 1) {};
virtual void SearchFailTotalIncrement(double value = 1) {};
virtual void SearchDurationHistogramObserve(double value) {};
virtual void RawFileSizeHistogramObserve(double value) {};
virtual void IndexFileSizeHistogramObserve(double value) {};
virtual void BuildIndexDurationSecondsHistogramObserve(double value) {};
virtual void AllBuildIndexDurationSecondsHistogramObserve(double value) {};
virtual void CacheUsageGaugeIncrement(double value = 1) {};
virtual void CacheUsageGaugeDecrement(double value = 1) {};
virtual void CacheUsageGaugeSet(double value) {};
virtual void MetaVisitTotalIncrement(double value = 1) {};
virtual void MetaVisitDurationSecondsHistogramObserve(double value) {};
virtual void MemUsagePercentGaugeSet(double value) {};
virtual void MemUsagePercentGaugeIncrement(double value = 1) {};
virtual void MemUsagePercentGaugeDecrement(double value = 1) {};
virtual void MemUsageTotalGaugeSet(double value) {};
virtual void MemUsageTotalGaugeIncrement(double value = 1) {};
virtual void MemUsageTotalGaugeDecrement(double value = 1) {};
virtual void MetaAccessTotalIncrement(double value = 1) {};
virtual void MetaAccessDurationSecondsHistogramObserve(double value) {};
virtual void FaissDiskLoadDurationSecondsHistogramObserve(double value) {};
virtual void FaissDiskLoadSizeBytesHistogramObserve(double value) {};
virtual void FaissDiskLoadIOSpeedHistogramObserve(double value) {};
virtual void CacheAccessTotalIncrement(double value = 1) {};
virtual void MemTableMergeDurationSecondsHistogramObserve(double value) {};
virtual void SearchIndexDataDurationSecondsHistogramObserve(double value) {};
virtual void SearchRawDataDurationSecondsHistogramObserve(double value) {};
virtual void IndexFileSizeTotalIncrement(double value = 1) {};
virtual void RawFileSizeTotalIncrement(double value = 1) {};
virtual void IndexFileSizeGaugeSet(double value) {};
virtual void RawFileSizeGaugeSet(double value) {};
};
}
}
}

View File

@ -0,0 +1,38 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Metrics.h"
namespace zilliz {
namespace vecwise {
namespace server {
MetricsBase &
Metrics::CreateMetricsCollector(MetricCollectorType collector_type) {
switch (collector_type) {
case MetricCollectorType::PROMETHEUS:
// static PrometheusMetrics instance = PrometheusMetrics::GetInstance();
return MetricsBase::GetInstance();
default:return MetricsBase::GetInstance();
}
}
MetricsBase &
Metrics::GetInstance() {
ConfigNode &config = ServerConfig::GetInstance().GetConfig(CONFIG_METRIC);
std::string collector_typr_str = config.GetValue(CONFIG_METRIC_COLLECTOR);
if (collector_typr_str == "prometheus") {
return CreateMetricsCollector(MetricCollectorType::PROMETHEUS);
} else if (collector_typr_str == "zabbix") {
return CreateMetricsCollector(MetricCollectorType::ZABBIX);
} else {
return CreateMetricsCollector(MetricCollectorType::INVALID);
}
}
}
}
}

48
cpp/src/metrics/Metrics.h Normal file
View File

@ -0,0 +1,48 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "utils/Error.h"
#include <memory>
#include <vector>
#pragma once
#include "MetricBase.h"
//#include "PrometheusMetrics.h"
namespace zilliz {
namespace vecwise {
namespace server {
#define METRICS_NOW_TIME std::chrono::system_clock::now()
//#define server::Metrics::GetInstance() server::Metrics::GetInstance()
#define METRICS_MICROSECONDS(a, b) (std::chrono::duration_cast<std::chrono::microseconds> (b-a)).count();
enum class MetricCollectorType {
INVALID,
PROMETHEUS,
ZABBIX
};
class Metrics {
public:
static MetricsBase &
CreateMetricsCollector(MetricCollectorType collector_type);
static MetricsBase &
GetInstance();
};
}
}
}

View File

@ -0,0 +1,33 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "PrometheusMetrics.h"
namespace zilliz {
namespace vecwise {
namespace server {
ServerError
PrometheusMetrics::Init() {
ConfigNode& configNode = ServerConfig::GetInstance().GetConfig(CONFIG_METRIC);
startup_ = configNode.GetValue(CONFIG_METRIC_IS_STARTUP) == "true" ? true:false;
// Following should be read from config file.
const std::string bind_address = "8080";
const std::string uri = std::string("/metrics");
const std::size_t num_threads = 2;
// Init Exposer
exposer_ptr_ = std::make_shared<prometheus::Exposer>(bind_address, uri, num_threads);
// Exposer Registry
exposer_ptr_->RegisterCollectable(registry_);
return SERVER_SUCCESS;
}
}
}
}

View File

@ -0,0 +1,389 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "utils/Error.h"
#include <memory>
#include <vector>
#include <prometheus/registry.h>
#include <prometheus/exposer.h>
#include "server/ServerConfig.h"
#include "MetricBase.h"
#define METRICS_NOW_TIME std::chrono::system_clock::now()
#define server::Metrics::GetInstance() server::GetInstance()
#define METRICS_MICROSECONDS(a,b) (std::chrono::duration_cast<std::chrono::microseconds> (b-a)).count();
namespace zilliz {
namespace vecwise {
namespace server {
class PrometheusMetrics: public MetricsBase {
public:
static PrometheusMetrics &
GetInstance() {
// switch(MetricCollectorType) {
// case: prometheus::
// static
// }
static PrometheusMetrics instance;
return instance;
}
ServerError
Init();
private:
std::shared_ptr<prometheus::Exposer> exposer_ptr_;
std::shared_ptr<prometheus::Registry> registry_ = std::make_shared<prometheus::Registry>();
bool startup_ = false;
public:
void AddGroupSuccessTotalIncrement(double value = 1.0) override { if(startup_) add_group_success_total_.Increment(value);};
void AddGroupFailTotalIncrement(double value = 1.0) override { if(startup_) add_group_fail_total_.Increment(value);};
void HasGroupSuccessTotalIncrement(double value = 1.0) override { if(startup_) has_group_success_total_.Increment(value);};
void HasGroupFailTotalIncrement(double value = 1.0) override { if(startup_) has_group_fail_total_.Increment(value);};
void GetGroupSuccessTotalIncrement(double value = 1.0) override { if(startup_) get_group_success_total_.Increment(value);};
void GetGroupFailTotalIncrement(double value = 1.0) override { if(startup_) get_group_fail_total_.Increment(value);};
void GetGroupFilesSuccessTotalIncrement(double value = 1.0) override { if(startup_) get_group_files_success_total_.Increment(value);};
void GetGroupFilesFailTotalIncrement(double value = 1.0) override { if(startup_) get_group_files_fail_total_.Increment(value);};
void AddVectorsSuccessTotalIncrement(double value = 1.0) override { if(startup_) add_vectors_success_total_.Increment(value);};
void AddVectorsFailTotalIncrement(double value = 1.0) override { if(startup_) add_vectors_fail_total_.Increment(value);};
void AddVectorsDurationHistogramOberve(double value) override { if(startup_) add_vectors_duration_histogram_.Observe(value);};
void SearchSuccessTotalIncrement(double value = 1.0) override { if(startup_) search_success_total_.Increment(value);};
void SearchFailTotalIncrement(double value = 1.0) override { if(startup_) search_fail_total_.Increment(value); };
void SearchDurationHistogramObserve(double value) override { if(startup_) search_duration_histogram_.Observe(value);};
void RawFileSizeHistogramObserve(double value) override { if(startup_) raw_files_size_histogram_.Observe(value);};
void IndexFileSizeHistogramObserve(double value) override { if(startup_) index_files_size_histogram_.Observe(value);};
void BuildIndexDurationSecondsHistogramObserve(double value) override { if(startup_) build_index_duration_seconds_histogram_.Observe(value);};
void AllBuildIndexDurationSecondsHistogramObserve(double value) override { if(startup_) all_build_index_duration_seconds_histogram_.Observe(value);};
void CacheUsageGaugeIncrement(double value = 1.0) override { if(startup_) cache_usage_gauge_.Increment(value);};
void CacheUsageGaugeDecrement(double value = 1.0) override { if(startup_) cache_usage_gauge_.Decrement(value);};
void CacheUsageGaugeSet(double value) override { if(startup_) cache_usage_gauge_.Set(value);};
// void MetaVisitTotalIncrement(double value = 1.0) override { meta_visit_total_.Increment(value);};
// void MetaVisitDurationSecondsHistogramObserve(double value) override { meta_visit_duration_seconds_histogram_.Observe(value);};
void MemUsagePercentGaugeSet(double value) override { if(startup_) mem_usage_percent_gauge_.Set(value);};
void MemUsagePercentGaugeIncrement(double value = 1.0) override { if(startup_) mem_usage_percent_gauge_.Increment(value);};
void MemUsagePercentGaugeDecrement(double value = 1.0) override { if(startup_) mem_usage_percent_gauge_.Decrement(value);};
void MemUsageTotalGaugeSet(double value) override { if(startup_) mem_usage_total_gauge_.Set(value);};
void MemUsageTotalGaugeIncrement(double value = 1.0) override { if(startup_) mem_usage_total_gauge_.Increment(value);};
void MemUsageTotalGaugeDecrement(double value = 1.0) override { if(startup_) mem_usage_total_gauge_.Decrement(value);};
void MetaAccessTotalIncrement(double value = 1) { if(startup_) meta_access_total_.Increment(value);};
void MetaAccessDurationSecondsHistogramObserve(double value) { if(startup_) meta_access_duration_seconds_histogram_.Observe(value);};
void FaissDiskLoadDurationSecondsHistogramObserve(double value) { if(startup_) faiss_disk_load_duration_seconds_histogram_.Observe(value);};
void FaissDiskLoadSizeBytesHistogramObserve(double value) { if(startup_) faiss_disk_load_size_bytes_histogram_.Observe(value);};
void FaissDiskLoadIOSpeedHistogramObserve(double value) { if(startup_) faiss_disk_load_IO_speed_histogram_.Observe(value);};
void CacheAccessTotalIncrement(double value = 1) { if(startup_) cache_access_total_.Increment(value);};
void MemTableMergeDurationSecondsHistogramObserve(double value) { if(startup_) mem_table_merge_duration_seconds_histogram_.Observe(value);};
void SearchIndexDataDurationSecondsHistogramObserve(double value) { if(startup_) search_index_data_duration_seconds_histogram_.Observe(value);};
void SearchRawDataDurationSecondsHistogramObserve(double value) { if(startup_) search_raw_data_duration_seconds_histogram_.Observe(value);};
void IndexFileSizeTotalIncrement(double value = 1) { if(startup_) index_file_size_total_.Increment(value);};
void RawFileSizeTotalIncrement(double value = 1) { if(startup_) raw_file_size_total_.Increment(value);};
void IndexFileSizeGaugeSet(double value) { if(startup_) index_file_size_gauge_.Set(value);};
void RawFileSizeGaugeSet(double value) { if(startup_) raw_file_size_gauge_.Set(value);};
// prometheus::Counter &connection_total() {return connection_total_; }
//
// prometheus::Counter &add_group_success_total() { return add_group_success_total_; }
// prometheus::Counter &add_group_fail_total() { return add_group_fail_total_; }
//
// prometheus::Counter &get_group_success_total() { return get_group_success_total_;}
// prometheus::Counter &get_group_fail_total() { return get_group_fail_total_;}
//
// prometheus::Counter &has_group_success_total() { return has_group_success_total_;}
// prometheus::Counter &has_group_fail_total() { return has_group_fail_total_;}
//
// prometheus::Counter &get_group_files_success_total() { return get_group_files_success_total_;};
// prometheus::Counter &get_group_files_fail_total() { return get_group_files_fail_total_;}
//
// prometheus::Counter &add_vectors_success_total() { return add_vectors_success_total_; }
// prometheus::Counter &add_vectors_fail_total() { return add_vectors_fail_total_; }
//
// prometheus::Histogram &add_vectors_duration_histogram() { return add_vectors_duration_histogram_;}
//
// prometheus::Counter &search_success_total() { return search_success_total_; }
// prometheus::Counter &search_fail_total() { return search_fail_total_; }
//
// prometheus::Histogram &search_duration_histogram() { return search_duration_histogram_; }
// prometheus::Histogram &raw_files_size_histogram() { return raw_files_size_histogram_; }
// prometheus::Histogram &index_files_size_histogram() { return index_files_size_histogram_; }
//
// prometheus::Histogram &build_index_duration_seconds_histogram() { return build_index_duration_seconds_histogram_; }
//
// prometheus::Histogram &all_build_index_duration_seconds_histogram() { return all_build_index_duration_seconds_histogram_; }
//
// prometheus::Gauge &cache_usage_gauge() { return cache_usage_gauge_; }
//
// prometheus::Counter &meta_visit_total() { return meta_visit_total_; }
//
// prometheus::Histogram &meta_visit_duration_seconds_histogram() { return meta_visit_duration_seconds_histogram_; }
//
// prometheus::Gauge &mem_usage_percent_gauge() { return mem_usage_percent_gauge_; }
//
// prometheus::Gauge &mem_usage_total_gauge() { return mem_usage_total_gauge_; }
std::shared_ptr<prometheus::Exposer> &exposer_ptr() {return exposer_ptr_; }
// prometheus::Exposer& exposer() { return exposer_;}
std::shared_ptr<prometheus::Registry> &registry_ptr() {return registry_; }
// .....
private:
////all from db_connection.cpp
// prometheus::Family<prometheus::Counter> &connect_request_ = prometheus::BuildCounter()
// .Name("connection_total")
// .Help("total number of connection has been made")
// .Register(*registry_);
// prometheus::Counter &connection_total_ = connect_request_.Add({});
////all from DBImpl.cpp
using BucketBoundaries = std::vector<double>;
//record add_group request
prometheus::Family<prometheus::Counter> &add_group_request_ = prometheus::BuildCounter()
.Name("add_group_request_total")
.Help("the number of add_group request")
.Register(*registry_);
prometheus::Counter &add_group_success_total_ = add_group_request_.Add({{"outcome", "success"}});
prometheus::Counter &add_group_fail_total_ = add_group_request_.Add({{"outcome", "fail"}});
//record get_group request
prometheus::Family<prometheus::Counter> &get_group_request_ = prometheus::BuildCounter()
.Name("get_group_request_total")
.Help("the number of get_group request")
.Register(*registry_);
prometheus::Counter &get_group_success_total_ = get_group_request_.Add({{"outcome", "success"}});
prometheus::Counter &get_group_fail_total_ = get_group_request_.Add({{"outcome", "fail"}});
//record has_group request
prometheus::Family<prometheus::Counter> &has_group_request_ = prometheus::BuildCounter()
.Name("has_group_request_total")
.Help("the number of has_group request")
.Register(*registry_);
prometheus::Counter &has_group_success_total_ = has_group_request_.Add({{"outcome", "success"}});
prometheus::Counter &has_group_fail_total_ = has_group_request_.Add({{"outcome", "fail"}});
//record get_group_files
prometheus::Family<prometheus::Counter> &get_group_files_request_ = prometheus::BuildCounter()
.Name("get_group_files_request_total")
.Help("the number of get_group_files request")
.Register(*registry_);
prometheus::Counter &get_group_files_success_total_ = get_group_files_request_.Add({{"outcome", "success"}});
prometheus::Counter &get_group_files_fail_total_ = get_group_files_request_.Add({{"outcome", "fail"}});
//record add_vectors count and average time
//need to be considered
prometheus::Family<prometheus::Counter> &add_vectors_request_ = prometheus::BuildCounter()
.Name("add_vectors_request_total")
.Help("the number of vectors added")
.Register(*registry_);
prometheus::Counter &add_vectors_success_total_ = add_vectors_request_.Add({{"outcome", "success"}});
prometheus::Counter &add_vectors_fail_total_ = add_vectors_request_.Add({{"outcome", "fail"}});
prometheus::Family<prometheus::Histogram> &add_vectors_duration_seconds_ = prometheus::BuildHistogram()
.Name("add_vector_duration_seconds")
.Help("average time of adding every vector")
.Register(*registry_);
prometheus::Histogram &add_vectors_duration_histogram_ = add_vectors_duration_seconds_.Add({}, BucketBoundaries{0, 0.01, 0.02, 0.03, 0.04, 0.05, 0.08, 0.1, 0.5, 1});
//record search count and average time
prometheus::Family<prometheus::Counter> &search_request_ = prometheus::BuildCounter()
.Name("search_request_total")
.Help("the number of search request")
.Register(*registry_);
prometheus::Counter &search_success_total_ = search_request_.Add({{"outcome","success"}});
prometheus::Counter &search_fail_total_ = search_request_.Add({{"outcome","fail"}});
prometheus::Family<prometheus::Histogram> &search_request_duration_seconds_ = prometheus::BuildHistogram()
.Name("search_request_duration_second")
.Help("histogram of processing time for each search")
.Register(*registry_);
prometheus::Histogram &search_duration_histogram_ = search_request_duration_seconds_.Add({}, BucketBoundaries{0.1, 1.0, 10.0});
//record raw_files size histogram
prometheus::Family<prometheus::Histogram> &raw_files_size_ = prometheus::BuildHistogram()
.Name("search_raw_files_bytes")
.Help("histogram of raw files size by bytes")
.Register(*registry_);
prometheus::Histogram &raw_files_size_histogram_ = raw_files_size_.Add({}, BucketBoundaries{0.1, 1.0, 10.0});
//record index_files size histogram
prometheus::Family<prometheus::Histogram> &index_files_size_ = prometheus::BuildHistogram()
.Name("search_index_files_bytes")
.Help("histogram of index files size by bytes")
.Register(*registry_);
prometheus::Histogram &index_files_size_histogram_ = index_files_size_.Add({}, BucketBoundaries{0.1, 1.0, 10.0});
//record index and raw files size counter
prometheus::Family<prometheus::Counter> &file_size_total_ = prometheus::BuildCounter()
.Name("search_file_size_total")
.Help("searched index and raw file size")
.Register(*registry_);
prometheus::Counter &index_file_size_total_ = file_size_total_.Add({{"type", "index"}});
prometheus::Counter &raw_file_size_total_ = file_size_total_.Add({{"type", "raw"}});
//record index and raw files size counter
prometheus::Family<prometheus::Gauge> &file_size_gauge_ = prometheus::BuildGauge()
.Name("search_file_size_gauge")
.Help("searched current index and raw file size")
.Register(*registry_);
prometheus::Gauge &index_file_size_gauge_ = file_size_gauge_.Add({{"type", "index"}});
prometheus::Gauge &raw_file_size_gauge_ = file_size_gauge_.Add({{"type", "raw"}});
//record processing time for building index
prometheus::Family<prometheus::Histogram> &build_index_duration_seconds_ = prometheus::BuildHistogram()
.Name("build_index_duration_seconds")
.Help("histogram of processing time for building index")
.Register(*registry_);
prometheus::Histogram &build_index_duration_seconds_histogram_ = build_index_duration_seconds_.Add({}, BucketBoundaries{0.1, 1.0, 10.0});
//record processing time for all building index
prometheus::Family<prometheus::Histogram> &all_build_index_duration_seconds_ = prometheus::BuildHistogram()
.Name("all_build_index_duration_seconds")
.Help("histogram of processing time for building index")
.Register(*registry_);
prometheus::Histogram &all_build_index_duration_seconds_histogram_ = all_build_index_duration_seconds_.Add({}, BucketBoundaries{0.1, 1.0, 10.0});
//record duration of merging mem table
prometheus::Family<prometheus::Histogram> &mem_table_merge_duration_seconds_ = prometheus::BuildHistogram()
.Name("mem_table_merge_duration_seconds")
.Help("histogram of processing time for merging mem tables")
.Register(*registry_);
prometheus::Histogram &mem_table_merge_duration_seconds_histogram_ = mem_table_merge_duration_seconds_.Add({}, BucketBoundaries{0.1, 1.0, 10.0});
//record search index and raw data duration
prometheus::Family<prometheus::Histogram> &search_data_duration_seconds_ = prometheus::BuildHistogram()
.Name("search_data_duration_seconds")
.Help("histograms of processing time for search index and raw data")
.Register(*registry_);
prometheus::Histogram &search_index_data_duration_seconds_histogram_ = search_data_duration_seconds_.Add({{"type", "index"}}, BucketBoundaries{0.1, 1.0, 10.0});
prometheus::Histogram &search_raw_data_duration_seconds_histogram_ = search_data_duration_seconds_.Add({{"type", "raw"}}, BucketBoundaries{0.1, 1.0, 10.0});
////all form Cache.cpp
//record cache usage, when insert/erase/clear/free
prometheus::Family<prometheus::Gauge> &cache_usage_ = prometheus::BuildGauge()
.Name("cache_usage")
.Help("total bytes that cache used")
.Register(*registry_);
prometheus::Gauge &cache_usage_gauge_ = cache_usage_.Add({});
////all from Meta.cpp
//record meta visit count and time
// prometheus::Family<prometheus::Counter> &meta_visit_ = prometheus::BuildCounter()
// .Name("meta_visit_total")
// .Help("the number of accessing Meta")
// .Register(*registry_);
// prometheus::Counter &meta_visit_total_ = meta_visit_.Add({{}});
//
// prometheus::Family<prometheus::Histogram> &meta_visit_duration_seconds_ = prometheus::BuildHistogram()
// .Name("meta_visit_duration_seconds")
// .Help("histogram of processing time to get data from mata")
// .Register(*registry_);
// prometheus::Histogram &meta_visit_duration_seconds_histogram_ = meta_visit_duration_seconds_.Add({{}}, BucketBoundaries{0.1, 1.0, 10.0});
////all from MemManager.cpp
//record memory usage percent
prometheus::Family<prometheus::Gauge> &mem_usage_percent_ = prometheus::BuildGauge()
.Name("memory_usage_percent")
.Help("memory usage percent")
.Register(*registry_);
prometheus::Gauge &mem_usage_percent_gauge_ = mem_usage_percent_.Add({});
//record memory usage toal
prometheus::Family<prometheus::Gauge> &mem_usage_total_ = prometheus::BuildGauge()
.Name("memory_usage_total")
.Help("memory usage total")
.Register(*registry_);
prometheus::Gauge &mem_usage_total_gauge_ = mem_usage_total_.Add({});
////all from DBMetaImpl.cpp
//record meta access count
prometheus::Family<prometheus::Counter> &meta_access_ = prometheus::BuildCounter()
.Name("meta_access_total")
.Help("the number of meta accessing")
.Register(*registry_);
prometheus::Counter &meta_access_total_ = meta_access_.Add({});
//record meta access duration
prometheus::Family<prometheus::Histogram> &meta_access_duration_seconds_ = prometheus::BuildHistogram()
.Name("meta_access_duration_seconds")
.Help("histogram of processing time for accessing mata")
.Register(*registry_);
prometheus::Histogram &meta_access_duration_seconds_histogram_ = meta_access_duration_seconds_.Add({}, BucketBoundaries{0.1, 1.0, 10.0});
////all from FaissExecutionEngine.cpp
//record data loading from disk count, size, duration, IO speed
prometheus::Family<prometheus::Histogram> &disk_load_duration_second_ = prometheus::BuildHistogram()
.Name("disk_load_duration_seconds")
.Help("Histogram of processing time for loading data from disk")
.Register(*registry_);
prometheus::Histogram &faiss_disk_load_duration_seconds_histogram_ = disk_load_duration_second_.Add({{"DB","Faiss"}},BucketBoundaries{0.1, 1.0, 10.0});
prometheus::Family<prometheus::Histogram> &disk_load_size_bytes_ = prometheus::BuildHistogram()
.Name("disk_load_size_bytes")
.Help("Histogram of data size by bytes for loading data from disk")
.Register(*registry_);
prometheus::Histogram &faiss_disk_load_size_bytes_histogram_ = disk_load_size_bytes_.Add({{"DB","Faiss"}},BucketBoundaries{0.1, 1.0, 10.0});
prometheus::Family<prometheus::Histogram> &disk_load_IO_speed_ = prometheus::BuildHistogram()
.Name("disk_load_IO_speed_byte_per_sec")
.Help("Histogram of IO speed for loading data from disk")
.Register(*registry_);
prometheus::Histogram &faiss_disk_load_IO_speed_histogram_ = disk_load_IO_speed_.Add({{"DB","Faiss"}},BucketBoundaries{0.1, 1.0, 10.0});
////all from CacheMgr.cpp
//record cache access count
prometheus::Family<prometheus::Counter> &cache_access_ = prometheus::BuildCounter()
.Name("cache_access_total")
.Help("the count of accessing cache ")
.Register(*registry_);
prometheus::Counter &cache_access_total_ = cache_access_.Add({});
};
}
}
}

View File

@ -21,7 +21,7 @@ namespace server {
static const std::string ROCKSDB_DEFAULT_GROUP = "default";
RocksIdMapper::RocksIdMapper()
: db_(nullptr) {
: db_(nullptr) {
OpenDb();
}
@ -277,7 +277,7 @@ ServerError RocksIdMapper::GetInternal(const std::string& nid, std::string& sid,
}
ServerError RocksIdMapper::DeleteInternal(const std::string& nid, const std::string& group) {
if(db_ == nullptr) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
@ -320,6 +320,7 @@ ServerError RocksIdMapper::DeleteGroupInternal(const std::string& group) {
return SERVER_SUCCESS;
}
}
}
}

View File

@ -20,7 +20,7 @@ namespace vecwise {
namespace server {
class RocksIdMapper : public IVecIdMapper{
public:
public:
RocksIdMapper();
~RocksIdMapper();
@ -37,7 +37,7 @@ public:
ServerError Delete(const std::string& nid, const std::string& group = "") override;
ServerError DeleteGroup(const std::string& group) override;
private:
private:
void OpenDb();
void CloseDb();
@ -53,12 +53,13 @@ private:
ServerError DeleteGroupInternal(const std::string& group);
private:
private:
rocksdb::DB* db_;
mutable std::unordered_map<std::string, rocksdb::ColumnFamilyHandle*> column_handles_;
mutable std::mutex db_mutex_;
};
}
}
}

View File

@ -10,6 +10,7 @@
#include "utils/SignalUtil.h"
#include "utils/TimeRecorder.h"
#include "license/LicenseCheck.h"
#include "metrics/Metrics.h"
#include <fcntl.h>
#include <sys/stat.h>
@ -19,6 +20,8 @@
#include <unistd.h>
#include <string.h>
#include "metrics/Metrics.h"
namespace zilliz {
namespace vecwise {
namespace server {
@ -133,6 +136,10 @@ Server::Daemonize() {
int
Server::Start() {
// server::Metrics::GetInstance().Init();
// server::Metrics::GetInstance().exposer_ptr()->RegisterCollectable(server::Metrics::GetInstance().registry_ptr());
server::Metrics::GetInstance().Init();
if (daemonized_) {
Daemonize();
}
@ -160,8 +167,10 @@ Server::Start() {
exit(1);
}
std::thread counting_down(&server::LicenseCheck::StartCountingDown, license_file_path);
counting_down.detach();
if(server::LicenseCheck::StartCountingDown(license_file_path) != SERVER_SUCCESS) {
SERVER_LOG_ERROR << "License counter start error";
exit(1);
}
#endif
// Handle Signal

View File

@ -35,6 +35,10 @@ static const std::string CONFIG_GPU_CACHE_CAPACITY = "gpu_cache_capacity";
static const std::string CONFIG_LICENSE = "license_config";
static const std::string CONFIG_LICENSE_PATH = "license_path";
static const std::string CONFIG_METRIC = "metric_config";
static const std::string CONFIG_METRIC_IS_STARTUP = "is_startup";
static const std::string CONFIG_METRIC_COLLECTOR = "collector";
class ServerConfig {
public:
static ServerConfig &GetInstance();

View File

@ -39,14 +39,16 @@ SimpleIdMapper::~SimpleIdMapper() {
}
ServerError SimpleIdMapper::AddGroup(const std::string& group) {
ServerError
SimpleIdMapper::AddGroup(const std::string& group) {
if(id_groups_.count(group) == 0) {
id_groups_.insert(std::make_pair(group, ID_MAPPING()));
}
}
//not thread-safe
bool SimpleIdMapper::IsGroupExist(const std::string& group) const {
bool
SimpleIdMapper::IsGroupExist(const std::string& group) const {
return id_groups_.count(group) > 0;
}

View File

@ -15,9 +15,11 @@ set(unittest_libs
yaml-cpp
gtest_main
gmock_main
pthread)
pthread
metrics)
add_subdirectory(server)
add_subdirectory(db)
add_subdirectory(faiss_wrapper)
add_subdirectory(license)
add_subdirectory(license)
add_subdirectory(metrics)

View File

@ -108,6 +108,8 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
TEST_F(DBTest, DB_TEST) {
static const std::string group_name = "test_group";
static const int group_dim = 256;
@ -251,3 +253,4 @@ TEST_F(DBTest, SEARCH_TEST) {
// TODO(linxj): add groundTruth assert
};

View File

@ -0,0 +1,76 @@
#set(CURL_LIBRARY "-lcurl")
#find_package(CURL REQUIRED)
#INCLUDE_DIRECTORIES(${CURL_INCLUDE_DIR})
#
#INCLUDE_DIRECTORIES(~/development/lib/usr/local/include)
#LINK_DIRECTORIES(~/development/lib/usr/local/lib)
include_directories(../../src)
aux_source_directory(../../src/db db_srcs)
aux_source_directory(../../src/config config_files)
aux_source_directory(../../src/cache cache_srcs)
aux_source_directory(../../src/wrapper wrapper_src)
include_directories(/usr/include)
include_directories(../../third_party/build/include)
link_directories(../../third_party/build/lib)
include_directories(/usr/local/cuda/include)
link_directories("/usr/local/cuda/lib64")
#include_directories(../db/utils.h)
include_directories(../../src/metrics)
set(require_files
../../src/metrics/Metrics.cpp
# ../../src/cache/CacheMgr.cpp
# ../../src/metrics/PrometheusMetrics.cpp
../../src/server/ServerConfig.cpp
../../src/utils/CommonUtil.cpp
../../src/utils/TimeRecorder.cpp
# ../../src/metrics/PrometheusMetrics.cpp
)
set(count_test_src
# ${unittest_srcs}
# ${config_files}
# ${require_files}
${unittest_srcs}
${config_files}
${cache_srcs}
${db_srcs}
${wrapper_src}
${require_files}
metrics_test.cpp
../db/utils.cpp
../../src/metrics/Metrics.h
)
add_executable(metrics_test ${count_test_src} ${require_files} )
target_link_libraries(metrics_test
${unittest_libs}
faiss
cudart
cublas
sqlite3
boost_system
boost_filesystem
lz4
metrics
gtest
# prometheus-cpp-pull
# prometheus-cpp-push
# prometheus-cpp-core
pthread
z
)

View File

@ -0,0 +1,121 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <chrono>
#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <gtest/gtest.h>
//#include "prometheus/registry.h"
//#include "prometheus/exposer.h"
#include <cache/CpuCacheMgr.h>
#include "metrics/Metrics.h"
#include "../db/utils.h"
#include "db/DB.h"
#include "db/DBMetaImpl.h"
#include "db/Factories.h"
using namespace zilliz::vecwise;
TEST_F(DBTest, Metric_Tes) {
// server::Metrics::GetInstance().Init();
// server::Metrics::GetInstance().exposer_ptr()->RegisterCollectable(server::Metrics::GetInstance().registry_ptr());
server::Metrics::GetInstance().Init();
// server::PrometheusMetrics::GetInstance().exposer_ptr()->RegisterCollectable(server::PrometheusMetrics::GetInstance().registry_ptr());
zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->SetCapacity(1*1024*1024*1024);
std::cout<<zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->CacheCapacity()<<std::endl;
static const std::string group_name = "test_group";
static const int group_dim = 256;
engine::meta::TableSchema group_info;
group_info.dimension = group_dim;
group_info.table_id = group_name;
engine::Status stat = db_->CreateTable(group_info);
engine::meta::TableSchema group_info_get;
group_info_get.table_id = group_name;
stat = db_->DescribeTable(group_info_get);
engine::IDNumbers vector_ids;
engine::IDNumbers target_ids;
int d = 256;
int nb = 50;
float *xb = new float[d * nb];
for(int i = 0; i < nb; i++) {
for(int j = 0; j < d; j++) xb[d * i + j] = drand48();
xb[d * i] += i / 2000.;
}
int qb = 5;
float *qxb = new float[d * qb];
for(int i = 0; i < qb; i++) {
for(int j = 0; j < d; j++) qxb[d * i + j] = drand48();
qxb[d * i] += i / 2000.;
}
std::thread search([&]() {
engine::QueryResults results;
int k = 10;
std::this_thread::sleep_for(std::chrono::seconds(2));
INIT_TIMER;
std::stringstream ss;
long count = 0;
long prev_count = -1;
for (auto j=0; j<10; ++j) {
ss.str("");
db_->Size(count);
prev_count = count;
START_TIMER;
stat = db_->Query(group_name, k, qb, qxb, results);
ss << "Search " << j << " With Size " << (float)(count*group_dim*sizeof(float))/(1024*1024) << " M";
// STOP_TIMER(ss.str());
ASSERT_STATS(stat);
for (auto k=0; k<qb; ++k) {
ASSERT_EQ(results[k][0], target_ids[k]);
ss.str("");
ss << "Result [" << k << "]:";
for (auto result : results[k]) {
ss << result << " ";
}
/* LOG(DEBUG) << ss.str(); */
}
ASSERT_TRUE(count >= prev_count);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
int loop = 100000;
for (auto i=0; i<loop; ++i) {
if (i==40) {
db_->InsertVectors(group_name, qb, qxb, target_ids);
ASSERT_EQ(target_ids.size(), qb);
} else {
db_->InsertVectors(group_name, nb, xb, vector_ids);
}
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
search.join();
delete [] xb;
delete [] qxb;
};