mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-01 11:29:48 +08:00
MS-339 implement new api
Former-commit-id: 53e4a9309807b06dc43761755146d54d5ef63cf5
This commit is contained in:
parent
81cc572fe9
commit
5b918a4028
@ -41,6 +41,9 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
|
||||
## New Feature
|
||||
- MS-343 - Implement ResourceMgr
|
||||
- MS-338 - NewAPI: refine code to support CreateIndex
|
||||
- MS-339 - NewAPI: refine code to support DropIndex
|
||||
- MS-340 - NewAPI: implement DescribeIndex
|
||||
|
||||
## Task
|
||||
- MS-297 - disable mysql unit test
|
||||
|
@ -23,9 +23,9 @@ typedef std::vector<QueryResult> QueryResults;
|
||||
|
||||
struct TableIndex {
|
||||
int32_t engine_type_ = (int)EngineType::FAISS_IDMAP;
|
||||
int32_t nlist = 16384;
|
||||
int32_t index_file_size = 1024; //MB
|
||||
int32_t metric_type = (int)MetricType::L2;
|
||||
int32_t nlist_ = 16384;
|
||||
int32_t index_file_size_ = 1024; //MB
|
||||
int32_t metric_type_ = (int)MetricType::L2;
|
||||
};
|
||||
|
||||
} // namespace engine
|
||||
|
@ -148,9 +148,9 @@ Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema&
|
||||
|
||||
bool IsSameIndex(const TableIndex& index1, const TableIndex& index2) {
|
||||
return index1.engine_type_ == index2.engine_type_
|
||||
&& index1.nlist == index2.nlist
|
||||
&& index1.index_file_size == index2.index_file_size
|
||||
&& index1.metric_type == index2.metric_type;
|
||||
&& index1.nlist_ == index2.nlist_
|
||||
&& index1.index_file_size_ == index2.index_file_size_
|
||||
&& index1.metric_type_ == index2.metric_type_;
|
||||
}
|
||||
|
||||
} // namespace utils
|
||||
|
@ -32,15 +32,6 @@ namespace meta {
|
||||
|
||||
using namespace mysqlpp;
|
||||
|
||||
|
||||
|
||||
//
|
||||
|
||||
//
|
||||
|
||||
|
||||
|
||||
|
||||
namespace {
|
||||
|
||||
Status HandleException(const std::string &desc, std::exception &e) {
|
||||
@ -91,8 +82,6 @@ MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int &mode)
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::Initialize() {
|
||||
|
||||
|
||||
if (!boost::filesystem::is_directory(options_.path)) {
|
||||
auto ret = boost::filesystem::create_directory(options_.path);
|
||||
if (!ret) {
|
||||
@ -160,7 +149,6 @@ Status MySQLMetaImpl::Initialize() {
|
||||
}
|
||||
Query InitializeQuery = connectionPtr->query();
|
||||
|
||||
|
||||
InitializeQuery << "CREATE TABLE IF NOT EXISTS Tables (" <<
|
||||
"id BIGINT PRIMARY KEY AUTO_INCREMENT, " <<
|
||||
"table_id VARCHAR(255) UNIQUE NOT NULL, " <<
|
||||
@ -197,13 +185,6 @@ Status MySQLMetaImpl::Initialize() {
|
||||
}
|
||||
} //Scoped Connection
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
return Status::OK();
|
||||
|
||||
|
||||
} catch (const BadQuery &er) {
|
||||
// Handle any query errors
|
||||
ENGINE_LOG_ERROR << "QUERY ERROR DURING INITIALIZATION" << ": " << er.what();
|
||||
@ -219,13 +200,13 @@ Status MySQLMetaImpl::Initialize() {
|
||||
ENGINE_LOG_ERROR << "Wrong URI format. URI = " << uri;
|
||||
return Status::Error("Wrong URI format");
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// PXU TODO: Temp solution. Will fix later
|
||||
Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
|
||||
const DatesT &dates) {
|
||||
|
||||
|
||||
if (dates.empty()) {
|
||||
return Status::OK();
|
||||
}
|
||||
@ -290,12 +271,8 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
|
||||
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -303,7 +280,6 @@ Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query createTableQuery = connectionPtr->query();
|
||||
|
||||
if (table_schema.table_id_.empty()) {
|
||||
@ -312,7 +288,6 @@ Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
|
||||
createTableQuery << "SELECT state FROM Tables " <<
|
||||
"WHERE table_id = " << quote << table_schema.table_id_ << ";";
|
||||
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
|
||||
|
||||
StoreQueryResult res = createTableQuery.store();
|
||||
@ -330,7 +305,6 @@ Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
|
||||
table_schema.id_ = -1;
|
||||
table_schema.created_on_ = utils::GetMicroSecTimeStamp();
|
||||
|
||||
|
||||
std::string id = "NULL"; //auto-increment
|
||||
std::string table_id = table_schema.table_id_;
|
||||
std::string state = std::to_string(table_schema.state_);
|
||||
@ -342,26 +316,18 @@ Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
|
||||
"(" << id << ", " << quote << table_id << ", " << state << ", " << dimension << ", " <<
|
||||
created_on << ", " << engine_type << ");";
|
||||
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CreateTable: " << createTableQuery.str();
|
||||
|
||||
if (SimpleResult res = createTableQuery.execute()) {
|
||||
table_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()?
|
||||
|
||||
//Consume all results to avoid "Commands out of sync" error
|
||||
|
||||
|
||||
|
||||
} else {
|
||||
ENGINE_LOG_ERROR << "Add Table Error";
|
||||
return Status::DBTransactionError("Add Table Error", createTableQuery.error());
|
||||
}
|
||||
} //Scoped Connection
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
return utils::CreateTablePath(options_, table_schema.table_id_);
|
||||
|
||||
} catch (const BadQuery &er) {
|
||||
@ -375,18 +341,13 @@ Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
|
||||
} catch (std::exception &e) {
|
||||
return HandleException("Encounter exception when create table", e);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
|
||||
|
||||
has = false;
|
||||
|
||||
try {
|
||||
|
||||
StoreQueryResult res;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -394,7 +355,6 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query hasNonIndexFilesQuery = connectionPtr->query();
|
||||
//since table_id is a unique column we just need to check whether it exists or not
|
||||
hasNonIndexFilesQuery << "SELECT EXISTS " <<
|
||||
@ -429,22 +389,7 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
|
||||
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
{
|
||||
@ -454,9 +399,174 @@ Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
Query updateTableIndexParamQuery = connectionPtr->query();
|
||||
updateTableIndexParamQuery << "SELECT id, state, dimension, created_on " <<
|
||||
"FROM Tables " <<
|
||||
"WHERE table_id = " << quote << table_id << " AND " <<
|
||||
"state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndexParam: " << updateTableIndexParamQuery.str();
|
||||
|
||||
StoreQueryResult res = updateTableIndexParamQuery.store();
|
||||
|
||||
if (res.num_rows() == 1) {
|
||||
const Row &resRow = res[0];
|
||||
|
||||
size_t id = resRow["id"];
|
||||
int32_t state = resRow["state"];
|
||||
uint16_t dimension = resRow["dimension"];
|
||||
int64_t created_on = resRow["created_on"];
|
||||
|
||||
updateTableIndexParamQuery << "UPDATE Tables " <<
|
||||
"SET id = " << id << ", " <<
|
||||
"state = " << state << ", " <<
|
||||
"dimension = " << dimension << ", " <<
|
||||
"created_on = " << created_on << ", " <<
|
||||
"engine_type_ = " << index.engine_type_ << ", " <<
|
||||
"nlist = " << index.nlist_ << ", " <<
|
||||
"index_file_size = " << index.index_file_size_ << ", " <<
|
||||
"metric_type = " << index.metric_type_ << ", " <<
|
||||
"WHERE id = " << quote << table_id << ";";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndexParam: " << updateTableIndexParamQuery.str();
|
||||
|
||||
|
||||
if (!updateTableIndexParamQuery.exec()) {
|
||||
ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE INDEX PARAM";
|
||||
return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE INDEX PARAM",
|
||||
updateTableIndexParamQuery.error());
|
||||
}
|
||||
} else {
|
||||
return Status::NotFound("Table " + table_id + " not found");
|
||||
}
|
||||
|
||||
} //Scoped Connection
|
||||
|
||||
} catch (const BadQuery &er) {
|
||||
// Handle any query errors
|
||||
ENGINE_LOG_ERROR << "QUERY ERROR WHEN UPDATING TABLE INDEX PARAM" << ": " << er.what();
|
||||
return Status::DBTransactionError("QUERY ERROR WHEN UPDATING TABLE INDEX PARAM", er.what());
|
||||
} catch (const Exception &er) {
|
||||
// Catch-all for any other MySQL++ exceptions
|
||||
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN UPDATING TABLE INDEX PARAM" << ": " << er.what();
|
||||
return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE INDEX PARAM", er.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex& index) {
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
if (connectionPtr == nullptr) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
Query describeTableIndexQuery = connectionPtr->query();
|
||||
describeTableIndexQuery << "SELECT engine_type, nlist, index_file_size, metric_type " <<
|
||||
"FROM Tables " <<
|
||||
"WHERE table_id = " << quote << table_id << " AND " <<
|
||||
"state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DescribeTableIndex: " << describeTableIndexQuery.str();
|
||||
|
||||
StoreQueryResult res = describeTableIndexQuery.store();
|
||||
|
||||
if (res.num_rows() == 1) {
|
||||
const Row &resRow = res[0];
|
||||
|
||||
index.engine_type_ = resRow["engine_type"];
|
||||
index.nlist_ = resRow["nlist"];
|
||||
index.index_file_size_ = resRow["index_file_size"];
|
||||
index.metric_type_ = resRow["metric_type"];
|
||||
} else {
|
||||
return Status::NotFound("Table " + table_id + " not found");
|
||||
}
|
||||
|
||||
} //Scoped Connection
|
||||
|
||||
} catch (const BadQuery &er) {
|
||||
// Handle any query errors
|
||||
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DESCRIBE TABLE INDEX" << ": " << er.what();
|
||||
return Status::DBTransactionError("QUERY ERROR WHEN DESCRIBE TABLE INDEX", er.what());
|
||||
} catch (const Exception &er) {
|
||||
// Catch-all for any other MySQL++ exceptions
|
||||
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DESCRIBE TABLE INDEX" << ": " << er.what();
|
||||
return Status::DBTransactionError("GENERAL ERROR WHEN DESCRIBE TABLE INDEX", er.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
if (connectionPtr == nullptr) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
Query dropTableIndexQuery = connectionPtr->query();
|
||||
|
||||
dropTableIndexQuery << "UPDATE TableFiles " <<
|
||||
"SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << "," <<
|
||||
"updated_time = " << utils::GetMicroSecTimeStamp() << " " <<
|
||||
"WHERE table_id = " << quote << table_id << " AND " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::INDEX) << ";";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();
|
||||
|
||||
if (!dropTableIndexQuery.exec()) {
|
||||
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROP TABLE INDEX";
|
||||
return Status::DBTransactionError("QUERY ERROR WHEN DROP TABLE INDEX",
|
||||
dropTableIndexQuery.error());
|
||||
}
|
||||
|
||||
dropTableIndexQuery << "UPDATE TableFiles " <<
|
||||
"SET file_type = " << std::to_string(TableFileSchema::RAW) << "," <<
|
||||
"updated_time = " << utils::GetMicroSecTimeStamp() << " " <<
|
||||
"WHERE table_id = " << quote << table_id << " AND " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::BACKUP) << ";";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropTableIndex: " << dropTableIndexQuery.str();
|
||||
|
||||
if (!dropTableIndexQuery.exec()) {
|
||||
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROP TABLE INDEX";
|
||||
return Status::DBTransactionError("QUERY ERROR WHEN DROP TABLE INDEX",
|
||||
dropTableIndexQuery.error());
|
||||
}
|
||||
|
||||
} //Scoped Connection
|
||||
|
||||
} catch (const BadQuery &er) {
|
||||
// Handle any query errors
|
||||
ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROP TABLE INDEX" << ": " << er.what();
|
||||
return Status::DBTransactionError("QUERY ERROR WHEN DROP TABLE INDEX", er.what());
|
||||
} catch (const Exception &er) {
|
||||
// Catch-all for any other MySQL++ exceptions
|
||||
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DROP TABLE INDEX" << ": " << er.what();
|
||||
return Status::DBTransactionError("GENERAL ERROR WHEN DROP TABLE INDEX", er.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
|
||||
try {
|
||||
MetricCollector metric;
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
if (connectionPtr == nullptr) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
//soft delete table
|
||||
Query deleteTableQuery = connectionPtr->query();
|
||||
@ -474,7 +584,6 @@ Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
|
||||
|
||||
} //Scoped Connection
|
||||
|
||||
|
||||
if (mode_ == Options::MODE::CLUSTER) {
|
||||
DeleteTableFiles(table_id);
|
||||
}
|
||||
@ -495,7 +604,6 @@ Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
|
||||
Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -503,10 +611,6 @@ Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
//soft delete table files
|
||||
Query deleteTableFilesQuery = connectionPtr->query();
|
||||
//
|
||||
@ -537,14 +641,9 @@ Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
|
||||
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
StoreQueryResult res;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -552,7 +651,6 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query describeTableQuery = connectionPtr->query();
|
||||
describeTableQuery << "SELECT id, dimension, files_cnt, engine_type, store_raw_data " <<
|
||||
"FROM Tables " <<
|
||||
@ -590,14 +688,9 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
|
||||
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
StoreQueryResult res;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -605,7 +698,6 @@ Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query hasTableQuery = connectionPtr->query();
|
||||
//since table_id is a unique column we just need to check whether it exists or not
|
||||
hasTableQuery << "SELECT EXISTS " <<
|
||||
@ -636,14 +728,9 @@ Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
|
||||
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
StoreQueryResult res;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -651,7 +738,6 @@ Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query allTablesQuery = connectionPtr->query();
|
||||
allTablesQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " <<
|
||||
"FROM Tables " <<
|
||||
@ -691,8 +777,6 @@ Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
||||
|
||||
|
||||
if (file_schema.date_ == EmptyDate) {
|
||||
file_schema.date_ = Meta::GetDate();
|
||||
}
|
||||
@ -704,7 +788,6 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
NextFileId(file_schema.file_id_);
|
||||
@ -733,7 +816,6 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query createTableFileQuery = connectionPtr->query();
|
||||
|
||||
createTableFileQuery << "INSERT INTO TableFiles VALUES" <<
|
||||
@ -747,9 +829,6 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
||||
file_schema.id_ = res.insert_id(); //Might need to use SELECT LAST_INSERT_ID()?
|
||||
|
||||
//Consume all results to avoid "Commands out of sync" error
|
||||
|
||||
|
||||
|
||||
} else {
|
||||
ENGINE_LOG_ERROR << "QUERY ERROR WHEN ADDING TABLE FILE";
|
||||
return Status::DBTransactionError("Add file Error", createTableFileQuery.error());
|
||||
@ -769,21 +848,14 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
||||
} catch (std::exception &ex) {
|
||||
return HandleException("Encounter exception when create table file", ex);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
|
||||
|
||||
|
||||
files.clear();
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
StoreQueryResult res;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -791,7 +863,6 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query filesToIndexQuery = connectionPtr->query();
|
||||
filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, row_count, date " <<
|
||||
"FROM TableFiles " <<
|
||||
@ -857,16 +928,11 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
|
||||
Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
|
||||
const DatesT &partition,
|
||||
DatePartionedTableFilesSchema &files) {
|
||||
|
||||
|
||||
files.clear();
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
StoreQueryResult res;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -874,7 +940,6 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
if (partition.empty()) {
|
||||
|
||||
Query filesToSearchQuery = connectionPtr->query();
|
||||
@ -971,16 +1036,11 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
|
||||
const std::vector<size_t> &ids,
|
||||
const DatesT &partition,
|
||||
DatePartionedTableFilesSchema &files) {
|
||||
|
||||
|
||||
files.clear();
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
StoreQueryResult res;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -1080,15 +1140,11 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
|
||||
|
||||
Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
|
||||
DatePartionedTableFilesSchema &files) {
|
||||
|
||||
|
||||
files.clear();
|
||||
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
StoreQueryResult res;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -1096,7 +1152,6 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query filesToMergeQuery = connectionPtr->query();
|
||||
filesToMergeQuery << "SELECT id, table_id, file_id, file_type, file_size, date " <<
|
||||
"FROM TableFiles " <<
|
||||
@ -1164,8 +1219,6 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
|
||||
Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
|
||||
const std::vector<size_t> &ids,
|
||||
TableFilesSchema &table_files) {
|
||||
|
||||
|
||||
if (ids.empty()) {
|
||||
return Status::OK();
|
||||
}
|
||||
@ -1178,9 +1231,7 @@ Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
|
||||
idStr = idStr.substr(0, idStr.size() - 4); //remove the last " OR "
|
||||
|
||||
try {
|
||||
|
||||
StoreQueryResult res;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -1250,8 +1301,6 @@ Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
|
||||
|
||||
// PXU TODO: Support Swap
|
||||
Status MySQLMetaImpl::Archive() {
|
||||
|
||||
|
||||
auto &criterias = options_.archive_conf.GetCriterias();
|
||||
if (criterias.empty()) {
|
||||
return Status::OK();
|
||||
@ -1265,14 +1314,12 @@ Status MySQLMetaImpl::Archive() {
|
||||
long now = utils::GetMicroSecTimeStamp();
|
||||
|
||||
try {
|
||||
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
if (connectionPtr == nullptr) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query archiveQuery = connectionPtr->query();
|
||||
archiveQuery << "UPDATE TableFiles " <<
|
||||
"SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " <<
|
||||
@ -1308,13 +1355,10 @@ Status MySQLMetaImpl::Archive() {
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::Size(uint64_t &result) {
|
||||
|
||||
|
||||
result = 0;
|
||||
|
||||
try {
|
||||
|
||||
StoreQueryResult res;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -1333,16 +1377,10 @@ Status MySQLMetaImpl::Size(uint64_t &result) {
|
||||
res = getSizeQuery.store();
|
||||
} //Scoped Connection
|
||||
|
||||
|
||||
//
|
||||
|
||||
|
||||
if (res.empty()) {
|
||||
result = 0;
|
||||
|
||||
} else {
|
||||
result = res[0]["sum"];
|
||||
|
||||
}
|
||||
|
||||
} catch (const BadQuery &er) {
|
||||
@ -1359,8 +1397,6 @@ Status MySQLMetaImpl::Size(uint64_t &result) {
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
|
||||
|
||||
|
||||
if (to_discard_size <= 0) {
|
||||
|
||||
return Status::OK();
|
||||
@ -1368,11 +1404,8 @@ Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
|
||||
ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
bool status;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -1380,7 +1413,6 @@ Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query discardFilesQuery = connectionPtr->query();
|
||||
discardFilesQuery << "SELECT id, file_size " <<
|
||||
"FROM TableFiles " <<
|
||||
@ -1390,9 +1422,7 @@ Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DiscardFiles: " << discardFilesQuery.str();
|
||||
|
||||
|
||||
StoreQueryResult res = discardFilesQuery.store();
|
||||
|
||||
if (res.num_rows() == 0) {
|
||||
return Status::OK();
|
||||
}
|
||||
@ -1443,13 +1473,10 @@ Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
|
||||
|
||||
//ZR: this function assumes all fields in file_schema have value
|
||||
Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
|
||||
|
||||
|
||||
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -1457,7 +1484,6 @@ Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query updateTableFileQuery = connectionPtr->query();
|
||||
|
||||
//if the table has been deleted, just mark the table file as TO_DELETE
|
||||
@ -1563,11 +1589,8 @@ Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
|
||||
|
||||
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -1575,7 +1598,6 @@ Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query updateTableFilesQuery = connectionPtr->query();
|
||||
|
||||
std::map<std::string, bool> has_tables;
|
||||
@ -1648,6 +1670,7 @@ Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
|
||||
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN UPDATING TABLE FILES" << ": " << er.what();
|
||||
return Status::DBTransactionError("GENERAL ERROR WHEN UPDATING TABLE FILES", er.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -1666,7 +1689,6 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query cleanUpFilesWithTTLQuery = connectionPtr->query();
|
||||
cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date " <<
|
||||
"FROM TableFiles " <<
|
||||
@ -1746,7 +1768,6 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query cleanUpFilesWithTTLQuery = connectionPtr->query();
|
||||
cleanUpFilesWithTTLQuery << "SELECT id, table_id " <<
|
||||
"FROM Tables " <<
|
||||
@ -1756,7 +1777,6 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
|
||||
|
||||
StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
|
||||
|
||||
|
||||
if (!res.empty()) {
|
||||
|
||||
std::stringstream idsToDeleteSS;
|
||||
@ -1835,8 +1855,6 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::CleanUp() {
|
||||
|
||||
|
||||
try {
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -1844,7 +1862,6 @@ Status MySQLMetaImpl::CleanUp() {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query cleanUpQuery = connectionPtr->query();
|
||||
cleanUpQuery << "SELECT table_name " <<
|
||||
"FROM information_schema.tables " <<
|
||||
@ -1884,8 +1901,6 @@ Status MySQLMetaImpl::CleanUp() {
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
|
||||
|
||||
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
@ -1898,7 +1913,6 @@ Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
|
||||
}
|
||||
|
||||
StoreQueryResult res;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
@ -1945,24 +1959,21 @@ Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
|
||||
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN RETRIEVING COUNT" << ": " << er.what();
|
||||
return Status::DBTransactionError("GENERAL ERROR WHEN RETRIEVING COUNT", er.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::DropAll() {
|
||||
|
||||
|
||||
if (boost::filesystem::is_directory(options_.path)) {
|
||||
boost::filesystem::remove_all(options_.path);
|
||||
}
|
||||
try {
|
||||
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
if (connectionPtr == nullptr) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
|
||||
Query dropTableQuery = connectionPtr->query();
|
||||
dropTableQuery << "DROP TABLE IF EXISTS Tables, TableFiles;";
|
||||
|
||||
@ -1983,11 +1994,11 @@ Status MySQLMetaImpl::DropAll() {
|
||||
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN DROPPING TABLE" << ": " << er.what();
|
||||
return Status::DBTransactionError("GENERAL ERROR WHEN DROPPING TABLE", er.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
MySQLMetaImpl::~MySQLMetaImpl() {
|
||||
|
||||
if (mode_ != Options::MODE::READ_ONLY) {
|
||||
CleanUp();
|
||||
}
|
||||
|
@ -367,9 +367,9 @@ Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const
|
||||
table_schema.dimension_ = std::get<2>(tables[0]);
|
||||
table_schema.created_on_ = std::get<3>(tables[0]);
|
||||
table_schema.engine_type_ = index.engine_type_;
|
||||
table_schema.nlist_ = index.nlist;
|
||||
table_schema.index_file_size_ = index.index_file_size;
|
||||
table_schema.metric_type_ = index.metric_type;
|
||||
table_schema.nlist_ = index.nlist_;
|
||||
table_schema.index_file_size_ = index.index_file_size_;
|
||||
table_schema.metric_type_ = index.metric_type_;
|
||||
|
||||
ConnectorPtr->update(table_schema);
|
||||
} else {
|
||||
@ -407,9 +407,9 @@ Status SqliteMetaImpl::DescribeTableIndex(const std::string &table_id, TableInde
|
||||
|
||||
if (groups.size() == 1) {
|
||||
index.engine_type_ = std::get<0>(groups[0]);
|
||||
index.nlist = std::get<1>(groups[0]);
|
||||
index.index_file_size = std::get<2>(groups[0]);
|
||||
index.metric_type = std::get<3>(groups[0]);
|
||||
index.nlist_ = std::get<1>(groups[0]);
|
||||
index.index_file_size_ = std::get<2>(groups[0]);
|
||||
index.metric_type_ = std::get<3>(groups[0]);
|
||||
} else {
|
||||
return Status::NotFound("Table " + table_id + " not found");
|
||||
}
|
||||
|
@ -545,7 +545,7 @@ const char descriptor_table_protodef_milvus_2eproto[] PROTOBUF_SECTION_VARIABLE(
|
||||
"_reply\030\002 \001(\010\"M\n\rTableRowCount\022#\n\006status\030"
|
||||
"\001 \001(\0132\023.milvus.grpc.Status\022\027\n\017table_row_"
|
||||
"count\030\002 \001(\003\"\026\n\007Command\022\013\n\003cmd\030\001 \001(\t\"X\n\005I"
|
||||
"ndex\022\022\n\nindex_type\030\001 \001(\005\022\r\n\005nlist\030\002 \001(\003\022"
|
||||
"ndex\022\022\n\nindex_type\030\001 \001(\005\022\r\n\005nlist\030\002 \001(\005\022"
|
||||
"\027\n\017index_file_size\030\003 \001(\005\022\023\n\013metric_type\030"
|
||||
"\004 \001(\005\"[\n\nIndexParam\022*\n\ntable_name\030\001 \001(\0132"
|
||||
"\026.milvus.grpc.TableName\022!\n\005index\030\002 \001(\0132\022"
|
||||
@ -5268,16 +5268,16 @@ Index::Index(const Index& from)
|
||||
: ::PROTOBUF_NAMESPACE_ID::Message(),
|
||||
_internal_metadata_(nullptr) {
|
||||
_internal_metadata_.MergeFrom(from._internal_metadata_);
|
||||
::memcpy(&nlist_, &from.nlist_,
|
||||
::memcpy(&index_type_, &from.index_type_,
|
||||
static_cast<size_t>(reinterpret_cast<char*>(&metric_type_) -
|
||||
reinterpret_cast<char*>(&nlist_)) + sizeof(metric_type_));
|
||||
reinterpret_cast<char*>(&index_type_)) + sizeof(metric_type_));
|
||||
// @@protoc_insertion_point(copy_constructor:milvus.grpc.Index)
|
||||
}
|
||||
|
||||
void Index::SharedCtor() {
|
||||
::memset(&nlist_, 0, static_cast<size_t>(
|
||||
::memset(&index_type_, 0, static_cast<size_t>(
|
||||
reinterpret_cast<char*>(&metric_type_) -
|
||||
reinterpret_cast<char*>(&nlist_)) + sizeof(metric_type_));
|
||||
reinterpret_cast<char*>(&index_type_)) + sizeof(metric_type_));
|
||||
}
|
||||
|
||||
Index::~Index() {
|
||||
@ -5303,9 +5303,9 @@ void Index::Clear() {
|
||||
// Prevent compiler warnings about cached_has_bits being unused
|
||||
(void) cached_has_bits;
|
||||
|
||||
::memset(&nlist_, 0, static_cast<size_t>(
|
||||
::memset(&index_type_, 0, static_cast<size_t>(
|
||||
reinterpret_cast<char*>(&metric_type_) -
|
||||
reinterpret_cast<char*>(&nlist_)) + sizeof(metric_type_));
|
||||
reinterpret_cast<char*>(&index_type_)) + sizeof(metric_type_));
|
||||
_internal_metadata_.Clear();
|
||||
}
|
||||
|
||||
@ -5324,7 +5324,7 @@ const char* Index::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::inte
|
||||
CHK_(ptr);
|
||||
} else goto handle_unusual;
|
||||
continue;
|
||||
// int64 nlist = 2;
|
||||
// int32 nlist = 2;
|
||||
case 2:
|
||||
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 16)) {
|
||||
nlist_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint(&ptr);
|
||||
@ -5388,12 +5388,12 @@ bool Index::MergePartialFromCodedStream(
|
||||
break;
|
||||
}
|
||||
|
||||
// int64 nlist = 2;
|
||||
// int32 nlist = 2;
|
||||
case 2: {
|
||||
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (16 & 0xFF)) {
|
||||
|
||||
DO_((::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadPrimitive<
|
||||
::PROTOBUF_NAMESPACE_ID::int64, ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::TYPE_INT64>(
|
||||
::PROTOBUF_NAMESPACE_ID::int32, ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::TYPE_INT32>(
|
||||
input, &nlist_)));
|
||||
} else {
|
||||
goto handle_unusual;
|
||||
@ -5459,9 +5459,9 @@ void Index::SerializeWithCachedSizes(
|
||||
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt32(1, this->index_type(), output);
|
||||
}
|
||||
|
||||
// int64 nlist = 2;
|
||||
// int32 nlist = 2;
|
||||
if (this->nlist() != 0) {
|
||||
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64(2, this->nlist(), output);
|
||||
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt32(2, this->nlist(), output);
|
||||
}
|
||||
|
||||
// int32 index_file_size = 3;
|
||||
@ -5492,9 +5492,9 @@ void Index::SerializeWithCachedSizes(
|
||||
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt32ToArray(1, this->index_type(), target);
|
||||
}
|
||||
|
||||
// int64 nlist = 2;
|
||||
// int32 nlist = 2;
|
||||
if (this->nlist() != 0) {
|
||||
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64ToArray(2, this->nlist(), target);
|
||||
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt32ToArray(2, this->nlist(), target);
|
||||
}
|
||||
|
||||
// int32 index_file_size = 3;
|
||||
@ -5528,13 +5528,6 @@ size_t Index::ByteSizeLong() const {
|
||||
// Prevent compiler warnings about cached_has_bits being unused
|
||||
(void) cached_has_bits;
|
||||
|
||||
// int64 nlist = 2;
|
||||
if (this->nlist() != 0) {
|
||||
total_size += 1 +
|
||||
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::Int64Size(
|
||||
this->nlist());
|
||||
}
|
||||
|
||||
// int32 index_type = 1;
|
||||
if (this->index_type() != 0) {
|
||||
total_size += 1 +
|
||||
@ -5542,6 +5535,13 @@ size_t Index::ByteSizeLong() const {
|
||||
this->index_type());
|
||||
}
|
||||
|
||||
// int32 nlist = 2;
|
||||
if (this->nlist() != 0) {
|
||||
total_size += 1 +
|
||||
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::Int32Size(
|
||||
this->nlist());
|
||||
}
|
||||
|
||||
// int32 index_file_size = 3;
|
||||
if (this->index_file_size() != 0) {
|
||||
total_size += 1 +
|
||||
@ -5583,12 +5583,12 @@ void Index::MergeFrom(const Index& from) {
|
||||
::PROTOBUF_NAMESPACE_ID::uint32 cached_has_bits = 0;
|
||||
(void) cached_has_bits;
|
||||
|
||||
if (from.nlist() != 0) {
|
||||
set_nlist(from.nlist());
|
||||
}
|
||||
if (from.index_type() != 0) {
|
||||
set_index_type(from.index_type());
|
||||
}
|
||||
if (from.nlist() != 0) {
|
||||
set_nlist(from.nlist());
|
||||
}
|
||||
if (from.index_file_size() != 0) {
|
||||
set_index_file_size(from.index_file_size());
|
||||
}
|
||||
@ -5618,8 +5618,8 @@ bool Index::IsInitialized() const {
|
||||
void Index::InternalSwap(Index* other) {
|
||||
using std::swap;
|
||||
_internal_metadata_.Swap(&other->_internal_metadata_);
|
||||
swap(nlist_, other->nlist_);
|
||||
swap(index_type_, other->index_type_);
|
||||
swap(nlist_, other->nlist_);
|
||||
swap(index_file_size_, other->index_file_size_);
|
||||
swap(metric_type_, other->metric_type_);
|
||||
}
|
||||
|
@ -2316,21 +2316,21 @@ class Index :
|
||||
// accessors -------------------------------------------------------
|
||||
|
||||
enum : int {
|
||||
kNlistFieldNumber = 2,
|
||||
kIndexTypeFieldNumber = 1,
|
||||
kNlistFieldNumber = 2,
|
||||
kIndexFileSizeFieldNumber = 3,
|
||||
kMetricTypeFieldNumber = 4,
|
||||
};
|
||||
// int64 nlist = 2;
|
||||
void clear_nlist();
|
||||
::PROTOBUF_NAMESPACE_ID::int64 nlist() const;
|
||||
void set_nlist(::PROTOBUF_NAMESPACE_ID::int64 value);
|
||||
|
||||
// int32 index_type = 1;
|
||||
void clear_index_type();
|
||||
::PROTOBUF_NAMESPACE_ID::int32 index_type() const;
|
||||
void set_index_type(::PROTOBUF_NAMESPACE_ID::int32 value);
|
||||
|
||||
// int32 nlist = 2;
|
||||
void clear_nlist();
|
||||
::PROTOBUF_NAMESPACE_ID::int32 nlist() const;
|
||||
void set_nlist(::PROTOBUF_NAMESPACE_ID::int32 value);
|
||||
|
||||
// int32 index_file_size = 3;
|
||||
void clear_index_file_size();
|
||||
::PROTOBUF_NAMESPACE_ID::int32 index_file_size() const;
|
||||
@ -2346,8 +2346,8 @@ class Index :
|
||||
class _Internal;
|
||||
|
||||
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
|
||||
::PROTOBUF_NAMESPACE_ID::int64 nlist_;
|
||||
::PROTOBUF_NAMESPACE_ID::int32 index_type_;
|
||||
::PROTOBUF_NAMESPACE_ID::int32 nlist_;
|
||||
::PROTOBUF_NAMESPACE_ID::int32 index_file_size_;
|
||||
::PROTOBUF_NAMESPACE_ID::int32 metric_type_;
|
||||
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
|
||||
@ -3827,15 +3827,15 @@ inline void Index::set_index_type(::PROTOBUF_NAMESPACE_ID::int32 value) {
|
||||
// @@protoc_insertion_point(field_set:milvus.grpc.Index.index_type)
|
||||
}
|
||||
|
||||
// int64 nlist = 2;
|
||||
// int32 nlist = 2;
|
||||
inline void Index::clear_nlist() {
|
||||
nlist_ = PROTOBUF_LONGLONG(0);
|
||||
nlist_ = 0;
|
||||
}
|
||||
inline ::PROTOBUF_NAMESPACE_ID::int64 Index::nlist() const {
|
||||
inline ::PROTOBUF_NAMESPACE_ID::int32 Index::nlist() const {
|
||||
// @@protoc_insertion_point(field_get:milvus.grpc.Index.nlist)
|
||||
return nlist_;
|
||||
}
|
||||
inline void Index::set_nlist(::PROTOBUF_NAMESPACE_ID::int64 value) {
|
||||
inline void Index::set_nlist(::PROTOBUF_NAMESPACE_ID::int32 value) {
|
||||
|
||||
nlist_ = value;
|
||||
// @@protoc_insertion_point(field_set:milvus.grpc.Index.nlist)
|
||||
|
@ -61,7 +61,7 @@ static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] =
|
||||
const char descriptor_table_protodef_status_2eproto[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) =
|
||||
"\n\014status.proto\022\013milvus.grpc\"D\n\006Status\022*\n"
|
||||
"\nerror_code\030\001 \001(\0162\026.milvus.grpc.ErrorCod"
|
||||
"e\022\016\n\006reason\030\002 \001(\t*\354\003\n\tErrorCode\022\013\n\007SUCCE"
|
||||
"e\022\016\n\006reason\030\002 \001(\t*\230\004\n\tErrorCode\022\013\n\007SUCCE"
|
||||
"SS\020\000\022\024\n\020UNEXPECTED_ERROR\020\001\022\022\n\016CONNECT_FA"
|
||||
"ILED\020\002\022\025\n\021PERMISSION_DENIED\020\003\022\024\n\020TABLE_N"
|
||||
"OT_EXISTS\020\004\022\024\n\020ILLEGAL_ARGUMENT\020\005\022\021\n\rILL"
|
||||
@ -73,7 +73,9 @@ const char descriptor_table_protodef_status_2eproto[] PROTOBUF_SECTION_VARIABLE(
|
||||
"TA_FAILED\020\017\022\020\n\014CACHE_FAILED\020\020\022\030\n\024CANNOT_"
|
||||
"CREATE_FOLDER\020\021\022\026\n\022CANNOT_CREATE_FILE\020\022\022"
|
||||
"\030\n\024CANNOT_DELETE_FOLDER\020\023\022\026\n\022CANNOT_DELE"
|
||||
"TE_FILE\020\024\022\025\n\021BUILD_INDEX_ERROR\020\025b\006proto3"
|
||||
"TE_FILE\020\024\022\025\n\021BUILD_INDEX_ERROR\020\025\022\021\n\rILLE"
|
||||
"GAL_NLIST\020\026\022\027\n\023ILLEGAL_METRIC_TYPE\020\027b\006pr"
|
||||
"oto3"
|
||||
;
|
||||
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_status_2eproto_deps[1] = {
|
||||
};
|
||||
@ -83,7 +85,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_sta
|
||||
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_status_2eproto_once;
|
||||
static bool descriptor_table_status_2eproto_initialized = false;
|
||||
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_status_2eproto = {
|
||||
&descriptor_table_status_2eproto_initialized, descriptor_table_protodef_status_2eproto, "status.proto", 600,
|
||||
&descriptor_table_status_2eproto_initialized, descriptor_table_protodef_status_2eproto, "status.proto", 644,
|
||||
&descriptor_table_status_2eproto_once, descriptor_table_status_2eproto_sccs, descriptor_table_status_2eproto_deps, 1, 0,
|
||||
schemas, file_default_instances, TableStruct_status_2eproto::offsets,
|
||||
file_level_metadata_status_2eproto, 1, file_level_enum_descriptors_status_2eproto, file_level_service_descriptors_status_2eproto,
|
||||
@ -121,6 +123,8 @@ bool ErrorCode_IsValid(int value) {
|
||||
case 19:
|
||||
case 20:
|
||||
case 21:
|
||||
case 22:
|
||||
case 23:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
|
@ -91,12 +91,14 @@ enum ErrorCode : int {
|
||||
CANNOT_DELETE_FOLDER = 19,
|
||||
CANNOT_DELETE_FILE = 20,
|
||||
BUILD_INDEX_ERROR = 21,
|
||||
ILLEGAL_NLIST = 22,
|
||||
ILLEGAL_METRIC_TYPE = 23,
|
||||
ErrorCode_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),
|
||||
ErrorCode_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max()
|
||||
};
|
||||
bool ErrorCode_IsValid(int value);
|
||||
constexpr ErrorCode ErrorCode_MIN = SUCCESS;
|
||||
constexpr ErrorCode ErrorCode_MAX = BUILD_INDEX_ERROR;
|
||||
constexpr ErrorCode ErrorCode_MAX = ILLEGAL_METRIC_TYPE;
|
||||
constexpr int ErrorCode_ARRAYSIZE = ErrorCode_MAX + 1;
|
||||
|
||||
const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* ErrorCode_descriptor();
|
||||
|
@ -125,7 +125,7 @@ message Command {
|
||||
*/
|
||||
message Index {
|
||||
int32 index_type = 1;
|
||||
int64 nlist = 2;
|
||||
int32 nlist = 2;
|
||||
int32 index_file_size = 3;
|
||||
int32 metric_type = 4;
|
||||
}
|
||||
|
@ -42,7 +42,6 @@ GrpcRequestHandler::HasTable(::grpc::ServerContext *context,
|
||||
GrpcRequestHandler::DropTable(::grpc::ServerContext *context,
|
||||
const ::milvus::grpc::TableName *request,
|
||||
::milvus::grpc::Status *response) {
|
||||
|
||||
BaseTaskPtr task_ptr = DropTableTask::Create(request->table_name());
|
||||
GrpcRequestScheduler::ExecTask(task_ptr, response);
|
||||
return ::grpc::Status::OK;
|
||||
|
@ -227,12 +227,32 @@ CreateIndexTask::OnExecute() {
|
||||
return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
|
||||
}
|
||||
|
||||
res = ValidationUtil::ValidateTableIndexType(index_param_.mutable_index()->index_type());
|
||||
if(res != SERVER_SUCCESS) {
|
||||
return SetError(res, "Invalid index type: " + std::to_string(index_param_.mutable_index()->index_type()));
|
||||
}
|
||||
|
||||
res = ValidationUtil::ValidateTableIndexNlist(index_param_.mutable_index()->nlist());
|
||||
if(res != SERVER_SUCCESS) {
|
||||
return SetError(res, "Invalid index nlist: " + std::to_string(index_param_.mutable_index()->nlist()));
|
||||
}
|
||||
|
||||
res = ValidationUtil::ValidateTableIndexMetricType(index_param_.mutable_index()->metric_type());
|
||||
if(res != SERVER_SUCCESS) {
|
||||
return SetError(res, "Invalid index metric type: " + std::to_string(index_param_.mutable_index()->metric_type()));
|
||||
}
|
||||
|
||||
res = ValidationUtil::ValidateTableIndexFileSize(index_param_.mutable_index()->index_file_size());
|
||||
if(res != SERVER_SUCCESS) {
|
||||
return SetError(res, "Invalid index file size: " + std::to_string(index_param_.mutable_index()->index_file_size()));
|
||||
}
|
||||
|
||||
//step 2: check table existence
|
||||
engine::TableIndex index;
|
||||
index.engine_type_ = index_param_.mutable_index()->index_type();
|
||||
index.nlist = index_param_.mutable_index()->nlist();
|
||||
index.index_file_size = index_param_.mutable_index()->index_file_size();
|
||||
index.metric_type = index_param_.mutable_index()->metric_type();
|
||||
index.nlist_ = index_param_.mutable_index()->nlist();
|
||||
index.index_file_size_ = index_param_.mutable_index()->index_file_size();
|
||||
index.metric_type_ = index_param_.mutable_index()->metric_type();
|
||||
stat = DBWrapper::DB()->CreateIndex(table_name_, index);
|
||||
if (!stat.ok()) {
|
||||
return SetError(SERVER_BUILD_INDEX_ERROR, "Engine failed: " + stat.ToString());
|
||||
@ -855,9 +875,9 @@ DescribeIndexTask::OnExecute() {
|
||||
|
||||
index_param_.mutable_table_name()->set_table_name(table_name_);
|
||||
index_param_.mutable_index()->set_index_type(index.engine_type_);
|
||||
index_param_.mutable_index()->set_nlist(index.nlist);
|
||||
index_param_.mutable_index()->set_index_file_size(index.index_file_size);
|
||||
index_param_.mutable_index()->set_metric_type(index.metric_type);
|
||||
index_param_.mutable_index()->set_nlist(index.nlist_);
|
||||
index_param_.mutable_index()->set_index_file_size(index.index_file_size_);
|
||||
index_param_.mutable_index()->set_metric_type(index.metric_type_);
|
||||
|
||||
rc.ElapseFromBegin("totally cost");
|
||||
} catch (std::exception &ex) {
|
||||
|
@ -51,6 +51,9 @@ constexpr ServerError SERVER_ILLEGAL_SEARCH_RESULT = ToGlobalServerErrorCode(110
|
||||
constexpr ServerError SERVER_CACHE_ERROR = ToGlobalServerErrorCode(111);
|
||||
constexpr ServerError SERVER_WRITE_ERROR = ToGlobalServerErrorCode(112);
|
||||
constexpr ServerError SERVER_INVALID_NPROBE = ToGlobalServerErrorCode(113);
|
||||
constexpr ServerError SERVER_INVALID_INDEX_NLIST = ToGlobalServerErrorCode(114);
|
||||
constexpr ServerError SERVER_INVALID_INDEX_METRIC_TYPE = ToGlobalServerErrorCode(115);
|
||||
constexpr ServerError SERVER_INVALID_INDEX_FILE_SIZE = ToGlobalServerErrorCode(116);
|
||||
|
||||
|
||||
constexpr ServerError SERVER_LICENSE_FILE_NOT_EXIST = ToGlobalServerErrorCode(500);
|
||||
|
@ -10,6 +10,7 @@ namespace server {
|
||||
|
||||
constexpr size_t table_name_size_limit = 255;
|
||||
constexpr int64_t table_dimension_limit = 16384;
|
||||
constexpr int32_t index_file_size_limit = 4096; //index trigger size max = 4096 MB
|
||||
|
||||
ServerError
|
||||
ValidationUtil::ValidateTableName(const std::string &table_name) {
|
||||
@ -65,6 +66,32 @@ ValidationUtil::ValidateTableIndexType(int32_t index_type) {
|
||||
return SERVER_SUCCESS;
|
||||
}
|
||||
|
||||
ServerError
|
||||
ValidationUtil::ValidateTableIndexNlist(int32_t nlist) {
|
||||
if(nlist <= 0) {
|
||||
return SERVER_INVALID_INDEX_NLIST;
|
||||
}
|
||||
|
||||
return SERVER_SUCCESS;
|
||||
}
|
||||
|
||||
ServerError
|
||||
ValidationUtil::ValidateTableIndexFileSize(int32_t index_file_size) {
|
||||
if(index_file_size <= 0 || index_file_size > index_file_size_limit) {
|
||||
return SERVER_INVALID_INDEX_FILE_SIZE;
|
||||
}
|
||||
|
||||
return SERVER_SUCCESS;
|
||||
}
|
||||
|
||||
ServerError
|
||||
ValidationUtil::ValidateTableIndexMetricType(int32_t metric_type) {
|
||||
if(metric_type != (int32_t)engine::MetricType::L2 && metric_type != (int32_t)engine::MetricType::IP) {
|
||||
return SERVER_INVALID_INDEX_METRIC_TYPE;
|
||||
}
|
||||
return SERVER_SUCCESS;
|
||||
}
|
||||
|
||||
ServerError
|
||||
ValidationUtil::ValidateGpuIndex(uint32_t gpu_index) {
|
||||
int num_devices = 0;
|
||||
|
@ -17,6 +17,15 @@ public:
|
||||
static ServerError
|
||||
ValidateTableIndexType(int32_t index_type);
|
||||
|
||||
static ServerError
|
||||
ValidateTableIndexNlist(int32_t nlist);
|
||||
|
||||
static ServerError
|
||||
ValidateTableIndexFileSize(int32_t index_file_size);
|
||||
|
||||
static ServerError
|
||||
ValidateTableIndexMetricType(int32_t metric_type);
|
||||
|
||||
static ServerError
|
||||
ValidateGpuIndex(uint32_t gpu_index);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user