mirror of https://gitee.com/milvus-io/milvus.git
synced 2024-12-05 05:18:52 +08:00

commit e39a522603
update

Former-commit-id: 1c819b89069c8337b233439f00663bf615533577
@@ -253,144 +253,144 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
    return QueryAsync(table_id, files_array, k, nq, vectors, dates, results);
}

Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    meta::DatePartionedTableFilesSchema files;
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
    if (!status.ok()) { return status; }

    ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size();

    meta::TableFilesSchema index_files;
    meta::TableFilesSchema raw_files;
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            file.file_type_ == meta::TableFileSchema::INDEX ?
                index_files.push_back(file) : raw_files.push_back(file);
        }
    }

    int dim = 0;
    if (!index_files.empty()) {
        dim = index_files[0].dimension_;
    } else if (!raw_files.empty()) {
        dim = raw_files[0].dimension_;
    } else {
        ENGINE_LOG_DEBUG << "no files to search";
        return Status::OK();
    }

    {
        // [{ids, distence}, ...]
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq); // allocate nq cells.

        auto cluster = [&](long *nns, float *dis, const int& k) -> void {
            for (int i = 0; i < nq; ++i) {
                auto f_begin = batchresult[i].first.cbegin();
                auto s_begin = batchresult[i].second.cbegin();
                batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k);
                batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k);
            }
        };

        // Allocate Memory
        float *output_distence;
        long *output_ids;
        output_distence = (float *) malloc(k * nq * sizeof(float));
        output_ids = (long *) malloc(k * nq * sizeof(long));
        memset(output_distence, 0, k * nq * sizeof(float));
        memset(output_ids, 0, k * nq * sizeof(long));

        long search_set_size = 0;

        auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
            for (auto &file : file_vec) {

                ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
                index->Load();
                auto file_size = index->PhysicalSize();
                search_set_size += file_size;

                ENGINE_LOG_DEBUG << "Search file_type " << file.file_type_ << " Of Size: "
                                 << file_size/(1024*1024) << " M";

                int inner_k = index->Count() < k ? index->Count() : k;
                auto start_time = METRICS_NOW_TIME;
                index->Search(nq, vectors, inner_k, output_distence, output_ids);
                auto end_time = METRICS_NOW_TIME;
                auto total_time = METRICS_MICROSECONDS(start_time, end_time);
                CollectFileMetrics(file.file_type_, file_size, total_time);
                cluster(output_ids, output_distence, inner_k); // cluster to each query
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
            }
        };

        auto topk_cpu = [](const std::vector<float> &input_data,
                           const int &k,
                           float *output_distence,
                           long *output_ids) -> void {
            std::map<float, std::vector<int>> inverted_table;
            for (int i = 0; i < input_data.size(); ++i) {
                if (inverted_table.count(input_data[i]) == 1) {
                    auto& ori_vec = inverted_table[input_data[i]];
                    ori_vec.push_back(i);
                }
                else {
                    inverted_table[input_data[i]] = std::vector<int>{i};
                }
            }

            int count = 0;
            for (auto &item : inverted_table){
                if (count == k) break;
                for (auto &id : item.second){
                    output_distence[count] = item.first;
                    output_ids[count] = id;
                    if (++count == k) break;
                }
            }
        };
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;

                topk_cpu(dis, k, output_distence, output_ids);

                int inner_k = dis.size() < k ? dis.size() : k;
                for (int i = 0; i < inner_k; ++i) {
                    res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping
                }
                results.push_back(res); // append to result list
                res.clear();
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
            }
        };

        search_in_index(raw_files);
        search_in_index(index_files);

        ENGINE_LOG_DEBUG << "Search Overall Set Size = " << search_set_size << " M";
        cluster_topk();

        free(output_distence);
        free(output_ids);
    }

    if (results.empty()) {
        return Status::NotFound("Group " + table_id + ", search result not found!");
    }

    QueryResults temp_results;
    CalcScore(nq, vectors, dim, results, temp_results);
    results.swap(temp_results);

    return Status::OK();
}
//Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
//                         const float* vectors, const meta::DatesT& dates, QueryResults& results) {
//    meta::DatePartionedTableFilesSchema files;
//    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
//    if (!status.ok()) { return status; }
//
//    ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size();
//
//    meta::TableFilesSchema index_files;
//    meta::TableFilesSchema raw_files;
//    for (auto &day_files : files) {
//        for (auto &file : day_files.second) {
//            file.file_type_ == meta::TableFileSchema::INDEX ?
//            index_files.push_back(file) : raw_files.push_back(file);
//        }
//    }
//
//    int dim = 0;
//    if (!index_files.empty()) {
//        dim = index_files[0].dimension_;
//    } else if (!raw_files.empty()) {
//        dim = raw_files[0].dimension_;
//    } else {
//        ENGINE_LOG_DEBUG << "no files to search";
//        return Status::OK();
//    }
//
//    {
//        // [{ids, distence}, ...]
//        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
//        std::vector<SearchResult> batchresult(nq); // allocate nq cells.
//
//        auto cluster = [&](long *nns, float *dis, const int& k) -> void {
//            for (int i = 0; i < nq; ++i) {
//                auto f_begin = batchresult[i].first.cbegin();
//                auto s_begin = batchresult[i].second.cbegin();
//                batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k);
//                batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k);
//            }
//        };
//
//        // Allocate Memory
//        float *output_distence;
//        long *output_ids;
//        output_distence = (float *) malloc(k * nq * sizeof(float));
//        output_ids = (long *) malloc(k * nq * sizeof(long));
//        memset(output_distence, 0, k * nq * sizeof(float));
//        memset(output_ids, 0, k * nq * sizeof(long));
//
//        long search_set_size = 0;
//
//        auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
//            for (auto &file : file_vec) {
//
//                ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
//                index->Load();
//                auto file_size = index->PhysicalSize();
//                search_set_size += file_size;
//
//                ENGINE_LOG_DEBUG << "Search file_type " << file.file_type_ << " Of Size: "
//                                 << file_size/(1024*1024) << " M";
//
//                int inner_k = index->Count() < k ? index->Count() : k;
//                auto start_time = METRICS_NOW_TIME;
//                index->Search(nq, vectors, inner_k, output_distence, output_ids);
//                auto end_time = METRICS_NOW_TIME;
//                auto total_time = METRICS_MICROSECONDS(start_time, end_time);
//                CollectFileMetrics(file.file_type_, file_size, total_time);
//                cluster(output_ids, output_distence, inner_k); // cluster to each query
//                memset(output_distence, 0, k * nq * sizeof(float));
//                memset(output_ids, 0, k * nq * sizeof(long));
//            }
//        };
//
//        auto topk_cpu = [](const std::vector<float> &input_data,
//                           const int &k,
//                           float *output_distence,
//                           long *output_ids) -> void {
//            std::map<float, std::vector<int>> inverted_table;
//            for (int i = 0; i < input_data.size(); ++i) {
//                if (inverted_table.count(input_data[i]) == 1) {
//                    auto& ori_vec = inverted_table[input_data[i]];
//                    ori_vec.push_back(i);
//                }
//                else {
//                    inverted_table[input_data[i]] = std::vector<int>{i};
//                }
//            }
//
//            int count = 0;
//            for (auto &item : inverted_table){
//                if (count == k) break;
//                for (auto &id : item.second){
//                    output_distence[count] = item.first;
//                    output_ids[count] = id;
//                    if (++count == k) break;
//                }
//            }
//        };
//        auto cluster_topk = [&]() -> void {
//            QueryResult res;
//            for (auto &result_pair : batchresult) {
//                auto &dis = result_pair.second;
//                auto &nns = result_pair.first;
//
//                topk_cpu(dis, k, output_distence, output_ids);
//
//                int inner_k = dis.size() < k ? dis.size() : k;
//                for (int i = 0; i < inner_k; ++i) {
//                    res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping
//                }
//                results.push_back(res); // append to result list
//                res.clear();
//                memset(output_distence, 0, k * nq * sizeof(float));
//                memset(output_ids, 0, k * nq * sizeof(long));
//            }
//        };
//
//        search_in_index(raw_files);
//        search_in_index(index_files);
//
//        ENGINE_LOG_DEBUG << "Search Overall Set Size = " << search_set_size << " M";
//        cluster_topk();
//
//        free(output_distence);
//        free(output_ids);
//    }
//
//    if (results.empty()) {
//        return Status::NotFound("Group " + table_id + ", search result not found!");
//    }
//
//    QueryResults temp_results;
//    CalcScore(nq, vectors, dim, results, temp_results);
//    results.swap(temp_results);
//
//    return Status::OK();
//}

Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, const float* vectors,
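Note: the topk_cpu lambda in the hunk above selects the k nearest hits by keying a std::map on distance, so entries come back in ascending order and ids that share a distance land in one bucket. A minimal standalone sketch of the same idea (identifiers ours, not part of the commit):

#include <cstdio>
#include <map>
#include <vector>

// Pick the k smallest distances, mirroring the map-based merge used by
// QuerySync above; ids sharing a distance are kept together in one bucket.
static void TopK(const std::vector<float>& distances, int k,
                 std::vector<float>& out_dist, std::vector<long>& out_ids) {
    std::map<float, std::vector<long>> inverted_table;   // distance -> ids
    for (long i = 0; i < (long)distances.size(); ++i) {
        inverted_table[distances[i]].push_back(i);
    }
    for (const auto& item : inverted_table) {            // ascending distance
        for (long id : item.second) {
            out_dist.push_back(item.first);
            out_ids.push_back(id);
            if ((int)out_ids.size() == k) return;
        }
    }
}

int main() {
    std::vector<float> dis = {0.9f, 0.1f, 0.5f, 0.1f};
    std::vector<float> top_dist;
    std::vector<long> top_ids;
    TopK(dis, 3, top_dist, top_ids);
    for (size_t i = 0; i < top_ids.size(); ++i) {
        printf("id=%ld dist=%.1f\n", top_ids[i], top_dist[i]);  // ids 1, 3, 2
    }
    return 0;
}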
@@ -62,8 +62,8 @@ public:
    virtual ~DBImpl();

private:
    Status QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
                     const float* vectors, const meta::DatesT& dates, QueryResults& results);
//    Status QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
//                     const float* vectors, const meta::DatesT& dates, QueryResults& results);

    Status QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                      uint64_t k, uint64_t nq, const float* vectors,
@@ -11,14 +11,9 @@ namespace zilliz {
namespace milvus {
namespace engine {

Status ExecutionEngine::AddWithIds(const std::vector<float>& vectors, const std::vector<long>& vector_ids) {
    long n1 = (long)vectors.size();
    long n2 = (long)vector_ids.size();
    if (n1 != n2) {
        LOG(ERROR) << "vectors size is not equal to the size of vector_ids: " << n1 << "!=" << n2;
        return Status::Error("Error: AddWithIds");
    }
    return AddWithIds(n1, vectors.data(), vector_ids.data());
Status ExecutionEngine::AddWithIdArray(const std::vector<float>& vectors, const std::vector<long>& vector_ids) {
    long n = (long)vector_ids.size();
    return AddWithIds(n, vectors.data(), vector_ids.data());
}


@@ -23,8 +23,7 @@ enum class EngineType {
class ExecutionEngine {
public:

    virtual Status AddWithIds(const std::vector<float>& vectors,
                              const std::vector<long>& vector_ids);
    virtual Status AddWithIdArray(const std::vector<float>& vectors, const std::vector<long>& vector_ids);

    virtual Status AddWithIds(long n, const float *xdata, const long *xids) = 0;

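Note: the hunk above shows the removed AddWithIds overload and its AddWithIdArray replacement back to back. The dropped n1 != n2 guard compared the float count against the id count, which can only match when the vector dimension is 1 (or the input is empty); AddWithIdArray takes n from vector_ids and trusts the caller to supply n * dim floats. If a dimension-aware guard were wanted, it could look like this sketch (hypothetical helper, not in the commit):

#include <vector>

// Hypothetical guard: n ids must correspond to exactly n * dim floats.
// Returns bool rather than engine::Status to keep the sketch self-contained.
static bool ValidIdArrayInput(const std::vector<float>& vectors,
                              const std::vector<long>& vector_ids,
                              size_t dim) {
    return dim != 0 && vectors.size() == vector_ids.size() * dim;
}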
@@ -24,14 +24,6 @@ TaskDispatchQueue::Put(const ScheduleContextPtr &context) {
        return;
    }

    if (queue_.size() >= capacity_) {
        std::string error_msg =
                "blocking queue is full, capacity: " + std::to_string(capacity_) + " queue_size: " +
                std::to_string(queue_.size());
        SERVER_LOG_ERROR << error_msg;
        throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
    }

    TaskDispatchStrategy::Schedule(context, queue_);

    empty_.notify_all();
@@ -42,12 +34,6 @@ TaskDispatchQueue::Take() {
    std::unique_lock <std::mutex> lock(mtx);
    empty_.wait(lock, [this] { return !queue_.empty(); });

    if (queue_.empty()) {
        std::string error_msg = "blocking queue empty";
        SERVER_LOG_ERROR << error_msg;
        throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
    }

    ScheduleTaskPtr front(queue_.front());
    queue_.pop_front();
    full_.notify_all();
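Note: the block removed from Take() was dead code. empty_.wait(lock, [this] { return !queue_.empty(); }) re-checks its predicate on every wakeup, so the queue cannot be empty once the wait returns. The condition-variable pattern behind it, as a minimal sketch (our own type names, not the scheduler's):

#include <condition_variable>
#include <deque>
#include <mutex>

template <typename T>
class BlockingQueue {
public:
    void Put(T item) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            queue_.push_back(std::move(item));
        }
        not_empty_.notify_one();           // wake one waiting consumer
    }

    T Take() {
        std::unique_lock<std::mutex> lock(mtx_);
        // The predicate is re-evaluated after every wakeup, so the queue
        // is guaranteed non-empty here; no second emptiness check needed.
        not_empty_.wait(lock, [this] { return !queue_.empty(); });
        T front = std::move(queue_.front());
        queue_.pop_front();
        return front;
    }

private:
    std::mutex mtx_;
    std::condition_variable not_empty_;
    std::deque<T> queue_;
};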
@@ -74,20 +74,26 @@ public:
        }

        std::string table_id = context->table_id();
        for(auto iter = task_list.begin(); iter != task_list.end(); ++iter) {

        //put delete task to proper position
        //for example: task_list has 10 IndexLoadTask, only the No.5 IndexLoadTask is for table1
        //if user want to delete table1, the DeleteTask will be insert into No.6 position
        for(std::list<ScheduleTaskPtr>::reverse_iterator iter = task_list.rbegin(); iter != task_list.rend(); ++iter) {
            if((*iter)->type() != ScheduleTaskType::kIndexLoad) {
                continue;
            }

            //put delete task to proper position
            IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(*iter);
            if(loader->file_->table_id_ == table_id) {

                task_list.insert(++iter, delete_task);
                break;
            }
            if(loader->file_->table_id_ != table_id) {
                continue;
            }

            task_list.insert(iter.base(), delete_task);
            return true;
        }

        //no task is searching this table, put DeleteTask to front of list so that the table will be delete asap
        task_list.push_front(delete_task);
        return true;
    }
};
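Note: the comment block in the hunk explains the intent; the mechanism rests on reverse iteration over the std::list. For a reverse_iterator positioned on an element, base() returns the forward iterator one past that element, so task_list.insert(iter.base(), delete_task) places the DeleteTask immediately after the last matching IndexLoadTask. A toy demonstration of the idiom (plain ints, not the scheduler types):

#include <cstdio>
#include <list>

int main() {
    std::list<int> tasks = {1, 2, 7, 2, 3};        // pretend 2 means "our table"

    // Scan from the back for the last 2, then insert 99 right after it.
    for (std::list<int>::reverse_iterator it = tasks.rbegin(); it != tasks.rend(); ++it) {
        if (*it == 2) {
            tasks.insert(it.base(), 99);           // base() points one past *it
            break;
        }
    }

    for (int v : tasks) printf("%d ", v);          // prints: 1 2 7 2 99 3
    printf("\n");
    return 0;
}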
@@ -7,6 +7,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db db_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src)
aux_source_directory(./ test_srcs)

aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files)
@@ -30,10 +31,7 @@ set(db_test_src
        ${db_scheduler_srcs}
        ${wrapper_src}
        ${require_files}
        MySQLMetaImpl_test.cpp
        utils.cpp
        db_tests.cpp
        meta_tests.cpp)
        ${test_srcs})

cuda_add_executable(db_test ${db_test_src})

@@ -89,20 +89,14 @@ TEST_F(DBTest, CONFIG_TEST) {


TEST_F(DBTest, DB_TEST) {
    static const std::string table_name = "test_group";
    static const int table_dim = 256;

    engine::meta::TableSchema table_info;
    table_info.dimension_ = table_dim;
    table_info.table_id_ = table_name;
    table_info.engine_type_ = (int)engine::EngineType::FAISS_IDMAP;
    engine::meta::TableSchema table_info = BuildTableSchema();
    engine::Status stat = db_->CreateTable(table_info);

    engine::meta::TableSchema table_info_get;
    table_info_get.table_id_ = table_name;
    table_info_get.table_id_ = TABLE_NAME;
    stat = db_->DescribeTable(table_info_get);
    ASSERT_STATS(stat);
    ASSERT_EQ(table_info_get.dimension_, table_dim);
    ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);

    engine::IDNumbers vector_ids;
    engine::IDNumbers target_ids;
@@ -131,7 +125,7 @@ TEST_F(DBTest, DB_TEST) {
            prev_count = count;

            START_TIMER;
            stat = db_->Query(table_name, k, qb, qxb.data(), results);
            stat = db_->Query(TABLE_NAME, k, qb, qxb.data(), results);
            ss << "Search " << j << " With Size " << count/engine::meta::M << " M";
            STOP_TIMER(ss.str());

@@ -154,10 +148,10 @@ TEST_F(DBTest, DB_TEST) {

    for (auto i=0; i<loop; ++i) {
        if (i==40) {
            db_->InsertVectors(table_name, qb, qxb.data(), target_ids);
            db_->InsertVectors(TABLE_NAME, qb, qxb.data(), target_ids);
            ASSERT_EQ(target_ids.size(), qb);
        } else {
            db_->InsertVectors(table_name, nb, xb.data(), vector_ids);
            db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
        }
        std::this_thread::sleep_for(std::chrono::microseconds(1));
    }
@@ -224,6 +218,18 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
    engine::meta::TableSchema table_info = BuildTableSchema();
    engine::Status stat = db_->CreateTable(table_info);

    std::vector<engine::meta::TableSchema> table_schema_array;
    stat = db_->AllTables(table_schema_array);
    ASSERT_STATS(stat);
    bool bfound = false;
    for(auto& schema : table_schema_array) {
        if(schema.table_id_ == TABLE_NAME) {
            bfound = true;
            break;
        }
    }
    ASSERT_TRUE(bfound);

    engine::meta::TableSchema table_info_get;
    table_info_get.table_id_ = TABLE_NAME;
    stat = db_->DescribeTable(table_info_get);
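Note: these test hunks swap the inline schema setup for a shared BuildTableSchema() helper plus TABLE_NAME/TABLE_DIM constants; the helper itself lives in the tests' utils and is not shown in this diff. Judging from the inline code it replaces, it presumably amounts to something like:

// Presumed shape of the shared helper, reconstructed from the replaced
// inline test code; the actual definition is in the test utils, not here.
static const std::string TABLE_NAME = "test_group";
static const int TABLE_DIM = 256;

engine::meta::TableSchema BuildTableSchema() {
    engine::meta::TableSchema table_info;
    table_info.dimension_ = TABLE_DIM;
    table_info.table_id_ = TABLE_NAME;
    table_info.engine_type_ = (int)engine::EngineType::FAISS_IDMAP;
    return table_info;
}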
@@ -39,6 +39,10 @@ TEST_F(MetaTest, TABLE_TEST) {
    table.table_id_ = table_id;
    status = impl_->CreateTable(table);
    ASSERT_TRUE(status.ok());

    table.table_id_ = "";
    status = impl_->CreateTable(table);
    ASSERT_TRUE(status.ok());
}

TEST_F(MetaTest, TABLE_FILE_TEST) {
@@ -46,6 +50,7 @@ TEST_F(MetaTest, TABLE_FILE_TEST) {

    meta::TableSchema table;
    table.table_id_ = table_id;
    table.dimension_ = 256;
    auto status = impl_->CreateTable(table);

    meta::TableFileSchema table_file;
@@ -54,6 +59,11 @@ TEST_F(MetaTest, TABLE_FILE_TEST) {
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(table_file.file_type_, meta::TableFileSchema::NEW);

    uint64_t cnt = 0;
    status = impl_->Count(table_id, cnt);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(cnt, 0UL);

    auto file_id = table_file.file_id_;

    auto new_file_type = meta::TableFileSchema::INDEX;

cpp/unittest/db/misc_test.cpp (new file, 103 lines)
@@ -0,0 +1,103 @@
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <thread>
#include <easylogging++.h>
#include <boost/filesystem.hpp>

#include "db/FaissExecutionEngine.h"
#include "db/Exception.h"
#include "db/Status.h"
#include "db/Options.h"
#include "db/DBMetaImpl.h"

#include <vector>

using namespace zilliz::milvus;

TEST(DBMiscTest, ENGINE_API_TEST) {
    //engine api AddWithIdArray
    const uint16_t dim = 512;
    const long n = 10;
    engine::FaissExecutionEngine engine(512, "/tmp/1", "IDMap", "IDMap,Flat");
    std::vector<float> vectors;
    std::vector<long> ids;
    for (long i = 0; i < n; i++) {
        for (uint16_t k = 0; k < dim; k++) {
            vectors.push_back((float) k);
        }
        ids.push_back(i);
    }

    auto status = engine.AddWithIdArray(vectors, ids);
    ASSERT_TRUE(status.ok());
}

TEST(DBMiscTest, EXCEPTION_TEST) {
    engine::Exception ex1("");
    std::string what = ex1.what();
    ASSERT_FALSE(what.empty());

    engine::OutOfRangeException ex2;
    what = ex2.what();
    ASSERT_FALSE(what.empty());
}

TEST(DBMiscTest, STATUS_TEST) {
    engine::Status status = engine::Status::OK();
    std::string str = status.ToString();
    ASSERT_FALSE(str.empty());

    status = engine::Status::Error("wrong", "mistake");
    ASSERT_TRUE(status.IsError());
    str = status.ToString();
    ASSERT_FALSE(str.empty());

    status = engine::Status::NotFound("wrong", "mistake");
    ASSERT_TRUE(status.IsNotFound());
    str = status.ToString();
    ASSERT_FALSE(str.empty());

    status = engine::Status::DBTransactionError("wrong", "mistake");
    ASSERT_TRUE(status.IsDBTransactionError());
    str = status.ToString();
    ASSERT_FALSE(str.empty());
}

TEST(DBMiscTest, OPTIONS_TEST) {
    try {
        engine::ArchiveConf archive("$$##");
    } catch (std::exception& ex) {
        ASSERT_TRUE(true);
    }

    {
        engine::ArchiveConf archive("delete", "no");
        ASSERT_TRUE(archive.GetCriterias().empty());
    }

    {
        engine::ArchiveConf archive("delete", "1:2");
        ASSERT_TRUE(archive.GetCriterias().empty());
    }

    {
        engine::ArchiveConf archive("delete", "1:2:3");
        ASSERT_TRUE(archive.GetCriterias().empty());
    }
}

TEST(DBMiscTest, META_TEST) {
    engine::DBMetaOptions options;
    options.path = "/tmp/milvus_test";
    engine::meta::DBMetaImpl impl(options);

    time_t tt;
    time( &tt );
    int delta = 10;
    engine::meta::DateT dt = impl.GetDate(tt, delta);
    ASSERT_GT(dt, 0);
}
cpp/unittest/db/scheduler_test.cpp (new file, 124 lines)
@@ -0,0 +1,124 @@
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <thread>
#include <easylogging++.h>
#include <boost/filesystem.hpp>

#include "db/scheduler/TaskScheduler.h"
#include "db/scheduler/TaskDispatchStrategy.h"
#include "db/scheduler/TaskDispatchQueue.h"
#include "db/scheduler/task/SearchTask.h"
#include "db/scheduler/task/DeleteTask.h"
#include "db/scheduler/task/IndexLoadTask.h"

using namespace zilliz::milvus;

namespace {

engine::TableFileSchemaPtr CreateTabileFileStruct(size_t id, const std::string& table_id) {
    auto file = std::make_shared<engine::meta::TableFileSchema>();
    file->id_ = id;
    file->table_id_ = table_id;
    return file;
}

}

TEST(DBSchedulerTest, TASK_QUEUE_TEST) {
    engine::TaskDispatchQueue queue;
    queue.SetCapacity(1000);
    queue.Put(nullptr);
    ASSERT_EQ(queue.Size(), 1UL);

    auto ptr = queue.Take();
    ASSERT_EQ(ptr, nullptr);
    ASSERT_TRUE(queue.Empty());

    engine::SearchContextPtr context_ptr = std::make_shared<engine::SearchContext>(1, 1, nullptr);
    for(size_t i = 0; i < 10; i++) {
        auto file = CreateTabileFileStruct(i, "tbl");
        context_ptr->AddIndexFile(file);
    }

    queue.Put(context_ptr);
    ASSERT_EQ(queue.Size(), 10);

    auto index_files = context_ptr->GetIndexMap();

    ptr = queue.Front();
    ASSERT_EQ(ptr->type(), engine::ScheduleTaskType::kIndexLoad);
    engine::IndexLoadTaskPtr load_task = std::static_pointer_cast<engine::IndexLoadTask>(ptr);
    ASSERT_EQ(load_task->file_->id_, index_files.begin()->first);

    ptr = queue.Back();
    ASSERT_EQ(ptr->type(), engine::ScheduleTaskType::kIndexLoad);
}

TEST(DBSchedulerTest, SEARCH_SCHEDULER_TEST) {
    std::list<engine::ScheduleTaskPtr> task_list;
    bool ret = engine::TaskDispatchStrategy::Schedule(nullptr, task_list);
    ASSERT_FALSE(ret);

    for(size_t i = 10; i < 30; i++) {
        engine::IndexLoadTaskPtr task_ptr = std::make_shared<engine::IndexLoadTask>();
        task_ptr->file_ = CreateTabileFileStruct(i, "tbl");
        task_list.push_back(task_ptr);
    }

    engine::SearchContextPtr context_ptr = std::make_shared<engine::SearchContext>(1, 1, nullptr);
    for(size_t i = 0; i < 20; i++) {
        auto file = CreateTabileFileStruct(i, "tbl");
        context_ptr->AddIndexFile(file);
    }

    ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list);
    ASSERT_TRUE(ret);
    ASSERT_EQ(task_list.size(), 30);
}

TEST(DBSchedulerTest, DELETE_SCHEDULER_TEST) {
    std::list<engine::ScheduleTaskPtr> task_list;
    bool ret = engine::TaskDispatchStrategy::Schedule(nullptr, task_list);
    ASSERT_FALSE(ret);

    const std::string table_id = "to_delete_table";
    for(size_t i = 0; i < 10; i++) {
        engine::IndexLoadTaskPtr task_ptr = std::make_shared<engine::IndexLoadTask>();
        task_ptr->file_ = CreateTabileFileStruct(i, table_id);
        task_list.push_back(task_ptr);
    }

    for(size_t i = 0; i < 10; i++) {
        engine::IndexLoadTaskPtr task_ptr = std::make_shared<engine::IndexLoadTask>();
        task_ptr->file_ = CreateTabileFileStruct(i, "other_table");
        task_list.push_back(task_ptr);
    }

    engine::meta::Meta::Ptr meta_ptr;
    engine::DeleteContextPtr context_ptr = std::make_shared<engine::DeleteContext>(table_id, meta_ptr);
    ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list);
    ASSERT_TRUE(ret);
    ASSERT_EQ(task_list.size(), 21);

    auto temp_list = task_list;
    for(size_t i = 0; ; i++) {
        engine::ScheduleTaskPtr task_ptr = temp_list.front();
        temp_list.pop_front();
        if(task_ptr->type() == engine::ScheduleTaskType::kDelete) {
            ASSERT_EQ(i, 10);
            break;
        }
    }

    context_ptr = std::make_shared<engine::DeleteContext>("no_task_table", meta_ptr);
    ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list);
    ASSERT_TRUE(ret);
    ASSERT_EQ(task_list.size(), 22);

    engine::ScheduleTaskPtr task_ptr = task_list.front();
    ASSERT_EQ(task_ptr->type(), engine::ScheduleTaskType::kDelete);
}