mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 20:09:57 +08:00
Add MySQL
Former-commit-id: b79fe37030a73eb22300c33ff5122a98945daf22
This commit is contained in:
parent
25e8ed5362
commit
e5892dbefc
@ -40,6 +40,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- MS-85 - add NetIO metric
|
||||
- MS-96 - add new query interface for specified files
|
||||
- MS-97 - Add S3 SDK for MinIO Storage
|
||||
- MS-105 - Add MySQL
|
||||
|
||||
## Task
|
||||
- MS-74 - Change README.md in cpp
|
||||
|
@ -1,13 +1,22 @@
|
||||
### Compilation
|
||||
#### Step 1: install necessery tools
|
||||
|
||||
Install MySQL
|
||||
|
||||
centos7 :
|
||||
yum install gfortran flex bison
|
||||
yum install gfortran qt4 flex bison mysql-devel
|
||||
|
||||
ubuntu16.04 :
|
||||
sudo apt-get install gfortran flex bison
|
||||
sudo apt-get install gfortran qt4-qmake flex bison libmysqlclient-dev
|
||||
|
||||
If `libmysqlclient_r.so` does not exist after installing MySQL Development Files, you need to create a symbolic link:
|
||||
|
||||
```
|
||||
sudo ln -s /path/to/libmysqlclient.so /path/to/libmysqlclient_r.so
|
||||
```
|
||||
|
||||
#### Step 2: build(output to cmake_build folder)
|
||||
|
||||
cmake_build/src/milvus_server is the server
|
||||
|
||||
cmake_build/src/libmilvus_engine.a is the static library
|
||||
|
@ -93,6 +93,8 @@ define_option(MILVUS_WITH_SQLITE "Build with SQLite library" ON)
|
||||
|
||||
define_option(MILVUS_WITH_SQLITE_ORM "Build with SQLite ORM library" ON)
|
||||
|
||||
define_option(MILVUS_WITH_MYSQLPP "Build with MySQL++" ON)
|
||||
|
||||
define_option(MILVUS_WITH_THRIFT "Build with Apache Thrift library" ON)
|
||||
|
||||
define_option(MILVUS_WITH_YAMLCPP "Build with yaml-cpp library" ON)
|
||||
|
@ -26,6 +26,7 @@ set(MILVUS_THIRDPARTY_DEPENDENCIES
|
||||
JSONCONS
|
||||
LAPACK
|
||||
Lz4
|
||||
MySQLPP
|
||||
OpenBLAS
|
||||
Prometheus
|
||||
RocksDB
|
||||
@ -56,12 +57,14 @@ macro(build_dependency DEPENDENCY_NAME)
|
||||
build_easyloggingpp()
|
||||
elseif("${DEPENDENCY_NAME}" STREQUAL "FAISS")
|
||||
build_faiss()
|
||||
elseif ("${DEPENDENCY_NAME}" STREQUAL "GTest")
|
||||
build_gtest()
|
||||
elseif("${DEPENDENCY_NAME}" STREQUAL "LAPACK")
|
||||
build_lapack()
|
||||
elseif("${DEPENDENCY_NAME}" STREQUAL "Lz4")
|
||||
build_lz4()
|
||||
elseif ("${DEPENDENCY_NAME}" STREQUAL "GTest")
|
||||
build_gtest()
|
||||
elseif ("${DEPENDENCY_NAME}" STREQUAL "MySQLPP")
|
||||
build_mysqlpp()
|
||||
elseif ("${DEPENDENCY_NAME}" STREQUAL "JSONCONS")
|
||||
build_jsoncons()
|
||||
elseif ("${DEPENDENCY_NAME}" STREQUAL "OpenBLAS")
|
||||
@ -265,6 +268,12 @@ else()
|
||||
set(LZ4_SOURCE_URL "https://github.com/lz4/lz4/archive/${LZ4_VERSION}.tar.gz")
|
||||
endif()
|
||||
|
||||
if(DEFINED ENV{MILVUS_MYSQLPP_URL})
|
||||
set(MYSQLPP_SOURCE_URL "$ENV{MILVUS_MYSQLPP_URL}")
|
||||
else()
|
||||
set(MYSQLPP_SOURCE_URL "https://tangentsoft.com/mysqlpp/releases/mysql++-${MYSQLPP_VERSION}.tar.gz")
|
||||
endif()
|
||||
|
||||
if (DEFINED ENV{MILVUS_OPENBLAS_URL})
|
||||
set(OPENBLAS_SOURCE_URL "$ENV{MILVUS_OPENBLAS_URL}")
|
||||
else ()
|
||||
@ -829,8 +838,8 @@ macro(build_faiss)
|
||||
# ${MAKE} ${MAKE_BUILD_ARGS}
|
||||
BUILD_COMMAND
|
||||
${MAKE} ${MAKE_BUILD_ARGS} all
|
||||
COMMAND
|
||||
cd gpu && make ${MAKE_BUILD_ARGS}
|
||||
COMMAND
|
||||
cd gpu && ${MAKE} ${MAKE_BUILD_ARGS}
|
||||
BUILD_IN_SOURCE
|
||||
1
|
||||
# INSTALL_DIR
|
||||
@ -1068,6 +1077,56 @@ if(MILVUS_WITH_LZ4)
|
||||
include_directories(SYSTEM ${LZ4_INCLUDE_DIR})
|
||||
endif()
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# MySQL++
|
||||
|
||||
macro(build_mysqlpp)
|
||||
message(STATUS "Building MySQL++-${MYSQLPP_VERSION} from source")
|
||||
set(MYSQLPP_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep")
|
||||
set(MYSQLPP_INCLUDE_DIR "${MYSQLPP_PREFIX}/include")
|
||||
set(MYSQLPP_SHARED_LIB
|
||||
"${MYSQLPP_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}")
|
||||
|
||||
set(MYSQLPP_CONFIGURE_ARGS
|
||||
"--prefix=${MYSQLPP_PREFIX}"
|
||||
"--enable-thread-check"
|
||||
"CFLAGS=${EP_C_FLAGS}"
|
||||
"CXXFLAGS=${EP_CXX_FLAGS}"
|
||||
"LDFLAGS=-pthread")
|
||||
|
||||
externalproject_add(mysqlpp_ep
|
||||
URL
|
||||
${MYSQLPP_SOURCE_URL}
|
||||
${EP_LOG_OPTIONS}
|
||||
CONFIGURE_COMMAND
|
||||
"./configure"
|
||||
${MYSQLPP_CONFIGURE_ARGS}
|
||||
BUILD_COMMAND
|
||||
${MAKE} ${MAKE_BUILD_ARGS}
|
||||
BUILD_IN_SOURCE 1
|
||||
BUILD_BYPRODUCTS
|
||||
${MYSQLPP_SHARED_LIB})
|
||||
|
||||
file(MAKE_DIRECTORY "${MYSQLPP_INCLUDE_DIR}")
|
||||
add_library(mysqlpp SHARED IMPORTED)
|
||||
set_target_properties(
|
||||
mysqlpp
|
||||
PROPERTIES
|
||||
IMPORTED_LOCATION "${MYSQLPP_SHARED_LIB}"
|
||||
INTERFACE_INCLUDE_DIRECTORIES "${MYSQLPP_INCLUDE_DIR}")
|
||||
|
||||
add_dependencies(mysqlpp mysqlpp_ep)
|
||||
|
||||
endmacro()
|
||||
|
||||
if(MILVUS_WITH_MYSQLPP)
|
||||
|
||||
resolve_dependency(MySQLPP)
|
||||
get_target_property(MYSQLPP_INCLUDE_DIR mysqlpp INTERFACE_INCLUDE_DIRECTORIES)
|
||||
include_directories(SYSTEM "${MYSQLPP_INCLUDE_DIR}")
|
||||
link_directories(SYSTEM ${MYSQLPP_PREFIX}/lib)
|
||||
endif()
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Prometheus
|
||||
|
||||
|
@ -8,7 +8,9 @@ server_config:
|
||||
|
||||
db_config:
|
||||
db_path: /tmp/milvus
|
||||
db_backend_url: http://127.0.0.1
|
||||
#URI format: dialect://username:password@host:port/database
|
||||
#All parts except dialect are optional, but you MUST include the delimiters
|
||||
db_backend_url: sqlite://:@:/
|
||||
index_building_threshold: 1024 #build index file when raw data file size larger than this value, unit: MB
|
||||
|
||||
metric_config:
|
||||
|
@ -62,6 +62,7 @@ set(s3_client_files
|
||||
include_directories(/usr/include)
|
||||
include_directories("${CUDA_TOOLKIT_ROOT_DIR}/include")
|
||||
include_directories(thrift/gen-cpp)
|
||||
include_directories(/usr/include/mysql)
|
||||
|
||||
set(third_party_libs
|
||||
easyloggingpp
|
||||
@ -83,6 +84,7 @@ set(third_party_libs
|
||||
snappy
|
||||
zlib
|
||||
zstd
|
||||
mysqlpp
|
||||
${CUDA_TOOLKIT_ROOT_DIR}/lib64/stubs/libnvidia-ml.so
|
||||
)
|
||||
if (MEGASEARCH_WITH_ARROW STREQUAL "ON")
|
||||
|
@ -136,7 +136,8 @@ DBImpl::DBImpl(const Options& options)
|
||||
compact_thread_pool_(1, 1),
|
||||
index_thread_pool_(1, 1) {
|
||||
meta_ptr_ = DBMetaImplFactory::Build(options.meta);
|
||||
mem_mgr_ = (MemManagerPtr)(new MemManager(meta_ptr_, options_));
|
||||
mem_mgr_ = std::make_shared<MemManager>(meta_ptr_, options_);
|
||||
// mem_mgr_ = (MemManagerPtr)(new MemManager(meta_ptr_, options_));
|
||||
StartTimerTasks();
|
||||
}
|
||||
|
||||
@ -466,9 +467,14 @@ void DBImpl::StartMetricTask() {
|
||||
}
|
||||
|
||||
void DBImpl::StartCompactionTask() {
|
||||
// static int count = 0;
|
||||
// count++;
|
||||
// std::cout << "StartCompactionTask: " << count << std::endl;
|
||||
// std::cout << "c: " << count++ << std::endl;
|
||||
static uint64_t compact_clock_tick = 0;
|
||||
compact_clock_tick++;
|
||||
if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
|
||||
// std::cout << "c r: " << count++ << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
@ -574,6 +580,10 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
|
||||
}
|
||||
|
||||
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
|
||||
// static int b_count = 0;
|
||||
// b_count++;
|
||||
// std::cout << "BackgroundCompaction: " << b_count << std::endl;
|
||||
|
||||
Status status;
|
||||
for (auto table_id : table_ids) {
|
||||
status = BackgroundMergeFiles(table_id);
|
||||
|
@ -56,11 +56,11 @@ std::shared_ptr<meta::DBMetaImpl> DBMetaImplFactory::Build() {
|
||||
std::shared_ptr<meta::Meta> DBMetaImplFactory::Build(const DBMetaOptions& metaOptions) {
|
||||
|
||||
std::string uri = metaOptions.backend_uri;
|
||||
if (uri.empty()) {
|
||||
//Default to sqlite if uri is empty
|
||||
// return std::make_shared<meta::DBMetaImpl>(new meta::DBMetaImpl(metaOptions));
|
||||
return std::shared_ptr<meta::DBMetaImpl>(new meta::DBMetaImpl(metaOptions));
|
||||
}
|
||||
// if (uri.empty()) {
|
||||
// //Default to sqlite if uri is empty
|
||||
//// return std::make_shared<meta::DBMetaImpl>(new meta::DBMetaImpl(metaOptions));
|
||||
// return std::shared_ptr<meta::DBMetaImpl>(new meta::DBMetaImpl(metaOptions));
|
||||
// }
|
||||
|
||||
std::string dialectRegex = "(.*)";
|
||||
std::string usernameRegex = "(.*)";
|
||||
@ -81,12 +81,10 @@ std::shared_ptr<meta::Meta> DBMetaImplFactory::Build(const DBMetaOptions& metaOp
|
||||
std::string dialect = pieces_match[1].str();
|
||||
std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower);
|
||||
if (dialect.find("mysql") != std::string::npos) {
|
||||
// return std::make_shared<meta::MySQLMetaImpl>(new meta::MySQLMetaImpl(metaOptions));
|
||||
return std::shared_ptr<meta::MySQLMetaImpl>(new meta::MySQLMetaImpl(metaOptions));
|
||||
return std::make_shared<meta::MySQLMetaImpl>(meta::MySQLMetaImpl(metaOptions));
|
||||
}
|
||||
else if (dialect.find("sqlite") != std::string::npos) {
|
||||
// return std::make_shared<meta::DBMetaImpl>(new meta::DBMetaImpl(metaOptions));
|
||||
return std::shared_ptr<meta::DBMetaImpl>(new meta::DBMetaImpl(metaOptions));
|
||||
return std::make_shared<meta::DBMetaImpl>(meta::DBMetaImpl(metaOptions));
|
||||
}
|
||||
else {
|
||||
LOG(ERROR) << "Invalid dialect in URI: dialect = " << dialect;
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include <regex>
|
||||
#include <string>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
#include "mysql++/mysql++.h"
|
||||
|
||||
@ -30,8 +31,8 @@ namespace meta {
|
||||
|
||||
using namespace mysqlpp;
|
||||
|
||||
static std::unique_ptr<Connection> connectionPtr(new Connection());
|
||||
std::recursive_mutex mysql_mutex;
|
||||
// static std::unique_ptr<Connection> connectionPtr(new Connection());
|
||||
// std::recursive_mutex mysql_mutex;
|
||||
//
|
||||
// std::unique_ptr<Connection>& MySQLMetaImpl::getConnectionPtr() {
|
||||
//// static std::recursive_mutex connectionMutex_;
|
||||
@ -109,7 +110,7 @@ namespace meta {
|
||||
|
||||
Status MySQLMetaImpl::Initialize() {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
if (!boost::filesystem::is_directory(options_.path)) {
|
||||
auto ret = boost::filesystem::create_directory(options_.path);
|
||||
@ -153,12 +154,19 @@ namespace meta {
|
||||
//std::cout << dbName << " " << serverAddress << " " << username << " " << password << " " << port << std::endl;
|
||||
// connectionPtr->set_option(new MultiStatementsOption(true));
|
||||
// connectionPtr->set_option(new mysqlpp::ReconnectOption(true));
|
||||
connectionPtr->set_option(new mysqlpp::ReconnectOption(true));
|
||||
std::cout << "MySQL++ thread aware:" << std::to_string(connectionPtr->thread_aware()) << std::endl;
|
||||
int threadHint = std::thread::hardware_concurrency();
|
||||
int maxPoolSize = threadHint == 0 ? 8 : threadHint;
|
||||
mySQLConnectionPool_ = std::make_shared<MySQLConnectionPool>(dbName, username, password, serverAddress, port, maxPoolSize);
|
||||
// std::cout << "MySQL++ thread aware:" << std::to_string(connectionPtr->thread_aware()) << std::endl;
|
||||
|
||||
try {
|
||||
if (!connectionPtr->connect(dbName, serverAddress, username, password, port)) {
|
||||
return Status::Error("DB connection failed: ", connectionPtr->error());
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
// if (!connectionPtr->connect(dbName, serverAddress, username, password, port)) {
|
||||
// return Status::Error("DB connection failed: ", connectionPtr->error());
|
||||
// }
|
||||
if (!connectionPtr->thread_aware()) {
|
||||
ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it.";
|
||||
return Status::Error("MySQL++ wasn't built with thread awareness! Can't run without it.");
|
||||
}
|
||||
|
||||
CleanUp();
|
||||
@ -220,9 +228,12 @@ namespace meta {
|
||||
} catch (const Exception& er) {
|
||||
// Catch-all for any other MySQL++ exceptions
|
||||
return Status::DBTransactionError("GENERAL ERROR DURING INITIALIZATION", er.what());
|
||||
} catch (std::exception &e) {
|
||||
return HandleException("Encounter exception during initialization", e);
|
||||
}
|
||||
}
|
||||
else {
|
||||
ENGINE_LOG_ERROR << "Wrong URI format. URI = " << uri;
|
||||
return Status::Error("Wrong URI format");
|
||||
}
|
||||
}
|
||||
@ -231,7 +242,7 @@ namespace meta {
|
||||
Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
|
||||
const DatesT &dates) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
if (dates.size() == 0) {
|
||||
return Status::OK();
|
||||
@ -246,6 +257,8 @@ namespace meta {
|
||||
|
||||
try {
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
auto yesterday = GetDateWithDelta(-1);
|
||||
|
||||
for (auto &date : dates) {
|
||||
@ -284,13 +297,15 @@ namespace meta {
|
||||
|
||||
Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
// server::Metrics::GetInstance().MetaAccessTotalIncrement();
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query createTableQuery = connectionPtr->query();
|
||||
|
||||
if (table_schema.table_id_.empty()) {
|
||||
@ -304,7 +319,7 @@ namespace meta {
|
||||
if (res.num_rows() == 1) {
|
||||
int state = res[0]["state"];
|
||||
std::string msg = (TableSchema::TO_DELETE == state) ?
|
||||
"Table already exists" : "Table already exists and it is in delete state, please wait a second";
|
||||
"Table already exists and it is in delete state, please wait a second" : "Table already exists";
|
||||
return Status::Error(msg);
|
||||
}
|
||||
}
|
||||
@ -360,6 +375,8 @@ namespace meta {
|
||||
} catch (const Exception& er) {
|
||||
// Catch-all for any other MySQL++ exceptions
|
||||
return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE", er.what());
|
||||
} catch (std::exception &e) {
|
||||
return HandleException("Encounter exception when create table", e);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
@ -367,12 +384,14 @@ namespace meta {
|
||||
|
||||
Status MySQLMetaImpl::DeleteTable(const std::string& table_id) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
//soft delete table
|
||||
Query deleteTableQuery = connectionPtr->query();
|
||||
//
|
||||
@ -398,6 +417,8 @@ namespace meta {
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
//soft delete table files
|
||||
Query deleteTableFilesQuery = connectionPtr->query();
|
||||
//
|
||||
@ -423,12 +444,14 @@ namespace meta {
|
||||
|
||||
Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query describeTableQuery = connectionPtr->query();
|
||||
describeTableQuery << "SELECT id, dimension, files_cnt, engine_type, store_raw_data " <<
|
||||
"FROM meta " <<
|
||||
@ -470,13 +493,13 @@ namespace meta {
|
||||
|
||||
Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query hasTableQuery = connectionPtr->query();
|
||||
//since table_id is a unique column we just need to check whether it exists or not
|
||||
@ -504,12 +527,14 @@ namespace meta {
|
||||
|
||||
Status MySQLMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query allTablesQuery = connectionPtr->query();
|
||||
allTablesQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " <<
|
||||
"FROM meta " <<
|
||||
@ -548,7 +573,7 @@ namespace meta {
|
||||
|
||||
Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
if (file_schema.date_ == EmptyDate) {
|
||||
file_schema.date_ = Meta::GetDate();
|
||||
@ -564,6 +589,8 @@ namespace meta {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
NextFileId(file_schema.file_id_);
|
||||
file_schema.file_type_ = TableFileSchema::NEW;
|
||||
file_schema.dimension_ = table_schema.dimension_;
|
||||
@ -617,6 +644,8 @@ namespace meta {
|
||||
} catch (const Exception& er) {
|
||||
// Catch-all for any other MySQL++ exceptions
|
||||
return Status::DBTransactionError("GENERAL ERROR WHEN ADDING TABLE FILE", er.what());
|
||||
} catch (std::exception& ex) {
|
||||
return HandleException("Encounter exception when create table file", ex);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
@ -624,7 +653,7 @@ namespace meta {
|
||||
|
||||
Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
files.clear();
|
||||
|
||||
@ -632,6 +661,8 @@ namespace meta {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query filesToIndexQuery = connectionPtr->query();
|
||||
filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " <<
|
||||
"FROM metaFile " <<
|
||||
@ -692,7 +723,7 @@ namespace meta {
|
||||
const DatesT &partition,
|
||||
DatePartionedTableFilesSchema &files) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
files.clear();
|
||||
|
||||
@ -700,6 +731,8 @@ namespace meta {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
StoreQueryResult res;
|
||||
|
||||
if (partition.empty()) {
|
||||
@ -789,13 +822,15 @@ namespace meta {
|
||||
Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
|
||||
DatePartionedTableFilesSchema &files) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
files.clear();
|
||||
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query filesToMergeQuery = connectionPtr->query();
|
||||
filesToMergeQuery << "SELECT id, table_id, file_id, file_type, size, date " <<
|
||||
"FROM metaFile " <<
|
||||
@ -858,7 +893,7 @@ namespace meta {
|
||||
const std::vector<size_t>& ids,
|
||||
TableFilesSchema& table_files) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
std::stringstream idSS;
|
||||
for (auto& id : ids) {
|
||||
@ -869,6 +904,8 @@ namespace meta {
|
||||
|
||||
try {
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query getTableFileQuery = connectionPtr->query();
|
||||
getTableFileQuery << "SELECT engine_type, file_id, file_type, size, date " <<
|
||||
"FROM metaFile " <<
|
||||
@ -923,7 +960,7 @@ namespace meta {
|
||||
// PXU TODO: Support Swap
|
||||
Status MySQLMetaImpl::Archive() {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
auto &criterias = options_.archive_conf.GetCriterias();
|
||||
if (criterias.empty()) {
|
||||
@ -936,8 +973,11 @@ namespace meta {
|
||||
if (criteria == "days") {
|
||||
size_t usecs = limit * D_SEC * US_PS;
|
||||
long now = utils::GetMicroSecTimeStamp();
|
||||
|
||||
try {
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query archiveQuery = connectionPtr->query();
|
||||
archiveQuery << "UPDATE metaFile " <<
|
||||
"SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
|
||||
@ -969,11 +1009,13 @@ namespace meta {
|
||||
|
||||
Status MySQLMetaImpl::Size(uint64_t &result) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
result = 0;
|
||||
try {
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query getSizeQuery = connectionPtr->query();
|
||||
getSizeQuery << "SELECT SUM(size) AS sum " <<
|
||||
"FROM metaFile " <<
|
||||
@ -1007,7 +1049,7 @@ namespace meta {
|
||||
|
||||
Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
if (to_discard_size <= 0) {
|
||||
// std::cout << "in" << std::endl;
|
||||
@ -1019,6 +1061,8 @@ namespace meta {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query discardFilesQuery = connectionPtr->query();
|
||||
discardFilesQuery << "SELECT id, size " <<
|
||||
"FROM metaFile " <<
|
||||
@ -1074,13 +1118,15 @@ namespace meta {
|
||||
//ZR: this function assumes all fields in file_schema have value
|
||||
Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query updateTableFileQuery = connectionPtr->query();
|
||||
|
||||
//if the table has been deleted, just mark the table file as TO_DELETE
|
||||
@ -1141,11 +1187,13 @@ namespace meta {
|
||||
|
||||
Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query updateTableFilesQuery = connectionPtr->query();
|
||||
|
||||
std::map<std::string, bool> has_tables;
|
||||
@ -1212,13 +1260,17 @@ namespace meta {
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// static int b_count = 0;
|
||||
// b_count++;
|
||||
// std::cout << "CleanUpFilesWithTTL: " << b_count << std::endl;
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
auto now = utils::GetMicroSecTimeStamp();
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query cleanUpFilesWithTTLQuery = connectionPtr->query();
|
||||
cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date " <<
|
||||
"FROM metaFile " <<
|
||||
@ -1276,12 +1328,15 @@ namespace meta {
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query cleanUpFilesWithTTLQuery = connectionPtr->query();
|
||||
cleanUpFilesWithTTLQuery << "SELECT id, table_id " <<
|
||||
"FROM meta " <<
|
||||
"WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";";
|
||||
StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
|
||||
assert(res);
|
||||
// std::cout << res.num_rows() << std::endl;
|
||||
std::stringstream idsToDeleteSS;
|
||||
for (auto& resRow : res) {
|
||||
size_t id = resRow["id"];
|
||||
@ -1317,9 +1372,11 @@ namespace meta {
|
||||
|
||||
Status MySQLMetaImpl::CleanUp() {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
try {
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
ENGINE_LOG_DEBUG << "Remove table file type as NEW";
|
||||
Query cleanUpQuery = connectionPtr->query();
|
||||
cleanUpQuery << "DELETE FROM metaFile WHERE file_type = " << std::to_string(TableFileSchema::NEW) << ";";
|
||||
@ -1341,11 +1398,13 @@ namespace meta {
|
||||
|
||||
Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query countQuery = connectionPtr->query();
|
||||
countQuery << "SELECT size " <<
|
||||
"FROM metaFile " <<
|
||||
@ -1385,12 +1444,15 @@ namespace meta {
|
||||
|
||||
Status MySQLMetaImpl::DropAll() {
|
||||
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
|
||||
if (boost::filesystem::is_directory(options_.path)) {
|
||||
boost::filesystem::remove_all(options_.path);
|
||||
}
|
||||
try {
|
||||
|
||||
ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab);
|
||||
|
||||
Query dropTableQuery = connectionPtr->query();
|
||||
dropTableQuery << "DROP TABLE IF EXISTS meta, metaFile;";
|
||||
if (dropTableQuery.exec()) {
|
||||
@ -1406,10 +1468,11 @@ namespace meta {
|
||||
// Catch-all for any other MySQL++ exceptions
|
||||
return Status::DBTransactionError("GENERAL ERROR WHEN DROPPING TABLE", er.what());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
MySQLMetaImpl::~MySQLMetaImpl() {
|
||||
std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
// std::lock_guard<std::recursive_mutex> lock(mysql_mutex);
|
||||
CleanUp();
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,7 @@
|
||||
|
||||
#include "Meta.h"
|
||||
#include "Options.h"
|
||||
#include "MySQLConnectionPool.h"
|
||||
|
||||
#include "mysql++/mysql++.h"
|
||||
#include <mutex>
|
||||
@ -77,6 +78,9 @@ namespace meta {
|
||||
|
||||
const DBMetaOptions options_;
|
||||
|
||||
std::shared_ptr<MySQLConnectionPool> mySQLConnectionPool_;
|
||||
bool safe_grab = false;
|
||||
|
||||
// std::mutex connectionMutex_;
|
||||
}; // DBMetaImpl
|
||||
|
||||
|
1
cpp/thirdparty/versions.txt
vendored
1
cpp/thirdparty/versions.txt
vendored
@ -7,6 +7,7 @@ GTEST_VERSION=1.8.1
|
||||
JSONCONS_VERSION=0.126.0
|
||||
LAPACK_VERSION=v3.8.0
|
||||
LZ4_VERSION=v1.9.1
|
||||
MYSQLPP_VERSION=3.2.4
|
||||
OPENBLAS_VERSION=v0.3.6
|
||||
PROMETHEUS_VERSION=v0.7.0
|
||||
ROCKSDB_VERSION=v6.0.2
|
||||
|
@ -20,10 +20,10 @@ set(db_scheduler_srcs
|
||||
include_directories(/usr/local/cuda/include)
|
||||
link_directories("/usr/local/cuda/lib64")
|
||||
|
||||
|
||||
include_directories(/usr/include/mysql)
|
||||
|
||||
set(db_test_src
|
||||
${unittest_srcs}
|
||||
#${unittest_srcs}
|
||||
${config_files}
|
||||
${cache_srcs}
|
||||
${db_srcs}
|
||||
@ -35,8 +35,6 @@ set(db_test_src
|
||||
db_tests.cpp
|
||||
meta_tests.cpp)
|
||||
|
||||
include_directories(/usr/include/mysql)
|
||||
|
||||
cuda_add_executable(db_test ${db_test_src})
|
||||
|
||||
set(db_libs
|
||||
@ -48,8 +46,9 @@ set(db_libs
|
||||
boost_system
|
||||
boost_filesystem
|
||||
lz4
|
||||
mysqlpp
|
||||
)
|
||||
|
||||
target_link_libraries(db_test ${db_libs} ${unittest_libs} /usr/local/lib/libmysqlpp.so)
|
||||
target_link_libraries(db_test ${db_libs} ${unittest_libs})
|
||||
|
||||
install(TARGETS db_test DESTINATION bin)
|
||||
|
@ -233,6 +233,7 @@ TEST_F(MySQLTest, table_file_TEST) {
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id_ = group.table_id_;
|
||||
status = impl.CreateTableFile(table_file);
|
||||
// std::cout << status.ToString() << std::endl;
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(table_file.file_type_, meta::TableFileSchema::NEW);
|
||||
|
||||
|
@ -286,9 +286,6 @@ TEST_F(DBTest2, DELETE_TEST) {
|
||||
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||
ASSERT_TRUE(stat.ok());
|
||||
ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_));
|
||||
|
||||
stat = db_->DropAll();
|
||||
ASSERT_TRUE(stat.ok());
|
||||
};
|
||||
|
||||
TEST_F(MySQLDBTest, DB_TEST) {
|
||||
@ -371,8 +368,11 @@ TEST_F(MySQLDBTest, DB_TEST) {
|
||||
|
||||
search.join();
|
||||
|
||||
stat = db_->DropAll();
|
||||
ASSERT_TRUE(stat.ok());
|
||||
delete db_;
|
||||
|
||||
auto dummyDB = engine::DBFactory::Build(options);
|
||||
dummyDB->DropAll();
|
||||
delete dummyDB;
|
||||
};
|
||||
|
||||
TEST_F(MySQLDBTest, SEARCH_TEST) {
|
||||
@ -429,8 +429,11 @@ TEST_F(MySQLDBTest, SEARCH_TEST) {
|
||||
stat = db_->Query(TABLE_NAME, k, nq, xq.data(), results);
|
||||
ASSERT_STATS(stat);
|
||||
|
||||
stat = db_->DropAll();
|
||||
ASSERT_TRUE(stat.ok());
|
||||
delete db_;
|
||||
|
||||
auto dummyDB = engine::DBFactory::Build(options);
|
||||
dummyDB->DropAll();
|
||||
delete dummyDB;
|
||||
|
||||
// TODO(linxj): add groundTruth assert
|
||||
};
|
||||
@ -466,14 +469,17 @@ TEST_F(MySQLDBTest, ARHIVE_DISK_CHECK) {
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(1));
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::seconds(10)); //change to 10 to make sure files are discarded
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
|
||||
db_->Size(size);
|
||||
LOG(DEBUG) << "size=" << size;
|
||||
ASSERT_LE(size, 1 * engine::meta::G);
|
||||
|
||||
stat = db_->DropAll();
|
||||
ASSERT_TRUE(stat.ok());
|
||||
delete db_;
|
||||
|
||||
auto dummyDB = engine::DBFactory::Build(options);
|
||||
dummyDB->DropAll();
|
||||
delete dummyDB;
|
||||
};
|
||||
|
||||
TEST_F(MySQLDBTest, DELETE_TEST) {
|
||||
@ -484,12 +490,14 @@ TEST_F(MySQLDBTest, DELETE_TEST) {
|
||||
|
||||
engine::meta::TableSchema table_info = BuildTableSchema();
|
||||
engine::Status stat = db_->CreateTable(table_info);
|
||||
// std::cout << stat.ToString() << std::endl;
|
||||
|
||||
engine::meta::TableSchema table_info_get;
|
||||
table_info_get.table_id_ = TABLE_NAME;
|
||||
stat = db_->DescribeTable(table_info_get);
|
||||
ASSERT_STATS(stat);
|
||||
|
||||
// std::cout << "location: " << table_info_get.location_ << std::endl;
|
||||
ASSERT_TRUE(boost::filesystem::exists(table_info_get.location_));
|
||||
|
||||
engine::IDNumbers vector_ids;
|
||||
@ -509,13 +517,15 @@ TEST_F(MySQLDBTest, DELETE_TEST) {
|
||||
|
||||
std::vector<engine::meta::DateT> dates;
|
||||
stat = db_->DeleteTable(TABLE_NAME, dates);
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::seconds(10)); //change to 10 to make sure files are discarded
|
||||
|
||||
// std::cout << "5 sec start" << std::endl;
|
||||
std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||
// std::cout << "5 sec finish" << std::endl;
|
||||
ASSERT_TRUE(stat.ok());
|
||||
// std::cout << table_info_get.location_ << std::endl;
|
||||
ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_));
|
||||
// ASSERT_FALSE(boost::filesystem::exists(table_info_get.location_));
|
||||
|
||||
stat = db_->DropAll();
|
||||
ASSERT_TRUE(stat.ok());
|
||||
delete db_;
|
||||
|
||||
auto dummyDB = engine::DBFactory::Build(options);
|
||||
dummyDB->DropAll();
|
||||
delete dummyDB;
|
||||
};
|
||||
|
@ -13,8 +13,27 @@
|
||||
#include "db/Factories.h"
|
||||
#include "db/Options.h"
|
||||
|
||||
INITIALIZE_EASYLOGGINGPP
|
||||
|
||||
using namespace zilliz::milvus;
|
||||
|
||||
static std::string uri;
|
||||
|
||||
class DBTestEnvironment : public ::testing::Environment {
|
||||
public:
|
||||
|
||||
// explicit DBTestEnvironment(std::string uri) : uri_(uri) {}
|
||||
|
||||
static std::string getURI() {
|
||||
return uri;
|
||||
}
|
||||
|
||||
void SetUp() override {
|
||||
getURI();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
void ASSERT_STATS(engine::Status& stat) {
|
||||
ASSERT_TRUE(stat.ok());
|
||||
if(!stat.ok()) {
|
||||
@ -69,14 +88,23 @@ zilliz::milvus::engine::DBMetaOptions MySQLTest::getDBMetaOptions() {
|
||||
// engine::DBMetaOptions options = engine::DBMetaOptionsFactory::Build(path);
|
||||
zilliz::milvus::engine::DBMetaOptions options;
|
||||
options.path = "/tmp/milvus_test";
|
||||
options.backend_uri = "mysql://root:1234@:/test";
|
||||
options.backend_uri = DBTestEnvironment::getURI();
|
||||
return options;
|
||||
|
||||
}
|
||||
|
||||
zilliz::milvus::engine::Options MySQLDBTest::GetOptions() {
|
||||
auto options = engine::OptionsFactory::Build();
|
||||
options.meta.path = "/tmp/milvus_test";
|
||||
options.meta.backend_uri = "mysql://root:1234@:/test";
|
||||
options.meta.backend_uri = DBTestEnvironment::getURI();
|
||||
return options;
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
if (argc > 1) {
|
||||
uri = argv[1];
|
||||
}
|
||||
// std::cout << uri << std::endl;
|
||||
::testing::AddGlobalTestEnvironment(new DBTestEnvironment);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user