mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-05 05:18:52 +08:00
Merge branch 'branch-0.4.0' into 'branch-0.4.0'
MS-430 Search no result if index created with FLAT See merge request megasearch/milvus!448 Former-commit-id: def2ba5fa3f138deba6e80cb160019ce42630291
This commit is contained in:
commit
df787e1d40
@ -14,6 +14,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- MS-432 - Search vectors params nprobe need to check max number
|
||||
- MS-431 - Search vectors params nprobe: 0/-1, expected result: raise exception
|
||||
- MS-331 - Crate Table : when table exists, error code is META_FAILED(code=15) rather than ILLEGAL TABLE NAME(code=9))
|
||||
- MS-430 - Search no result if index created with FLAT
|
||||
|
||||
## Improvement
|
||||
- MS-327 - Clean code for milvus
|
||||
|
@ -98,7 +98,8 @@ Status DBImpl::PreloadTable(const std::string &table_id) {
|
||||
meta::DatePartionedTableFilesSchema files;
|
||||
|
||||
meta::DatesT dates;
|
||||
auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
|
||||
std::vector<size_t> ids;
|
||||
auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
@ -173,7 +174,8 @@ Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint6
|
||||
|
||||
//get all table files from table
|
||||
meta::DatePartionedTableFilesSchema files;
|
||||
auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
|
||||
std::vector<size_t> ids;
|
||||
auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files);
|
||||
if (!status.ok()) { return status; }
|
||||
|
||||
meta::TableFilesSchema file_id_array;
|
||||
@ -334,14 +336,8 @@ void DBImpl::StartMetricTask() {
|
||||
ENGINE_LOG_TRACE << "Metric task finished";
|
||||
}
|
||||
|
||||
void DBImpl::StartCompactionTask() {
|
||||
static uint64_t compact_clock_tick = 0;
|
||||
compact_clock_tick++;
|
||||
if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
//serialize memory data
|
||||
Status DBImpl::MemSerialize() {
|
||||
std::lock_guard<std::mutex> lck(mem_serialize_mutex_);
|
||||
std::set<std::string> temp_table_ids;
|
||||
mem_mgr_->Serialize(temp_table_ids);
|
||||
for(auto& id : temp_table_ids) {
|
||||
@ -352,6 +348,19 @@ void DBImpl::StartCompactionTask() {
|
||||
SERVER_LOG_DEBUG << "Insert cache serialized";
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void DBImpl::StartCompactionTask() {
|
||||
static uint64_t compact_clock_tick = 0;
|
||||
compact_clock_tick++;
|
||||
if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
//serialize memory data
|
||||
MemSerialize();
|
||||
|
||||
//compactiong has been finished?
|
||||
if(!compact_thread_results_.empty()) {
|
||||
std::chrono::milliseconds span(10);
|
||||
@ -535,39 +544,49 @@ Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index)
|
||||
return status;
|
||||
}
|
||||
|
||||
if(utils::IsSameIndex(old_index, index)) {
|
||||
ENGINE_LOG_DEBUG << "Same index setting, no need to create index again";
|
||||
return Status::OK();
|
||||
}
|
||||
//step 2: update index info
|
||||
if(!utils::IsSameIndex(old_index, index)) {
|
||||
DropIndex(table_id);
|
||||
|
||||
//step 2: drop old index files
|
||||
DropIndex(table_id);
|
||||
|
||||
//step 3: update index info
|
||||
status = meta_ptr_->UpdateTableIndexParam(table_id, index);
|
||||
if (!status.ok()) {
|
||||
ENGINE_LOG_ERROR << "Failed to update table index info";
|
||||
return status;
|
||||
}
|
||||
|
||||
if(index.engine_type_ == (int)EngineType::FAISS_IDMAP) {
|
||||
ENGINE_LOG_DEBUG << "index type = IDMAP, no need to build index";
|
||||
return Status::OK();
|
||||
status = meta_ptr_->UpdateTableIndexParam(table_id, index);
|
||||
if (!status.ok()) {
|
||||
ENGINE_LOG_ERROR << "Failed to update table index info";
|
||||
return status;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool has = false;
|
||||
auto status = meta_ptr_->HasNonIndexFiles(table_id, has);
|
||||
//step 3: wait and build index
|
||||
//for IDMAP type, only wait all NEW file converted to RAW file
|
||||
//for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
|
||||
std::vector<int> file_types;
|
||||
if(index.engine_type_ == (int)EngineType::FAISS_IDMAP) {
|
||||
file_types = {
|
||||
(int) meta::TableFileSchema::NEW,
|
||||
(int) meta::TableFileSchema::NEW_MERGE,
|
||||
};
|
||||
} else {
|
||||
file_types = {
|
||||
(int) meta::TableFileSchema::RAW,
|
||||
(int) meta::TableFileSchema::NEW,
|
||||
(int) meta::TableFileSchema::NEW_MERGE,
|
||||
(int) meta::TableFileSchema::NEW_INDEX,
|
||||
(int) meta::TableFileSchema::TO_INDEX,
|
||||
};
|
||||
}
|
||||
|
||||
std::vector<std::string> file_ids;
|
||||
auto status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
|
||||
int times = 1;
|
||||
|
||||
while (has) {
|
||||
while (!file_ids.empty()) {
|
||||
ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
|
||||
status = meta_ptr_->UpdateTableFilesToIndex(table_id);
|
||||
/* StartBuildIndexTask(true); */
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));
|
||||
status = meta_ptr_->HasNonIndexFiles(table_id, has);
|
||||
status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
|
||||
times++;
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -576,6 +595,7 @@ Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
|
||||
}
|
||||
|
||||
Status DBImpl::DropIndex(const std::string& table_id) {
|
||||
ENGINE_LOG_DEBUG << "drop index for table: " << table_id;
|
||||
return meta_ptr_->DropTableIndex(table_id);
|
||||
}
|
||||
|
||||
|
@ -117,8 +117,9 @@ class DBImpl : public DB {
|
||||
void StartBuildIndexTask(bool force=false);
|
||||
void BackgroundBuildIndex();
|
||||
|
||||
Status
|
||||
BuildIndex(const meta::TableFileSchema &);
|
||||
Status BuildIndex(const meta::TableFileSchema &);
|
||||
|
||||
Status MemSerialize();
|
||||
|
||||
private:
|
||||
const Options options_;
|
||||
@ -129,6 +130,7 @@ class DBImpl : public DB {
|
||||
|
||||
MetaPtr meta_ptr_;
|
||||
MemManagerAbstractPtr mem_mgr_;
|
||||
std::mutex mem_serialize_mutex_;
|
||||
|
||||
server::ThreadPool compact_thread_pool_;
|
||||
std::list<std::future<void>> compact_thread_results_;
|
||||
|
@ -70,10 +70,10 @@ class Meta {
|
||||
UpdateTableFiles(TableFilesSchema &files) = 0;
|
||||
|
||||
virtual Status
|
||||
FilesToSearch(const std::string &table_id, const DatesT &partition, DatePartionedTableFilesSchema &files) = 0;
|
||||
|
||||
virtual Status
|
||||
FilesToSearch(const std::string &table_id, const std::vector<size_t> &ids, const DatesT &partition, DatePartionedTableFilesSchema &files) = 0;
|
||||
FilesToSearch(const std::string &table_id,
|
||||
const std::vector<size_t> &ids,
|
||||
const DatesT &partition,
|
||||
DatePartionedTableFilesSchema &files) = 0;
|
||||
|
||||
virtual Status
|
||||
FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) = 0;
|
||||
@ -88,7 +88,9 @@ class Meta {
|
||||
FilesToIndex(TableFilesSchema &) = 0;
|
||||
|
||||
virtual Status
|
||||
HasNonIndexFiles(const std::string &table_id, bool &has) = 0;
|
||||
FilesByType(const std::string &table_id,
|
||||
const std::vector<int> &file_types,
|
||||
std::vector<std::string>& file_ids) = 0;
|
||||
|
||||
virtual Status
|
||||
DescribeTableIndex(const std::string &table_id, TableIndex& index) = 0;
|
||||
|
@ -326,10 +326,16 @@ Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
|
||||
}
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
|
||||
has = false;
|
||||
Status MySQLMetaImpl::FilesByType(const std::string &table_id,
|
||||
const std::vector<int> &file_types,
|
||||
std::vector<std::string> &file_ids) {
|
||||
if(file_types.empty()) {
|
||||
return Status::Error("file types array is empty");
|
||||
}
|
||||
|
||||
try {
|
||||
file_ids.clear();
|
||||
|
||||
StoreQueryResult res;
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
@ -338,34 +344,75 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
std::string types;
|
||||
for(auto type : file_types) {
|
||||
if(!types.empty()) {
|
||||
types += ",";
|
||||
}
|
||||
types += std::to_string(type);
|
||||
}
|
||||
|
||||
Query hasNonIndexFilesQuery = connectionPtr->query();
|
||||
//since table_id is a unique column we just need to check whether it exists or not
|
||||
hasNonIndexFilesQuery << "SELECT EXISTS " <<
|
||||
"(SELECT 1 FROM TableFiles " <<
|
||||
hasNonIndexFilesQuery << "SELECT file_id, file_type FROM TableFiles " <<
|
||||
"WHERE table_id = " << quote << table_id << " AND " <<
|
||||
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::NEW) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::NEW_MERGE) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::NEW_INDEX) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ")) " <<
|
||||
"AS " << quote << "check" << ";";
|
||||
"file_type in (" << types << ");";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::HasNonIndexFiles: " << hasNonIndexFilesQuery.str();
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesByType: " << hasNonIndexFilesQuery.str();
|
||||
|
||||
res = hasNonIndexFilesQuery.store();
|
||||
} //Scoped Connection
|
||||
|
||||
int check = res[0]["check"];
|
||||
has = (check == 1);
|
||||
if (res.num_rows() > 0) {
|
||||
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
|
||||
int to_index_count = 0, index_count = 0, backup_count = 0;
|
||||
for (auto &resRow : res) {
|
||||
std::string file_id;
|
||||
resRow["file_id"].to_string(file_id);
|
||||
file_ids.push_back(file_id);
|
||||
|
||||
int32_t file_type = resRow["file_type"];
|
||||
switch (file_type) {
|
||||
case (int) TableFileSchema::RAW:
|
||||
raw_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::NEW:
|
||||
new_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::NEW_MERGE:
|
||||
new_merge_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::NEW_INDEX:
|
||||
new_index_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::TO_INDEX:
|
||||
to_index_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::INDEX:
|
||||
index_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::BACKUP:
|
||||
backup_count++;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
|
||||
<< " new files:" << new_count << " new_merge files:" << new_merge_count
|
||||
<< " new_index files:" << new_index_count << " to_index files:" << to_index_count
|
||||
<< " index files:" << index_count << " backup files:" << backup_count;
|
||||
}
|
||||
|
||||
} catch (const BadQuery &er) {
|
||||
// Handle any query errors
|
||||
ENGINE_LOG_ERROR << "QUERY ERROR WHEN CHECKING IF NON INDEX FILES EXISTS" << ": " << er.what();
|
||||
return Status::DBTransactionError("QUERY ERROR WHEN CHECKING IF NON INDEX FILES EXISTS", er.what());
|
||||
ENGINE_LOG_ERROR << "QUERY ERROR WHEN GET FILE BY TYPE" << ": " << er.what();
|
||||
return Status::DBTransactionError("QUERY ERROR WHEN GET FILE BY TYPE", er.what());
|
||||
} catch (const Exception &er) {
|
||||
// Catch-all for any other MySQL++ exceptions
|
||||
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN CHECKING IF NON INDEX FILES EXISTS" << ": " << er.what();
|
||||
return Status::DBTransactionError("GENERAL ERROR WHEN CHECKING IF NON INDEX FILES EXISTS", er.what());
|
||||
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN GET FILE BY TYPE" << ": " << er.what();
|
||||
return Status::DBTransactionError("GENERAL ERROR WHEN GET FILE BY TYPE", er.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
@ -987,121 +1034,6 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
|
||||
const DatesT &partition,
|
||||
DatePartionedTableFilesSchema &files) {
|
||||
files.clear();
|
||||
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
StoreQueryResult res;
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
if (connectionPtr == nullptr) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
if (partition.empty()) {
|
||||
|
||||
Query filesToSearchQuery = connectionPtr->query();
|
||||
filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date " <<
|
||||
"FROM TableFiles " <<
|
||||
"WHERE table_id = " << quote << table_id << " AND " <<
|
||||
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str();
|
||||
|
||||
res = filesToSearchQuery.store();
|
||||
|
||||
} else {
|
||||
|
||||
Query filesToSearchQuery = connectionPtr->query();
|
||||
|
||||
std::stringstream partitionListSS;
|
||||
for (auto &date : partition) {
|
||||
partitionListSS << std::to_string(date) << ", ";
|
||||
}
|
||||
std::string partitionListStr = partitionListSS.str();
|
||||
partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", "
|
||||
|
||||
filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, file_size, row_count, date " <<
|
||||
"FROM TableFiles " <<
|
||||
"WHERE table_id = " << quote << table_id << " AND " <<
|
||||
"date IN (" << partitionListStr << ") AND " <<
|
||||
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str();
|
||||
|
||||
res = filesToSearchQuery.store();
|
||||
|
||||
}
|
||||
} //Scoped Connection
|
||||
|
||||
TableSchema table_schema;
|
||||
table_schema.table_id_ = table_id;
|
||||
auto status = DescribeTable(table_schema);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
TableFileSchema table_file;
|
||||
for (auto &resRow : res) {
|
||||
|
||||
table_file.id_ = resRow["id"]; //implicit conversion
|
||||
|
||||
std::string table_id_str;
|
||||
resRow["table_id"].to_string(table_id_str);
|
||||
table_file.table_id_ = table_id_str;
|
||||
|
||||
table_file.index_file_size_ = table_schema.index_file_size_;
|
||||
|
||||
table_file.engine_type_ = resRow["engine_type"];
|
||||
|
||||
table_file.nlist_ = table_schema.nlist_;
|
||||
|
||||
table_file.metric_type_ = table_schema.metric_type_;
|
||||
|
||||
std::string file_id;
|
||||
resRow["file_id"].to_string(file_id);
|
||||
table_file.file_id_ = file_id;
|
||||
|
||||
table_file.file_type_ = resRow["file_type"];
|
||||
|
||||
table_file.file_size_ = resRow["file_size"];
|
||||
|
||||
table_file.row_count_ = resRow["row_count"];
|
||||
|
||||
table_file.date_ = resRow["date"];
|
||||
|
||||
table_file.dimension_ = table_schema.dimension_;
|
||||
|
||||
utils::GetTableFilePath(options_, table_file);
|
||||
|
||||
auto dateItr = files.find(table_file.date_);
|
||||
if (dateItr == files.end()) {
|
||||
files[table_file.date_] = TableFilesSchema();
|
||||
}
|
||||
|
||||
files[table_file.date_].push_back(table_file);
|
||||
}
|
||||
} catch (const BadQuery &er) {
|
||||
// Handle any query errors
|
||||
ENGINE_LOG_ERROR << "QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what();
|
||||
return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what());
|
||||
} catch (const Exception &er) {
|
||||
// Catch-all for any other MySQL++ exceptions
|
||||
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what();
|
||||
return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
|
||||
const std::vector<size_t> &ids,
|
||||
const DatesT &partition,
|
||||
|
@ -46,7 +46,9 @@ class MySQLMetaImpl : public Meta {
|
||||
const std::vector<size_t> &ids,
|
||||
TableFilesSchema &table_files) override;
|
||||
|
||||
Status HasNonIndexFiles(const std::string &table_id, bool &has) override;
|
||||
Status FilesByType(const std::string &table_id,
|
||||
const std::vector<int> &file_types,
|
||||
std::vector<std::string> &file_ids) override;
|
||||
|
||||
Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override;
|
||||
|
||||
@ -62,10 +64,6 @@ class MySQLMetaImpl : public Meta {
|
||||
|
||||
Status UpdateTableFiles(TableFilesSchema &files) override;
|
||||
|
||||
Status FilesToSearch(const std::string &table_id,
|
||||
const DatesT &partition,
|
||||
DatePartionedTableFilesSchema &files) override;
|
||||
|
||||
Status FilesToSearch(const std::string &table_id,
|
||||
const std::vector<size_t> &ids,
|
||||
const DatesT &partition,
|
||||
|
@ -279,28 +279,26 @@ Status SqliteMetaImpl::DescribeTable(TableSchema &table_schema) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) {
|
||||
has = false;
|
||||
Status SqliteMetaImpl::FilesByType(const std::string& table_id,
|
||||
const std::vector<int>& file_types,
|
||||
std::vector<std::string>& file_ids) {
|
||||
if(file_types.empty()) {
|
||||
return Status::Error("file types array is empty");
|
||||
}
|
||||
|
||||
try {
|
||||
std::vector<int> file_types = {
|
||||
(int) TableFileSchema::RAW,
|
||||
(int) TableFileSchema::NEW,
|
||||
(int) TableFileSchema::NEW_MERGE,
|
||||
(int) TableFileSchema::NEW_INDEX,
|
||||
(int) TableFileSchema::TO_INDEX,
|
||||
};
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
file_ids.clear();
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::file_id_,
|
||||
&TableFileSchema::file_type_),
|
||||
where(in(&TableFileSchema::file_type_, file_types)
|
||||
and c(&TableFileSchema::table_id_) == table_id
|
||||
));
|
||||
|
||||
if (selected.size() >= 1) {
|
||||
has = true;
|
||||
|
||||
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0, to_index_count = 0;
|
||||
std::vector<std::string> file_ids;
|
||||
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
|
||||
int to_index_count = 0, index_count = 0, backup_count = 0;
|
||||
for (auto &file : selected) {
|
||||
file_ids.push_back(std::get<0>(file));
|
||||
switch (std::get<1>(file)) {
|
||||
case (int) TableFileSchema::RAW:
|
||||
raw_count++;
|
||||
@ -317,14 +315,21 @@ Status SqliteMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has)
|
||||
case (int) TableFileSchema::TO_INDEX:
|
||||
to_index_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::INDEX:
|
||||
index_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::BACKUP:
|
||||
backup_count++;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
|
||||
<< " new files:" << new_count << " new_merge files:" << new_merge_count
|
||||
<< " new_index files:" << new_index_count << " to_index files:" << to_index_count;
|
||||
<< " new files:" << new_count << " new_merge files:" << new_merge_count
|
||||
<< " new_index files:" << new_index_count << " to_index files:" << to_index_count
|
||||
<< " index files:" << index_count << " backup files:" << backup_count;
|
||||
}
|
||||
|
||||
} catch (std::exception &e) {
|
||||
@ -633,111 +638,6 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
|
||||
const DatesT &partition,
|
||||
DatePartionedTableFilesSchema &files) {
|
||||
files.clear();
|
||||
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
|
||||
if (partition.empty()) {
|
||||
std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
&TableFileSchema::table_id_,
|
||||
&TableFileSchema::file_id_,
|
||||
&TableFileSchema::file_type_,
|
||||
&TableFileSchema::file_size_,
|
||||
&TableFileSchema::row_count_,
|
||||
&TableFileSchema::date_,
|
||||
&TableFileSchema::engine_type_),
|
||||
where(c(&TableFileSchema::table_id_) == table_id and
|
||||
in(&TableFileSchema::file_type_, file_type)));
|
||||
|
||||
TableSchema table_schema;
|
||||
table_schema.table_id_ = table_id;
|
||||
auto status = DescribeTable(table_schema);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
TableFileSchema table_file;
|
||||
|
||||
for (auto &file : selected) {
|
||||
table_file.id_ = std::get<0>(file);
|
||||
table_file.table_id_ = std::get<1>(file);
|
||||
table_file.file_id_ = std::get<2>(file);
|
||||
table_file.file_type_ = std::get<3>(file);
|
||||
table_file.file_size_ = std::get<4>(file);
|
||||
table_file.row_count_ = std::get<5>(file);
|
||||
table_file.date_ = std::get<6>(file);
|
||||
table_file.engine_type_ = std::get<7>(file);
|
||||
table_file.dimension_ = table_schema.dimension_;
|
||||
table_file.index_file_size_ = table_schema.index_file_size_;
|
||||
table_file.nlist_ = table_schema.nlist_;
|
||||
table_file.metric_type_ = table_schema.metric_type_;
|
||||
|
||||
utils::GetTableFilePath(options_, table_file);
|
||||
auto dateItr = files.find(table_file.date_);
|
||||
if (dateItr == files.end()) {
|
||||
files[table_file.date_] = TableFilesSchema();
|
||||
}
|
||||
files[table_file.date_].push_back(table_file);
|
||||
}
|
||||
}
|
||||
else {
|
||||
std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
&TableFileSchema::table_id_,
|
||||
&TableFileSchema::file_id_,
|
||||
&TableFileSchema::file_type_,
|
||||
&TableFileSchema::file_size_,
|
||||
&TableFileSchema::row_count_,
|
||||
&TableFileSchema::date_,
|
||||
&TableFileSchema::engine_type_),
|
||||
where(c(&TableFileSchema::table_id_) == table_id and
|
||||
in(&TableFileSchema::date_, partition) and
|
||||
in(&TableFileSchema::file_type_, file_type)));
|
||||
|
||||
TableSchema table_schema;
|
||||
table_schema.table_id_ = table_id;
|
||||
auto status = DescribeTable(table_schema);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
TableFileSchema table_file;
|
||||
|
||||
for (auto &file : selected) {
|
||||
table_file.id_ = std::get<0>(file);
|
||||
table_file.table_id_ = std::get<1>(file);
|
||||
table_file.file_id_ = std::get<2>(file);
|
||||
table_file.file_type_ = std::get<3>(file);
|
||||
table_file.file_size_ = std::get<4>(file);
|
||||
table_file.row_count_ = std::get<5>(file);
|
||||
table_file.date_ = std::get<6>(file);
|
||||
table_file.engine_type_ = std::get<7>(file);
|
||||
table_file.dimension_ = table_schema.dimension_;
|
||||
table_file.index_file_size_ = table_schema.index_file_size_;
|
||||
table_file.nlist_ = table_schema.nlist_;
|
||||
table_file.metric_type_ = table_schema.metric_type_;
|
||||
|
||||
utils::GetTableFilePath(options_, table_file);
|
||||
auto dateItr = files.find(table_file.date_);
|
||||
if (dateItr == files.end()) {
|
||||
files[table_file.date_] = TableFilesSchema();
|
||||
}
|
||||
files[table_file.date_].push_back(table_file);
|
||||
}
|
||||
|
||||
}
|
||||
} catch (std::exception &e) {
|
||||
return HandleException("Encounter exception when iterate index files", e);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
|
||||
const std::vector<size_t> &ids,
|
||||
const DatesT &partition,
|
||||
|
@ -41,7 +41,9 @@ class SqliteMetaImpl : public Meta {
|
||||
const std::vector<size_t> &ids,
|
||||
TableFilesSchema &table_files) override;
|
||||
|
||||
Status HasNonIndexFiles(const std::string &table_id, bool &has) override;
|
||||
Status FilesByType(const std::string &table_id,
|
||||
const std::vector<int> &file_types,
|
||||
std::vector<std::string> &file_ids) override;
|
||||
|
||||
Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override;
|
||||
|
||||
@ -57,10 +59,6 @@ class SqliteMetaImpl : public Meta {
|
||||
|
||||
Status UpdateTableFiles(TableFilesSchema &files) override;
|
||||
|
||||
Status FilesToSearch(const std::string &table_id,
|
||||
const DatesT &partition,
|
||||
DatePartionedTableFilesSchema &files) override;
|
||||
|
||||
Status FilesToSearch(const std::string &table_id,
|
||||
const std::vector<size_t> &ids,
|
||||
const DatesT &partition,
|
||||
|
@ -201,6 +201,7 @@ DescribeTableTask::OnExecute() {
|
||||
|
||||
schema_->mutable_table_name()->set_table_name(table_info.table_id_);
|
||||
schema_->set_dimension(table_info.dimension_);
|
||||
schema_->set_index_file_size(table_info.index_file_size_);
|
||||
|
||||
} catch (std::exception &ex) {
|
||||
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
|
||||
|
@ -307,8 +307,6 @@ TEST_F(DBTest, PRELOADTABLE_TEST) {
|
||||
ASSERT_EQ(target_ids.size(), nb);
|
||||
}
|
||||
|
||||
sleep(2);
|
||||
|
||||
engine::TableIndex index;
|
||||
index.engine_type_ = (int)engine::EngineType::FAISS_IDMAP;
|
||||
db_->CreateIndex(TABLE_NAME, index); // wait until build index finish
|
||||
|
@ -260,17 +260,17 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
|
||||
ASSERT_EQ(files.size(), to_index_files_cnt);
|
||||
|
||||
meta::DatesT dates = {table_file.date_};
|
||||
status = impl_->FilesToSearch(table_id, dates, dated_files);
|
||||
std::vector<size_t> ids;
|
||||
status = impl_->FilesToSearch(table_id, ids, dates, dated_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[table_file.date_].size(),
|
||||
to_index_files_cnt+raw_files_cnt+index_files_cnt);
|
||||
|
||||
status = impl_->FilesToSearch(table_id, meta::DatesT(), dated_files);
|
||||
status = impl_->FilesToSearch(table_id, ids, meta::DatesT(), dated_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[table_file.date_].size(),
|
||||
to_index_files_cnt+raw_files_cnt+index_files_cnt);
|
||||
|
||||
std::vector<size_t> ids;
|
||||
status = impl_->FilesToSearch(table_id, ids, meta::DatesT(), dated_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[table_file.date_].size(),
|
||||
|
@ -197,9 +197,12 @@ TEST_F(DISABLED_MySQLTest, ARCHIVE_TEST_DAYS) {
|
||||
i++;
|
||||
}
|
||||
|
||||
bool has;
|
||||
status = impl.HasNonIndexFiles(table_id, has);
|
||||
ASSERT_TRUE(status.ok());
|
||||
std::vector<int> file_types = {
|
||||
(int) meta::TableFileSchema::NEW,
|
||||
};
|
||||
std::vector<std::string> file_ids;
|
||||
status = impl.FilesByType(table_id, file_types, file_ids);
|
||||
ASSERT_FALSE(file_ids.empty());
|
||||
|
||||
status = impl.UpdateTableFilesToIndex(table_id);
|
||||
ASSERT_TRUE(status.ok());
|
||||
@ -332,17 +335,17 @@ TEST_F(DISABLED_MySQLTest, TABLE_FILES_TEST) {
|
||||
ASSERT_EQ(files.size(), to_index_files_cnt);
|
||||
|
||||
meta::DatesT dates = {table_file.date_};
|
||||
status = impl.FilesToSearch(table_id, dates, dated_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[table_file.date_].size(),
|
||||
to_index_files_cnt+raw_files_cnt+index_files_cnt);
|
||||
|
||||
status = impl.FilesToSearch(table_id, meta::DatesT(), dated_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[table_file.date_].size(),
|
||||
to_index_files_cnt+raw_files_cnt+index_files_cnt);
|
||||
|
||||
std::vector<size_t> ids;
|
||||
status = impl.FilesToSearch(table_id, ids, dates, dated_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[table_file.date_].size(),
|
||||
to_index_files_cnt+raw_files_cnt+index_files_cnt);
|
||||
|
||||
status = impl.FilesToSearch(table_id, ids, meta::DatesT(), dated_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[table_file.date_].size(),
|
||||
to_index_files_cnt+raw_files_cnt+index_files_cnt);
|
||||
|
||||
status = impl.FilesToSearch(table_id, ids, meta::DatesT(), dated_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[table_file.date_].size(),
|
||||
|
Loading…
Reference in New Issue
Block a user