mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-03 20:39:36 +08:00
Merge pull request #474 from yhmo/0.6.0
#470 raw files should not be build index
This commit is contained in:
commit
85aca5faf4
@ -44,6 +44,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- \#409 - Add a Fallback pass in optimizer
|
||||
- \#433 - C++ SDK query result is not easy to use
|
||||
- \#449 - Add ShowPartitions example for C++ SDK
|
||||
- \#470 - Small raw files should not be build index
|
||||
|
||||
## Task
|
||||
|
||||
|
@ -838,6 +838,25 @@ DBImpl::BackgroundBuildIndex() {
|
||||
// ENGINE_LOG_TRACE << "Background build index thread exit";
|
||||
}
|
||||
|
||||
Status
|
||||
DBImpl::GetFilesToBuildIndex(const std::string& table_id, const std::vector<int>& file_types,
|
||||
meta::TableFilesSchema& files) {
|
||||
files.clear();
|
||||
auto status = meta_ptr_->FilesByType(table_id, file_types, files);
|
||||
|
||||
// only build index for files that row count greater than certain threshold
|
||||
for (auto it = files.begin(); it != files.end();) {
|
||||
if ((*it).file_type_ == static_cast<int>(meta::TableFileSchema::RAW) &&
|
||||
(*it).row_count_ < meta::BUILD_INDEX_THRESHOLD) {
|
||||
it = files.erase(it);
|
||||
} else {
|
||||
it++;
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates,
|
||||
meta::TableFilesSchema& files) {
|
||||
@ -946,18 +965,18 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex
|
||||
}
|
||||
|
||||
// get files to build index
|
||||
std::vector<std::string> file_ids;
|
||||
auto status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
|
||||
meta::TableFilesSchema table_files;
|
||||
auto status = GetFilesToBuildIndex(table_id, file_types, table_files);
|
||||
int times = 1;
|
||||
|
||||
while (!file_ids.empty()) {
|
||||
while (!table_files.empty()) {
|
||||
ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
|
||||
if (index.engine_type_ != (int)EngineType::FAISS_IDMAP) {
|
||||
status = meta_ptr_->UpdateTableFilesToIndex(table_id);
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10 * 1000, times * 100)));
|
||||
status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
|
||||
GetFilesToBuildIndex(table_id, file_types, table_files);
|
||||
times++;
|
||||
}
|
||||
|
||||
|
@ -152,6 +152,10 @@ class DBImpl : public DB {
|
||||
Status
|
||||
MemSerialize();
|
||||
|
||||
Status
|
||||
GetFilesToBuildIndex(const std::string& table_id, const std::vector<int>& file_types,
|
||||
meta::TableFilesSchema& files);
|
||||
|
||||
Status
|
||||
GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates,
|
||||
meta::TableFilesSchema& files);
|
||||
|
@ -109,8 +109,7 @@ class Meta {
|
||||
FilesToIndex(TableFilesSchema&) = 0;
|
||||
|
||||
virtual Status
|
||||
FilesByType(const std::string& table_id, const std::vector<int>& file_types,
|
||||
std::vector<std::string>& file_ids) = 0;
|
||||
FilesByType(const std::string& table_id, const std::vector<int>& file_types, TableFilesSchema& table_files) = 0;
|
||||
|
||||
virtual Status
|
||||
Size(uint64_t& result) = 0;
|
||||
|
@ -32,6 +32,13 @@ const size_t H_SEC = 60 * M_SEC;
|
||||
const size_t D_SEC = 24 * H_SEC;
|
||||
const size_t W_SEC = 7 * D_SEC;
|
||||
|
||||
// This value is to ignore small raw files when building index.
|
||||
// The reason is:
|
||||
// 1. The performance of brute-search for small raw files could be better than small index file.
|
||||
// 2. And small raw files can be merged to larger files, thus reduce fragmented files count.
|
||||
// We decide the value based on a testing for small size raw/index files.
|
||||
const size_t BUILD_INDEX_THRESHOLD = 5000;
|
||||
|
||||
} // namespace meta
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
||||
|
@ -959,6 +959,7 @@ MySQLMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
|
||||
updateTableFilesToIndexQuery << "UPDATE " << META_TABLEFILES
|
||||
<< " SET file_type = " << std::to_string(TableFileSchema::TO_INDEX)
|
||||
<< " WHERE table_id = " << mysqlpp::quote << table_id
|
||||
<< " AND row_count >= " << std::to_string(meta::BUILD_INDEX_THRESHOLD)
|
||||
<< " AND file_type = " << std::to_string(TableFileSchema::RAW) << ";";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFilesToIndex: " << updateTableFilesToIndexQuery.str();
|
||||
@ -1527,13 +1528,13 @@ MySQLMetaImpl::FilesToIndex(TableFilesSchema& files) {
|
||||
|
||||
Status
|
||||
MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types,
|
||||
std::vector<std::string>& file_ids) {
|
||||
TableFilesSchema& table_files) {
|
||||
if (file_types.empty()) {
|
||||
return Status(DB_ERROR, "file types array is empty");
|
||||
}
|
||||
|
||||
try {
|
||||
file_ids.clear();
|
||||
table_files.clear();
|
||||
|
||||
mysqlpp::StoreQueryResult res;
|
||||
{
|
||||
@ -1553,9 +1554,10 @@ MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>&
|
||||
|
||||
mysqlpp::Query hasNonIndexFilesQuery = connectionPtr->query();
|
||||
// since table_id is a unique column we just need to check whether it exists or not
|
||||
hasNonIndexFilesQuery << "SELECT file_id, file_type"
|
||||
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id
|
||||
<< " AND file_type in (" << types << ");";
|
||||
hasNonIndexFilesQuery
|
||||
<< "SELECT id, engine_type, file_id, file_type, file_size, row_count, date, created_on"
|
||||
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id
|
||||
<< " AND file_type in (" << types << ");";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesByType: " << hasNonIndexFilesQuery.str();
|
||||
|
||||
@ -1566,9 +1568,18 @@ MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>&
|
||||
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
|
||||
int to_index_count = 0, index_count = 0, backup_count = 0;
|
||||
for (auto& resRow : res) {
|
||||
std::string file_id;
|
||||
resRow["file_id"].to_string(file_id);
|
||||
file_ids.push_back(file_id);
|
||||
TableFileSchema file_schema;
|
||||
file_schema.id_ = resRow["id"];
|
||||
file_schema.table_id_ = table_id;
|
||||
file_schema.engine_type_ = resRow["engine_type"];
|
||||
resRow["file_id"].to_string(file_schema.file_id_);
|
||||
file_schema.file_type_ = resRow["file_type"];
|
||||
file_schema.file_size_ = resRow["file_size"];
|
||||
file_schema.row_count_ = resRow["row_count"];
|
||||
file_schema.date_ = resRow["date"];
|
||||
file_schema.created_on_ = resRow["created_on"];
|
||||
|
||||
table_files.emplace_back(file_schema);
|
||||
|
||||
int32_t file_type = resRow["file_type"];
|
||||
switch (file_type) {
|
||||
|
@ -108,7 +108,7 @@ class MySQLMetaImpl : public Meta {
|
||||
|
||||
Status
|
||||
FilesByType(const std::string& table_id, const std::vector<int>& file_types,
|
||||
std::vector<std::string>& file_ids) override;
|
||||
TableFilesSchema& table_files) override;
|
||||
|
||||
Status
|
||||
Archive() override;
|
||||
|
@ -58,7 +58,7 @@ HandleException(const std::string& desc, const char* what = nullptr) {
|
||||
} // namespace
|
||||
|
||||
inline auto
|
||||
StoragePrototype(const std::string &path) {
|
||||
StoragePrototype(const std::string& path) {
|
||||
return make_storage(path,
|
||||
make_table(META_TABLES,
|
||||
make_column("id", &TableSchema::id_, primary_key()),
|
||||
@ -160,7 +160,7 @@ SqliteMetaImpl::Initialize() {
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
|
||||
SqliteMetaImpl::CreateTable(TableSchema& table_schema) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
@ -188,20 +188,20 @@ SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
|
||||
try {
|
||||
auto id = ConnectorPtr->insert(table_schema);
|
||||
table_schema.id_ = id;
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when create table", e.what());
|
||||
}
|
||||
|
||||
ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;
|
||||
|
||||
return utils::CreateTablePath(options_, table_schema.table_id_);
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when create table", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
|
||||
SqliteMetaImpl::DescribeTable(TableSchema& table_schema) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
@ -218,7 +218,7 @@ SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
|
||||
&TableSchema::partition_tag_,
|
||||
&TableSchema::version_),
|
||||
where(c(&TableSchema::table_id_) == table_schema.table_id_
|
||||
and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
||||
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||
|
||||
if (groups.size() == 1) {
|
||||
table_schema.id_ = std::get<0>(groups[0]);
|
||||
@ -236,7 +236,7 @@ SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
|
||||
} else {
|
||||
return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
|
||||
}
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when describe table", e.what());
|
||||
}
|
||||
|
||||
@ -244,20 +244,20 @@ SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
|
||||
SqliteMetaImpl::HasTable(const std::string& table_id, bool& has_or_not) {
|
||||
has_or_not = false;
|
||||
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
|
||||
where(c(&TableSchema::table_id_) == table_id
|
||||
and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
||||
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||
if (tables.size() == 1) {
|
||||
has_or_not = true;
|
||||
} else {
|
||||
has_or_not = false;
|
||||
}
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when lookup table", e.what());
|
||||
}
|
||||
|
||||
@ -265,7 +265,7 @@ SqliteMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
|
||||
SqliteMetaImpl::AllTables(std::vector<TableSchema>& table_schema_array) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
@ -281,8 +281,8 @@ SqliteMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
|
||||
&TableSchema::owner_table_,
|
||||
&TableSchema::partition_tag_,
|
||||
&TableSchema::version_),
|
||||
where(c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
||||
for (auto &table : selected) {
|
||||
where(c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||
for (auto& table : selected) {
|
||||
TableSchema schema;
|
||||
schema.id_ = std::get<0>(table);
|
||||
schema.table_id_ = std::get<1>(table);
|
||||
@ -299,7 +299,7 @@ SqliteMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
|
||||
|
||||
table_schema_array.emplace_back(schema);
|
||||
}
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when lookup all tables", e.what());
|
||||
}
|
||||
|
||||
@ -307,7 +307,7 @@ SqliteMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::DropTable(const std::string &table_id) {
|
||||
SqliteMetaImpl::DropTable(const std::string& table_id) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
@ -317,13 +317,13 @@ SqliteMetaImpl::DropTable(const std::string &table_id) {
|
||||
//soft delete table
|
||||
ConnectorPtr->update_all(
|
||||
set(
|
||||
c(&TableSchema::state_) = (int) TableSchema::TO_DELETE),
|
||||
c(&TableSchema::state_) = (int)TableSchema::TO_DELETE),
|
||||
where(
|
||||
c(&TableSchema::table_id_) == table_id and
|
||||
c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
||||
c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||
|
||||
ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when delete table", e.what());
|
||||
}
|
||||
|
||||
@ -331,7 +331,7 @@ SqliteMetaImpl::DropTable(const std::string &table_id) {
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::DeleteTableFiles(const std::string &table_id) {
|
||||
SqliteMetaImpl::DeleteTableFiles(const std::string& table_id) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
@ -341,14 +341,14 @@ SqliteMetaImpl::DeleteTableFiles(const std::string &table_id) {
|
||||
//soft delete table files
|
||||
ConnectorPtr->update_all(
|
||||
set(
|
||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
|
||||
c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
|
||||
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
||||
where(
|
||||
c(&TableFileSchema::table_id_) == table_id and
|
||||
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));
|
||||
c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
|
||||
|
||||
ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when delete table files", e.what());
|
||||
}
|
||||
|
||||
@ -356,7 +356,7 @@ SqliteMetaImpl::DeleteTableFiles(const std::string &table_id) {
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
||||
SqliteMetaImpl::CreateTableFile(TableFileSchema& file_schema) {
|
||||
if (file_schema.date_ == EmptyDate) {
|
||||
file_schema.date_ = utils::GetDate();
|
||||
}
|
||||
@ -389,7 +389,7 @@ SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
||||
|
||||
ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
|
||||
return utils::CreateTableFilePath(options_, file_schema);
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when create table file", e.what());
|
||||
}
|
||||
|
||||
@ -398,8 +398,8 @@ SqliteMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
||||
|
||||
// TODO(myh): Delete single vecotor by id
|
||||
Status
|
||||
SqliteMetaImpl::DropDataByDate(const std::string &table_id,
|
||||
const DatesT &dates) {
|
||||
SqliteMetaImpl::DropDataByDate(const std::string& table_id,
|
||||
const DatesT& dates) {
|
||||
if (dates.empty()) {
|
||||
return Status::OK();
|
||||
}
|
||||
@ -440,7 +440,7 @@ SqliteMetaImpl::DropDataByDate(const std::string &table_id,
|
||||
}
|
||||
|
||||
ENGINE_LOG_DEBUG << "Successfully drop data by date, table id = " << table_schema.table_id_;
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when drop partition", e.what());
|
||||
}
|
||||
|
||||
@ -448,9 +448,9 @@ SqliteMetaImpl::DropDataByDate(const std::string &table_id,
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::GetTableFiles(const std::string &table_id,
|
||||
const std::vector<size_t> &ids,
|
||||
TableFilesSchema &table_files) {
|
||||
SqliteMetaImpl::GetTableFiles(const std::string& table_id,
|
||||
const std::vector<size_t>& ids,
|
||||
TableFilesSchema& table_files) {
|
||||
try {
|
||||
table_files.clear();
|
||||
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
@ -463,7 +463,7 @@ SqliteMetaImpl::GetTableFiles(const std::string &table_id,
|
||||
&TableFileSchema::created_on_),
|
||||
where(c(&TableFileSchema::table_id_) == table_id and
|
||||
in(&TableFileSchema::id_, ids) and
|
||||
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));
|
||||
c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
|
||||
TableSchema table_schema;
|
||||
table_schema.table_id_ = table_id;
|
||||
auto status = DescribeTable(table_schema);
|
||||
@ -472,7 +472,7 @@ SqliteMetaImpl::GetTableFiles(const std::string &table_id,
|
||||
}
|
||||
|
||||
Status result;
|
||||
for (auto &file : files) {
|
||||
for (auto& file : files) {
|
||||
TableFileSchema file_schema;
|
||||
file_schema.table_id_ = table_id;
|
||||
file_schema.id_ = std::get<0>(file);
|
||||
@ -495,13 +495,13 @@ SqliteMetaImpl::GetTableFiles(const std::string &table_id,
|
||||
|
||||
ENGINE_LOG_DEBUG << "Get table files by id";
|
||||
return result;
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when lookup table files", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
|
||||
SqliteMetaImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
@ -512,7 +512,7 @@ SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
|
||||
where(
|
||||
c(&TableSchema::table_id_) == table_id));
|
||||
ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
std::string msg = "Encounter exception when update table flag: table_id = " + table_id;
|
||||
return HandleException(msg, e.what());
|
||||
}
|
||||
@ -521,7 +521,7 @@ SqliteMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
|
||||
SqliteMetaImpl::UpdateTableFile(TableFileSchema& file_schema) {
|
||||
file_schema.updated_time_ = utils::GetMicroSecTimeStamp();
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
@ -534,14 +534,14 @@ SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
|
||||
|
||||
//if the table has been deleted, just mark the table file as TO_DELETE
|
||||
//clean thread will delete the file later
|
||||
if (tables.size() < 1 || std::get<0>(tables[0]) == (int) TableSchema::TO_DELETE) {
|
||||
if (tables.size() < 1 || std::get<0>(tables[0]) == (int)TableSchema::TO_DELETE) {
|
||||
file_schema.file_type_ = TableFileSchema::TO_DELETE;
|
||||
}
|
||||
|
||||
ConnectorPtr->update(file_schema);
|
||||
|
||||
ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
std::string msg = "Exception update table file: table_id = " + file_schema.table_id_
|
||||
+ " file_id = " + file_schema.file_id_;
|
||||
return HandleException(msg, e.what());
|
||||
@ -550,7 +550,7 @@ SqliteMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
|
||||
SqliteMetaImpl::UpdateTableFiles(TableFilesSchema& files) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
@ -558,13 +558,13 @@ SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
|
||||
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
|
||||
|
||||
std::map<std::string, bool> has_tables;
|
||||
for (auto &file : files) {
|
||||
for (auto& file : files) {
|
||||
if (has_tables.find(file.table_id_) != has_tables.end()) {
|
||||
continue;
|
||||
}
|
||||
auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
|
||||
where(c(&TableSchema::table_id_) == file.table_id_
|
||||
and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
||||
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||
if (tables.size() >= 1) {
|
||||
has_tables[file.table_id_] = true;
|
||||
} else {
|
||||
@ -573,7 +573,7 @@ SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
|
||||
}
|
||||
|
||||
auto commited = ConnectorPtr->transaction([&]() mutable {
|
||||
for (auto &file : files) {
|
||||
for (auto& file : files) {
|
||||
if (!has_tables[file.table_id_]) {
|
||||
file.file_type_ = TableFileSchema::TO_DELETE;
|
||||
}
|
||||
@ -589,7 +589,7 @@ SqliteMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
|
||||
}
|
||||
|
||||
ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when update table files", e.what());
|
||||
}
|
||||
return Status::OK();
|
||||
@ -613,7 +613,7 @@ SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex&
|
||||
&TableSchema::partition_tag_,
|
||||
&TableSchema::version_),
|
||||
where(c(&TableSchema::table_id_) == table_id
|
||||
and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
||||
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||
|
||||
if (tables.size() > 0) {
|
||||
meta::TableSchema table_schema;
|
||||
@ -639,11 +639,11 @@ SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex&
|
||||
//set all backup file to raw
|
||||
ConnectorPtr->update_all(
|
||||
set(
|
||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::RAW,
|
||||
c(&TableFileSchema::file_type_) = (int)TableFileSchema::RAW,
|
||||
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
||||
where(
|
||||
c(&TableFileSchema::table_id_) == table_id and
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::BACKUP));
|
||||
c(&TableFileSchema::file_type_) == (int)TableFileSchema::BACKUP));
|
||||
|
||||
ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;
|
||||
} catch (std::exception& e) {
|
||||
@ -655,7 +655,7 @@ SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex&
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
|
||||
SqliteMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
@ -664,13 +664,14 @@ SqliteMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
|
||||
|
||||
ConnectorPtr->update_all(
|
||||
set(
|
||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX),
|
||||
c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_INDEX),
|
||||
where(
|
||||
c(&TableFileSchema::table_id_) == table_id and
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW));
|
||||
c(&TableFileSchema::row_count_) >= meta::BUILD_INDEX_THRESHOLD and
|
||||
c(&TableFileSchema::file_type_) == (int)TableFileSchema::RAW));
|
||||
|
||||
ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when update table files to to_index", e.what());
|
||||
}
|
||||
|
||||
@ -686,7 +687,7 @@ SqliteMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& inde
|
||||
&TableSchema::nlist_,
|
||||
&TableSchema::metric_type_),
|
||||
where(c(&TableSchema::table_id_) == table_id
|
||||
and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
||||
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||
|
||||
if (groups.size() == 1) {
|
||||
index.engine_type_ = std::get<0>(groups[0]);
|
||||
@ -713,20 +714,20 @@ SqliteMetaImpl::DropTableIndex(const std::string& table_id) {
|
||||
//soft delete index files
|
||||
ConnectorPtr->update_all(
|
||||
set(
|
||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
|
||||
c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
|
||||
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
||||
where(
|
||||
c(&TableFileSchema::table_id_) == table_id and
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX));
|
||||
c(&TableFileSchema::file_type_) == (int)TableFileSchema::INDEX));
|
||||
|
||||
//set all backup file to raw
|
||||
ConnectorPtr->update_all(
|
||||
set(
|
||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::RAW,
|
||||
c(&TableFileSchema::file_type_) = (int)TableFileSchema::RAW,
|
||||
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
||||
where(
|
||||
c(&TableFileSchema::table_id_) == table_id and
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::BACKUP));
|
||||
c(&TableFileSchema::file_type_) == (int)TableFileSchema::BACKUP));
|
||||
|
||||
//set table index type to raw
|
||||
ConnectorPtr->update_all(
|
||||
@ -738,7 +739,7 @@ SqliteMetaImpl::DropTableIndex(const std::string& table_id) {
|
||||
c(&TableSchema::table_id_) == table_id));
|
||||
|
||||
ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when delete table index files", e.what());
|
||||
}
|
||||
|
||||
@ -746,7 +747,9 @@ SqliteMetaImpl::DropTableIndex(const std::string& table_id) {
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string& partition_name, const std::string& tag) {
|
||||
SqliteMetaImpl::CreatePartition(const std::string& table_id,
|
||||
const std::string& partition_name,
|
||||
const std::string& tag) {
|
||||
server::MetricCollector metric;
|
||||
|
||||
TableSchema table_schema;
|
||||
@ -757,7 +760,7 @@ SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string&
|
||||
}
|
||||
|
||||
// not allow create partition under partition
|
||||
if(!table_schema.owner_table_.empty()) {
|
||||
if (!table_schema.owner_table_.empty()) {
|
||||
return Status(DB_ERROR, "Nested partition is not allowed");
|
||||
}
|
||||
|
||||
@ -769,7 +772,7 @@ SqliteMetaImpl::CreatePartition(const std::string& table_id, const std::string&
|
||||
// not allow duplicated partition
|
||||
std::string exist_partition;
|
||||
GetPartitionName(table_id, valid_tag, exist_partition);
|
||||
if(!exist_partition.empty()) {
|
||||
if (!exist_partition.empty()) {
|
||||
return Status(DB_ERROR, "Duplicate partition is not allowed");
|
||||
}
|
||||
|
||||
@ -805,16 +808,16 @@ SqliteMetaImpl::ShowPartitions(const std::string& table_id, std::vector<meta::Ta
|
||||
server::MetricCollector metric;
|
||||
|
||||
auto partitions = ConnectorPtr->select(columns(&TableSchema::table_id_),
|
||||
where(c(&TableSchema::owner_table_) == table_id
|
||||
and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
||||
for(size_t i = 0; i < partitions.size(); i++) {
|
||||
where(c(&TableSchema::owner_table_) == table_id
|
||||
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||
for (size_t i = 0; i < partitions.size(); i++) {
|
||||
std::string partition_name = std::get<0>(partitions[i]);
|
||||
meta::TableSchema partition_schema;
|
||||
partition_schema.table_id_ = partition_name;
|
||||
DescribeTable(partition_schema);
|
||||
partiton_schema_array.emplace_back(partition_schema);
|
||||
}
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when show partitions", e.what());
|
||||
}
|
||||
|
||||
@ -832,14 +835,15 @@ SqliteMetaImpl::GetPartitionName(const std::string& table_id, const std::string&
|
||||
server::StringHelpFunctions::TrimStringBlank(valid_tag);
|
||||
|
||||
auto name = ConnectorPtr->select(columns(&TableSchema::table_id_),
|
||||
where(c(&TableSchema::owner_table_) == table_id
|
||||
and c(&TableSchema::partition_tag_) == valid_tag));
|
||||
where(c(&TableSchema::owner_table_) == table_id
|
||||
and c(&TableSchema::partition_tag_) == valid_tag
|
||||
and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));
|
||||
if (name.size() > 0) {
|
||||
partition_name = std::get<0>(name[0]);
|
||||
} else {
|
||||
return Status(DB_NOT_FOUND, "Table " + table_id + "'s partition " + valid_tag + " not found");
|
||||
}
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when get partition name", e.what());
|
||||
}
|
||||
|
||||
@ -1032,7 +1036,7 @@ SqliteMetaImpl::FilesToMerge(const std::string& table_id, DatePartionedTableFile
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
|
||||
SqliteMetaImpl::FilesToIndex(TableFilesSchema& files) {
|
||||
files.clear();
|
||||
|
||||
try {
|
||||
@ -1048,13 +1052,13 @@ SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
|
||||
&TableFileSchema::engine_type_,
|
||||
&TableFileSchema::created_on_),
|
||||
where(c(&TableFileSchema::file_type_)
|
||||
== (int) TableFileSchema::TO_INDEX));
|
||||
== (int)TableFileSchema::TO_INDEX));
|
||||
|
||||
std::map<std::string, TableSchema> groups;
|
||||
TableFileSchema table_file;
|
||||
|
||||
Status ret;
|
||||
for (auto &file : selected) {
|
||||
for (auto& file : selected) {
|
||||
table_file.id_ = std::get<0>(file);
|
||||
table_file.table_id_ = std::get<1>(file);
|
||||
table_file.file_id_ = std::get<2>(file);
|
||||
@ -1090,48 +1094,66 @@ SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
|
||||
ENGINE_LOG_DEBUG << "Collect " << selected.size() << " to-index files";
|
||||
}
|
||||
return ret;
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when iterate raw files", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::FilesByType(const std::string &table_id,
|
||||
const std::vector<int> &file_types,
|
||||
std::vector<std::string> &file_ids) {
|
||||
SqliteMetaImpl::FilesByType(const std::string& table_id,
|
||||
const std::vector<int>& file_types,
|
||||
TableFilesSchema& table_files) {
|
||||
if (file_types.empty()) {
|
||||
return Status(DB_ERROR, "file types array is empty");
|
||||
}
|
||||
|
||||
try {
|
||||
file_ids.clear();
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_,
|
||||
&TableFileSchema::file_type_),
|
||||
table_files.clear();
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
&TableFileSchema::file_id_,
|
||||
&TableFileSchema::file_type_,
|
||||
&TableFileSchema::file_size_,
|
||||
&TableFileSchema::row_count_,
|
||||
&TableFileSchema::date_,
|
||||
&TableFileSchema::engine_type_,
|
||||
&TableFileSchema::created_on_),
|
||||
where(in(&TableFileSchema::file_type_, file_types)
|
||||
and c(&TableFileSchema::table_id_) == table_id));
|
||||
|
||||
if (selected.size() >= 1) {
|
||||
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
|
||||
int to_index_count = 0, index_count = 0, backup_count = 0;
|
||||
for (auto &file : selected) {
|
||||
file_ids.push_back(std::get<0>(file));
|
||||
switch (std::get<1>(file)) {
|
||||
case (int) TableFileSchema::RAW:raw_count++;
|
||||
for (auto& file : selected) {
|
||||
TableFileSchema file_schema;
|
||||
file_schema.table_id_ = table_id;
|
||||
file_schema.id_ = std::get<0>(file);
|
||||
file_schema.file_id_ = std::get<1>(file);
|
||||
file_schema.file_type_ = std::get<2>(file);
|
||||
file_schema.file_size_ = std::get<3>(file);
|
||||
file_schema.row_count_ = std::get<4>(file);
|
||||
file_schema.date_ = std::get<5>(file);
|
||||
file_schema.engine_type_ = std::get<6>(file);
|
||||
file_schema.created_on_ = std::get<7>(file);
|
||||
|
||||
switch (file_schema.file_type_) {
|
||||
case (int)TableFileSchema::RAW:raw_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::NEW:new_count++;
|
||||
case (int)TableFileSchema::NEW:new_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::NEW_MERGE:new_merge_count++;
|
||||
case (int)TableFileSchema::NEW_MERGE:new_merge_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::NEW_INDEX:new_index_count++;
|
||||
case (int)TableFileSchema::NEW_INDEX:new_index_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::TO_INDEX:to_index_count++;
|
||||
case (int)TableFileSchema::TO_INDEX:to_index_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::INDEX:index_count++;
|
||||
case (int)TableFileSchema::INDEX:index_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::BACKUP:backup_count++;
|
||||
case (int)TableFileSchema::BACKUP:backup_count++;
|
||||
break;
|
||||
default:break;
|
||||
}
|
||||
|
||||
table_files.emplace_back(file_schema);
|
||||
}
|
||||
|
||||
ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
|
||||
@ -1139,13 +1161,12 @@ SqliteMetaImpl::FilesByType(const std::string &table_id,
|
||||
<< " new_index files:" << new_index_count << " to_index files:" << to_index_count
|
||||
<< " index files:" << index_count << " backup files:" << backup_count;
|
||||
}
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when check non index files", e.what());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
// TODO(myh): Support swap to cloud storage
|
||||
Status
|
||||
SqliteMetaImpl::Archive() {
|
||||
@ -1166,11 +1187,11 @@ SqliteMetaImpl::Archive() {
|
||||
|
||||
ConnectorPtr->update_all(
|
||||
set(
|
||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE),
|
||||
c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE),
|
||||
where(
|
||||
c(&TableFileSchema::created_on_) < (int64_t) (now - usecs) and
|
||||
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE));
|
||||
} catch (std::exception &e) {
|
||||
c(&TableFileSchema::created_on_) < (int64_t)(now - usecs) and
|
||||
c(&TableFileSchema::file_type_) != (int)TableFileSchema::TO_DELETE));
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when update table files", e.what());
|
||||
}
|
||||
|
||||
@ -1218,15 +1239,15 @@ SqliteMetaImpl::CleanUp() {
|
||||
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
|
||||
|
||||
std::vector<int> file_types = {
|
||||
(int) TableFileSchema::NEW,
|
||||
(int) TableFileSchema::NEW_INDEX,
|
||||
(int) TableFileSchema::NEW_MERGE
|
||||
(int)TableFileSchema::NEW,
|
||||
(int)TableFileSchema::NEW_INDEX,
|
||||
(int)TableFileSchema::NEW_MERGE
|
||||
};
|
||||
auto files =
|
||||
ConnectorPtr->select(columns(&TableFileSchema::id_), where(in(&TableFileSchema::file_type_, file_types)));
|
||||
|
||||
auto commited = ConnectorPtr->transaction([&]() mutable {
|
||||
for (auto &file : files) {
|
||||
for (auto& file : files) {
|
||||
ENGINE_LOG_DEBUG << "Remove table file type as NEW";
|
||||
ConnectorPtr->remove<TableFileSchema>(std::get<0>(file));
|
||||
}
|
||||
@ -1240,7 +1261,7 @@ SqliteMetaImpl::CleanUp() {
|
||||
if (files.size() > 0) {
|
||||
ENGINE_LOG_DEBUG << "Clean " << files.size() << " files";
|
||||
}
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when clean table file", e.what());
|
||||
}
|
||||
|
||||
@ -1265,7 +1286,7 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
|
||||
&TableFileSchema::date_),
|
||||
where(
|
||||
c(&TableFileSchema::file_type_) ==
|
||||
(int) TableFileSchema::TO_DELETE
|
||||
(int)TableFileSchema::TO_DELETE
|
||||
and
|
||||
c(&TableFileSchema::updated_time_)
|
||||
< now - seconds * US_PS));
|
||||
@ -1354,7 +1375,7 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
|
||||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::Count(const std::string &table_id, uint64_t &result) {
|
||||
SqliteMetaImpl::Count(const std::string& table_id, uint64_t& result) {
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
@ -1414,14 +1435,14 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
&TableFileSchema::file_size_),
|
||||
where(c(&TableFileSchema::file_type_)
|
||||
!= (int) TableFileSchema::TO_DELETE),
|
||||
!= (int)TableFileSchema::TO_DELETE),
|
||||
order_by(&TableFileSchema::id_),
|
||||
limit(10));
|
||||
|
||||
std::vector<int> ids;
|
||||
TableFileSchema table_file;
|
||||
|
||||
for (auto &file : selected) {
|
||||
for (auto& file : selected) {
|
||||
if (to_discard_size <= 0) break;
|
||||
table_file.id_ = std::get<0>(file);
|
||||
table_file.file_size_ = std::get<1>(file);
|
||||
@ -1437,7 +1458,7 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
|
||||
|
||||
ConnectorPtr->update_all(
|
||||
set(
|
||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
|
||||
c(&TableFileSchema::file_type_) = (int)TableFileSchema::TO_DELETE,
|
||||
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
|
||||
where(
|
||||
in(&TableFileSchema::id_, ids)));
|
||||
@ -1448,7 +1469,7 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
|
||||
if (!commited) {
|
||||
return HandleException("DiscardFiles error: sqlite transaction failed");
|
||||
}
|
||||
} catch (std::exception &e) {
|
||||
} catch (std::exception& e) {
|
||||
return HandleException("Encounter exception when discard table file", e.what());
|
||||
}
|
||||
|
||||
|
@ -108,7 +108,7 @@ class SqliteMetaImpl : public Meta {
|
||||
|
||||
Status
|
||||
FilesByType(const std::string& table_id, const std::vector<int>& file_types,
|
||||
std::vector<std::string>& file_ids) override;
|
||||
TableFilesSchema& table_files) override;
|
||||
|
||||
Status
|
||||
Size(uint64_t& result) override;
|
||||
|
@ -306,9 +306,9 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
|
||||
ASSERT_EQ(dated_files[table_file.date_].size(), 0);
|
||||
|
||||
std::vector<int> file_types;
|
||||
std::vector<std::string> file_ids;
|
||||
status = impl_->FilesByType(table.table_id_, file_types, file_ids);
|
||||
ASSERT_TRUE(file_ids.empty());
|
||||
milvus::engine::meta::TableFilesSchema table_files;
|
||||
status = impl_->FilesByType(table.table_id_, file_types, table_files);
|
||||
ASSERT_TRUE(table_files.empty());
|
||||
ASSERT_FALSE(status.ok());
|
||||
|
||||
file_types = {
|
||||
@ -317,11 +317,11 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
|
||||
milvus::engine::meta::TableFileSchema::INDEX, milvus::engine::meta::TableFileSchema::RAW,
|
||||
milvus::engine::meta::TableFileSchema::BACKUP,
|
||||
};
|
||||
status = impl_->FilesByType(table.table_id_, file_types, file_ids);
|
||||
status = impl_->FilesByType(table.table_id_, file_types, table_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
uint64_t total_cnt = new_index_files_cnt + new_merge_files_cnt + backup_files_cnt + new_files_cnt + raw_files_cnt +
|
||||
to_index_files_cnt + index_files_cnt;
|
||||
ASSERT_EQ(file_ids.size(), total_cnt);
|
||||
ASSERT_EQ(table_files.size(), total_cnt);
|
||||
|
||||
status = impl_->DeleteTableFiles(table_id);
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
@ -169,9 +169,9 @@ TEST_F(MySqlMetaTest, ARCHIVE_TEST_DAYS) {
|
||||
std::vector<int> file_types = {
|
||||
(int)milvus::engine::meta::TableFileSchema::NEW,
|
||||
};
|
||||
std::vector<std::string> file_ids;
|
||||
status = impl.FilesByType(table_id, file_types, file_ids);
|
||||
ASSERT_FALSE(file_ids.empty());
|
||||
milvus::engine::meta::TableFilesSchema table_files;
|
||||
status = impl.FilesByType(table_id, file_types, table_files);
|
||||
ASSERT_FALSE(table_files.empty());
|
||||
|
||||
status = impl.UpdateTableFilesToIndex(table_id);
|
||||
ASSERT_TRUE(status.ok());
|
||||
@ -326,9 +326,9 @@ TEST_F(MySqlMetaTest, TABLE_FILES_TEST) {
|
||||
ASSERT_EQ(dated_files[table_file.date_].size(), 0);
|
||||
|
||||
std::vector<int> file_types;
|
||||
std::vector<std::string> file_ids;
|
||||
status = impl_->FilesByType(table.table_id_, file_types, file_ids);
|
||||
ASSERT_TRUE(file_ids.empty());
|
||||
milvus::engine::meta::TableFilesSchema table_files;
|
||||
status = impl_->FilesByType(table.table_id_, file_types, table_files);
|
||||
ASSERT_TRUE(table_files.empty());
|
||||
ASSERT_FALSE(status.ok());
|
||||
|
||||
file_types = {
|
||||
@ -337,11 +337,11 @@ TEST_F(MySqlMetaTest, TABLE_FILES_TEST) {
|
||||
milvus::engine::meta::TableFileSchema::INDEX, milvus::engine::meta::TableFileSchema::RAW,
|
||||
milvus::engine::meta::TableFileSchema::BACKUP,
|
||||
};
|
||||
status = impl_->FilesByType(table.table_id_, file_types, file_ids);
|
||||
status = impl_->FilesByType(table.table_id_, file_types, table_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
uint64_t total_cnt = new_index_files_cnt + new_merge_files_cnt + backup_files_cnt + new_files_cnt + raw_files_cnt +
|
||||
to_index_files_cnt + index_files_cnt;
|
||||
ASSERT_EQ(file_ids.size(), total_cnt);
|
||||
ASSERT_EQ(table_files.size(), total_cnt);
|
||||
|
||||
status = impl_->DeleteTableFiles(table_id);
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
Loading…
Reference in New Issue
Block a user