diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 92bbff2b84..a1bcc7ff44 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -253,144 +253,144 @@ Status DBImpl::Query(const std::string& table_id, const std::vector return QueryAsync(table_id, files_array, k, nq, vectors, dates, results); } -Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq, - const float* vectors, const meta::DatesT& dates, QueryResults& results) { - meta::DatePartionedTableFilesSchema files; - auto status = meta_ptr_->FilesToSearch(table_id, dates, files); - if (!status.ok()) { return status; } - - ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size(); - - meta::TableFilesSchema index_files; - meta::TableFilesSchema raw_files; - for (auto &day_files : files) { - for (auto &file : day_files.second) { - file.file_type_ == meta::TableFileSchema::INDEX ? - index_files.push_back(file) : raw_files.push_back(file); - } - } - - int dim = 0; - if (!index_files.empty()) { - dim = index_files[0].dimension_; - } else if (!raw_files.empty()) { - dim = raw_files[0].dimension_; - } else { - ENGINE_LOG_DEBUG << "no files to search"; - return Status::OK(); - } - - { - // [{ids, distence}, ...] - using SearchResult = std::pair, std::vector>; - std::vector batchresult(nq); // allocate nq cells. - - auto cluster = [&](long *nns, float *dis, const int& k) -> void { - for (int i = 0; i < nq; ++i) { - auto f_begin = batchresult[i].first.cbegin(); - auto s_begin = batchresult[i].second.cbegin(); - batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k); - batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k); - } - }; - - // Allocate Memory - float *output_distence; - long *output_ids; - output_distence = (float *) malloc(k * nq * sizeof(float)); - output_ids = (long *) malloc(k * nq * sizeof(long)); - memset(output_distence, 0, k * nq * sizeof(float)); - memset(output_ids, 0, k * nq * sizeof(long)); - - long search_set_size = 0; - - auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void { - for (auto &file : file_vec) { - - ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_); - index->Load(); - auto file_size = index->PhysicalSize(); - search_set_size += file_size; - - ENGINE_LOG_DEBUG << "Search file_type " << file.file_type_ << " Of Size: " - << file_size/(1024*1024) << " M"; - - int inner_k = index->Count() < k ? index->Count() : k; - auto start_time = METRICS_NOW_TIME; - index->Search(nq, vectors, inner_k, output_distence, output_ids); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - CollectFileMetrics(file.file_type_, file_size, total_time); - cluster(output_ids, output_distence, inner_k); // cluster to each query - memset(output_distence, 0, k * nq * sizeof(float)); - memset(output_ids, 0, k * nq * sizeof(long)); - } - }; - - auto topk_cpu = [](const std::vector &input_data, - const int &k, - float *output_distence, - long *output_ids) -> void { - std::map> inverted_table; - for (int i = 0; i < input_data.size(); ++i) { - if (inverted_table.count(input_data[i]) == 1) { - auto& ori_vec = inverted_table[input_data[i]]; - ori_vec.push_back(i); - } - else { - inverted_table[input_data[i]] = std::vector{i}; - } - } - - int count = 0; - for (auto &item : inverted_table){ - if (count == k) break; - for (auto &id : item.second){ - output_distence[count] = item.first; - output_ids[count] = id; - if (++count == k) break; - } - } - }; - auto cluster_topk = [&]() -> void { - QueryResult res; - for (auto &result_pair : batchresult) { - auto &dis = result_pair.second; - auto &nns = result_pair.first; - - topk_cpu(dis, k, output_distence, output_ids); - - int inner_k = dis.size() < k ? dis.size() : k; - for (int i = 0; i < inner_k; ++i) { - res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping - } - results.push_back(res); // append to result list - res.clear(); - memset(output_distence, 0, k * nq * sizeof(float)); - memset(output_ids, 0, k * nq * sizeof(long)); - } - }; - - search_in_index(raw_files); - search_in_index(index_files); - - ENGINE_LOG_DEBUG << "Search Overall Set Size = " << search_set_size << " M"; - cluster_topk(); - - free(output_distence); - free(output_ids); - } - - if (results.empty()) { - return Status::NotFound("Group " + table_id + ", search result not found!"); - } - - QueryResults temp_results; - CalcScore(nq, vectors, dim, results, temp_results); - results.swap(temp_results); - - return Status::OK(); -} +//Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq, +// const float* vectors, const meta::DatesT& dates, QueryResults& results) { +// meta::DatePartionedTableFilesSchema files; +// auto status = meta_ptr_->FilesToSearch(table_id, dates, files); +// if (!status.ok()) { return status; } +// +// ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size(); +// +// meta::TableFilesSchema index_files; +// meta::TableFilesSchema raw_files; +// for (auto &day_files : files) { +// for (auto &file : day_files.second) { +// file.file_type_ == meta::TableFileSchema::INDEX ? +// index_files.push_back(file) : raw_files.push_back(file); +// } +// } +// +// int dim = 0; +// if (!index_files.empty()) { +// dim = index_files[0].dimension_; +// } else if (!raw_files.empty()) { +// dim = raw_files[0].dimension_; +// } else { +// ENGINE_LOG_DEBUG << "no files to search"; +// return Status::OK(); +// } +// +// { +// // [{ids, distence}, ...] +// using SearchResult = std::pair, std::vector>; +// std::vector batchresult(nq); // allocate nq cells. +// +// auto cluster = [&](long *nns, float *dis, const int& k) -> void { +// for (int i = 0; i < nq; ++i) { +// auto f_begin = batchresult[i].first.cbegin(); +// auto s_begin = batchresult[i].second.cbegin(); +// batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k); +// batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k); +// } +// }; +// +// // Allocate Memory +// float *output_distence; +// long *output_ids; +// output_distence = (float *) malloc(k * nq * sizeof(float)); +// output_ids = (long *) malloc(k * nq * sizeof(long)); +// memset(output_distence, 0, k * nq * sizeof(float)); +// memset(output_ids, 0, k * nq * sizeof(long)); +// +// long search_set_size = 0; +// +// auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void { +// for (auto &file : file_vec) { +// +// ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_); +// index->Load(); +// auto file_size = index->PhysicalSize(); +// search_set_size += file_size; +// +// ENGINE_LOG_DEBUG << "Search file_type " << file.file_type_ << " Of Size: " +// << file_size/(1024*1024) << " M"; +// +// int inner_k = index->Count() < k ? index->Count() : k; +// auto start_time = METRICS_NOW_TIME; +// index->Search(nq, vectors, inner_k, output_distence, output_ids); +// auto end_time = METRICS_NOW_TIME; +// auto total_time = METRICS_MICROSECONDS(start_time, end_time); +// CollectFileMetrics(file.file_type_, file_size, total_time); +// cluster(output_ids, output_distence, inner_k); // cluster to each query +// memset(output_distence, 0, k * nq * sizeof(float)); +// memset(output_ids, 0, k * nq * sizeof(long)); +// } +// }; +// +// auto topk_cpu = [](const std::vector &input_data, +// const int &k, +// float *output_distence, +// long *output_ids) -> void { +// std::map> inverted_table; +// for (int i = 0; i < input_data.size(); ++i) { +// if (inverted_table.count(input_data[i]) == 1) { +// auto& ori_vec = inverted_table[input_data[i]]; +// ori_vec.push_back(i); +// } +// else { +// inverted_table[input_data[i]] = std::vector{i}; +// } +// } +// +// int count = 0; +// for (auto &item : inverted_table){ +// if (count == k) break; +// for (auto &id : item.second){ +// output_distence[count] = item.first; +// output_ids[count] = id; +// if (++count == k) break; +// } +// } +// }; +// auto cluster_topk = [&]() -> void { +// QueryResult res; +// for (auto &result_pair : batchresult) { +// auto &dis = result_pair.second; +// auto &nns = result_pair.first; +// +// topk_cpu(dis, k, output_distence, output_ids); +// +// int inner_k = dis.size() < k ? dis.size() : k; +// for (int i = 0; i < inner_k; ++i) { +// res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping +// } +// results.push_back(res); // append to result list +// res.clear(); +// memset(output_distence, 0, k * nq * sizeof(float)); +// memset(output_ids, 0, k * nq * sizeof(long)); +// } +// }; +// +// search_in_index(raw_files); +// search_in_index(index_files); +// +// ENGINE_LOG_DEBUG << "Search Overall Set Size = " << search_set_size << " M"; +// cluster_topk(); +// +// free(output_distence); +// free(output_ids); +// } +// +// if (results.empty()) { +// return Status::NotFound("Group " + table_id + ", search result not found!"); +// } +// +// QueryResults temp_results; +// CalcScore(nq, vectors, dim, results, temp_results); +// results.swap(temp_results); +// +// return Status::OK(); +//} Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq, const float* vectors, diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h index b4d60a27a9..43627cbace 100644 --- a/cpp/src/db/DBImpl.h +++ b/cpp/src/db/DBImpl.h @@ -62,8 +62,8 @@ public: virtual ~DBImpl(); private: - Status QuerySync(const std::string& table_id, uint64_t k, uint64_t nq, - const float* vectors, const meta::DatesT& dates, QueryResults& results); +// Status QuerySync(const std::string& table_id, uint64_t k, uint64_t nq, +// const float* vectors, const meta::DatesT& dates, QueryResults& results); Status QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq, const float* vectors, diff --git a/cpp/src/db/ExecutionEngine.cpp b/cpp/src/db/ExecutionEngine.cpp index f27d04dfa0..3412eb34bd 100644 --- a/cpp/src/db/ExecutionEngine.cpp +++ b/cpp/src/db/ExecutionEngine.cpp @@ -11,14 +11,9 @@ namespace zilliz { namespace milvus { namespace engine { -Status ExecutionEngine::AddWithIds(const std::vector& vectors, const std::vector& vector_ids) { - long n1 = (long)vectors.size(); - long n2 = (long)vector_ids.size(); - if (n1 != n2) { - LOG(ERROR) << "vectors size is not equal to the size of vector_ids: " << n1 << "!=" << n2; - return Status::Error("Error: AddWithIds"); - } - return AddWithIds(n1, vectors.data(), vector_ids.data()); +Status ExecutionEngine::AddWithIdArray(const std::vector& vectors, const std::vector& vector_ids) { + long n = (long)vector_ids.size(); + return AddWithIds(n, vectors.data(), vector_ids.data()); } diff --git a/cpp/src/db/ExecutionEngine.h b/cpp/src/db/ExecutionEngine.h index f26dce6371..d2b4d01e67 100644 --- a/cpp/src/db/ExecutionEngine.h +++ b/cpp/src/db/ExecutionEngine.h @@ -23,8 +23,7 @@ enum class EngineType { class ExecutionEngine { public: - virtual Status AddWithIds(const std::vector& vectors, - const std::vector& vector_ids); + virtual Status AddWithIdArray(const std::vector& vectors, const std::vector& vector_ids); virtual Status AddWithIds(long n, const float *xdata, const long *xids) = 0; diff --git a/cpp/src/db/scheduler/TaskDispatchQueue.cpp b/cpp/src/db/scheduler/TaskDispatchQueue.cpp index 2ce0e933b4..b728e925a9 100644 --- a/cpp/src/db/scheduler/TaskDispatchQueue.cpp +++ b/cpp/src/db/scheduler/TaskDispatchQueue.cpp @@ -24,14 +24,6 @@ TaskDispatchQueue::Put(const ScheduleContextPtr &context) { return; } - if (queue_.size() >= capacity_) { - std::string error_msg = - "blocking queue is full, capacity: " + std::to_string(capacity_) + " queue_size: " + - std::to_string(queue_.size()); - SERVER_LOG_ERROR << error_msg; - throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); - } - TaskDispatchStrategy::Schedule(context, queue_); empty_.notify_all(); @@ -42,12 +34,6 @@ TaskDispatchQueue::Take() { std::unique_lock lock(mtx); empty_.wait(lock, [this] { return !queue_.empty(); }); - if (queue_.empty()) { - std::string error_msg = "blocking queue empty"; - SERVER_LOG_ERROR << error_msg; - throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); - } - ScheduleTaskPtr front(queue_.front()); queue_.pop_front(); full_.notify_all(); diff --git a/cpp/src/db/scheduler/TaskDispatchStrategy.cpp b/cpp/src/db/scheduler/TaskDispatchStrategy.cpp index 7200f2584f..985f86cb09 100644 --- a/cpp/src/db/scheduler/TaskDispatchStrategy.cpp +++ b/cpp/src/db/scheduler/TaskDispatchStrategy.cpp @@ -74,20 +74,26 @@ public: } std::string table_id = context->table_id(); - for(auto iter = task_list.begin(); iter != task_list.end(); ++iter) { + + //put delete task to proper position + //for example: task_list has 10 IndexLoadTask, only the No.5 IndexLoadTask is for table1 + //if user want to delete table1, the DeleteTask will be insert into No.6 position + for(std::list::reverse_iterator iter = task_list.rbegin(); iter != task_list.rend(); ++iter) { if((*iter)->type() != ScheduleTaskType::kIndexLoad) { continue; } - //put delete task to proper position IndexLoadTaskPtr loader = std::static_pointer_cast(*iter); - if(loader->file_->table_id_ == table_id) { - - task_list.insert(++iter, delete_task); - break; + if(loader->file_->table_id_ != table_id) { + continue; } + + task_list.insert(iter.base(), delete_task); + return true; } + //no task is searching this table, put DeleteTask to front of list so that the table will be delete asap + task_list.push_front(delete_task); return true; } }; diff --git a/cpp/unittest/db/CMakeLists.txt b/cpp/unittest/db/CMakeLists.txt index 0d69aba803..5bae9190f5 100644 --- a/cpp/unittest/db/CMakeLists.txt +++ b/cpp/unittest/db/CMakeLists.txt @@ -7,6 +7,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db db_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files) aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src) +aux_source_directory(./ test_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files) @@ -30,10 +31,7 @@ set(db_test_src ${db_scheduler_srcs} ${wrapper_src} ${require_files} - MySQLMetaImpl_test.cpp - utils.cpp - db_tests.cpp - meta_tests.cpp) + ${test_srcs}) cuda_add_executable(db_test ${db_test_src}) diff --git a/cpp/unittest/db/db_tests.cpp b/cpp/unittest/db/db_tests.cpp index b5eb8e642b..8e50b7403b 100644 --- a/cpp/unittest/db/db_tests.cpp +++ b/cpp/unittest/db/db_tests.cpp @@ -89,20 +89,14 @@ TEST_F(DBTest, CONFIG_TEST) { TEST_F(DBTest, DB_TEST) { - static const std::string table_name = "test_group"; - static const int table_dim = 256; - - engine::meta::TableSchema table_info; - table_info.dimension_ = table_dim; - table_info.table_id_ = table_name; - table_info.engine_type_ = (int)engine::EngineType::FAISS_IDMAP; + engine::meta::TableSchema table_info = BuildTableSchema(); engine::Status stat = db_->CreateTable(table_info); engine::meta::TableSchema table_info_get; - table_info_get.table_id_ = table_name; + table_info_get.table_id_ = TABLE_NAME; stat = db_->DescribeTable(table_info_get); ASSERT_STATS(stat); - ASSERT_EQ(table_info_get.dimension_, table_dim); + ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); engine::IDNumbers vector_ids; engine::IDNumbers target_ids; @@ -131,7 +125,7 @@ TEST_F(DBTest, DB_TEST) { prev_count = count; START_TIMER; - stat = db_->Query(table_name, k, qb, qxb.data(), results); + stat = db_->Query(TABLE_NAME, k, qb, qxb.data(), results); ss << "Search " << j << " With Size " << count/engine::meta::M << " M"; STOP_TIMER(ss.str()); @@ -154,10 +148,10 @@ TEST_F(DBTest, DB_TEST) { for (auto i=0; iInsertVectors(table_name, qb, qxb.data(), target_ids); + db_->InsertVectors(TABLE_NAME, qb, qxb.data(), target_ids); ASSERT_EQ(target_ids.size(), qb); } else { - db_->InsertVectors(table_name, nb, xb.data(), vector_ids); + db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); } std::this_thread::sleep_for(std::chrono::microseconds(1)); } @@ -224,6 +218,18 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) { engine::meta::TableSchema table_info = BuildTableSchema(); engine::Status stat = db_->CreateTable(table_info); + std::vector table_schema_array; + stat = db_->AllTables(table_schema_array); + ASSERT_STATS(stat); + bool bfound = false; + for(auto& schema : table_schema_array) { + if(schema.table_id_ == TABLE_NAME) { + bfound = true; + break; + } + } + ASSERT_TRUE(bfound); + engine::meta::TableSchema table_info_get; table_info_get.table_id_ = TABLE_NAME; stat = db_->DescribeTable(table_info_get); diff --git a/cpp/unittest/db/meta_tests.cpp b/cpp/unittest/db/meta_tests.cpp index a7933829c9..f6c9551aa7 100644 --- a/cpp/unittest/db/meta_tests.cpp +++ b/cpp/unittest/db/meta_tests.cpp @@ -39,6 +39,10 @@ TEST_F(MetaTest, TABLE_TEST) { table.table_id_ = table_id; status = impl_->CreateTable(table); ASSERT_TRUE(status.ok()); + + table.table_id_ = ""; + status = impl_->CreateTable(table); + ASSERT_TRUE(status.ok()); } TEST_F(MetaTest, TABLE_FILE_TEST) { @@ -46,6 +50,7 @@ TEST_F(MetaTest, TABLE_FILE_TEST) { meta::TableSchema table; table.table_id_ = table_id; + table.dimension_ = 256; auto status = impl_->CreateTable(table); meta::TableFileSchema table_file; @@ -54,6 +59,11 @@ TEST_F(MetaTest, TABLE_FILE_TEST) { ASSERT_TRUE(status.ok()); ASSERT_EQ(table_file.file_type_, meta::TableFileSchema::NEW); + uint64_t cnt = 0; + status = impl_->Count(table_id, cnt); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(cnt, 0UL); + auto file_id = table_file.file_id_; auto new_file_type = meta::TableFileSchema::INDEX; diff --git a/cpp/unittest/db/misc_test.cpp b/cpp/unittest/db/misc_test.cpp new file mode 100644 index 0000000000..6f1f87c874 --- /dev/null +++ b/cpp/unittest/db/misc_test.cpp @@ -0,0 +1,103 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. +//////////////////////////////////////////////////////////////////////////////// +#include +#include +#include +#include + +#include "db/FaissExecutionEngine.h" +#include "db/Exception.h" +#include "db/Status.h" +#include "db/Options.h" +#include "db/DBMetaImpl.h" + +#include + +using namespace zilliz::milvus; + +TEST(DBMiscTest, ENGINE_API_TEST) { + //engine api AddWithIdArray + const uint16_t dim = 512; + const long n = 10; + engine::FaissExecutionEngine engine(512, "/tmp/1", "IDMap", "IDMap,Flat"); + std::vector vectors; + std::vector ids; + for (long i = 0; i < n; i++) { + for (uint16_t k = 0; k < dim; k++) { + vectors.push_back((float) k); + } + ids.push_back(i); + } + + auto status = engine.AddWithIdArray(vectors, ids); + ASSERT_TRUE(status.ok()); +} + +TEST(DBMiscTest, EXCEPTION_TEST) { + engine::Exception ex1(""); + std::string what = ex1.what(); + ASSERT_FALSE(what.empty()); + + engine::OutOfRangeException ex2; + what = ex2.what(); + ASSERT_FALSE(what.empty()); +} + +TEST(DBMiscTest, STATUS_TEST) { + engine::Status status = engine::Status::OK(); + std::string str = status.ToString(); + ASSERT_FALSE(str.empty()); + + status = engine::Status::Error("wrong", "mistake"); + ASSERT_TRUE(status.IsError()); + str = status.ToString(); + ASSERT_FALSE(str.empty()); + + status = engine::Status::NotFound("wrong", "mistake"); + ASSERT_TRUE(status.IsNotFound()); + str = status.ToString(); + ASSERT_FALSE(str.empty()); + + status = engine::Status::DBTransactionError("wrong", "mistake"); + ASSERT_TRUE(status.IsDBTransactionError()); + str = status.ToString(); + ASSERT_FALSE(str.empty()); +} + +TEST(DBMiscTest, OPTIONS_TEST) { + try { + engine::ArchiveConf archive("$$##"); + } catch (std::exception& ex) { + ASSERT_TRUE(true); + } + + { + engine::ArchiveConf archive("delete", "no"); + ASSERT_TRUE(archive.GetCriterias().empty()); + } + + { + engine::ArchiveConf archive("delete", "1:2"); + ASSERT_TRUE(archive.GetCriterias().empty()); + } + + { + engine::ArchiveConf archive("delete", "1:2:3"); + ASSERT_TRUE(archive.GetCriterias().empty()); + } +} + +TEST(DBMiscTest, META_TEST) { + engine::DBMetaOptions options; + options.path = "/tmp/milvus_test"; + engine::meta::DBMetaImpl impl(options); + + time_t tt; + time( &tt ); + int delta = 10; + engine::meta::DateT dt = impl.GetDate(tt, delta); + ASSERT_GT(dt, 0); +} \ No newline at end of file diff --git a/cpp/unittest/db/scheduler_test.cpp b/cpp/unittest/db/scheduler_test.cpp new file mode 100644 index 0000000000..01a7057e00 --- /dev/null +++ b/cpp/unittest/db/scheduler_test.cpp @@ -0,0 +1,124 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. +//////////////////////////////////////////////////////////////////////////////// +#include +#include +#include +#include + +#include "db/scheduler/TaskScheduler.h" +#include "db/scheduler/TaskDispatchStrategy.h" +#include "db/scheduler/TaskDispatchQueue.h" +#include "db/scheduler/task/SearchTask.h" +#include "db/scheduler/task/DeleteTask.h" +#include "db/scheduler/task/IndexLoadTask.h" + +using namespace zilliz::milvus; + +namespace { + +engine::TableFileSchemaPtr CreateTabileFileStruct(size_t id, const std::string& table_id) { + auto file = std::make_shared(); + file->id_ = id; + file->table_id_ = table_id; + return file; +} + +} + +TEST(DBSchedulerTest, TASK_QUEUE_TEST) { + engine::TaskDispatchQueue queue; + queue.SetCapacity(1000); + queue.Put(nullptr); + ASSERT_EQ(queue.Size(), 1UL); + + auto ptr = queue.Take(); + ASSERT_EQ(ptr, nullptr); + ASSERT_TRUE(queue.Empty()); + + engine::SearchContextPtr context_ptr = std::make_shared(1, 1, nullptr); + for(size_t i = 0; i < 10; i++) { + auto file = CreateTabileFileStruct(i, "tbl"); + context_ptr->AddIndexFile(file); + } + + queue.Put(context_ptr); + ASSERT_EQ(queue.Size(), 10); + + auto index_files = context_ptr->GetIndexMap(); + + ptr = queue.Front(); + ASSERT_EQ(ptr->type(), engine::ScheduleTaskType::kIndexLoad); + engine::IndexLoadTaskPtr load_task = std::static_pointer_cast(ptr); + ASSERT_EQ(load_task->file_->id_, index_files.begin()->first); + + ptr = queue.Back(); + ASSERT_EQ(ptr->type(), engine::ScheduleTaskType::kIndexLoad); +} + +TEST(DBSchedulerTest, SEARCH_SCHEDULER_TEST) { + std::list task_list; + bool ret = engine::TaskDispatchStrategy::Schedule(nullptr, task_list); + ASSERT_FALSE(ret); + + for(size_t i = 10; i < 30; i++) { + engine::IndexLoadTaskPtr task_ptr = std::make_shared(); + task_ptr->file_ = CreateTabileFileStruct(i, "tbl"); + task_list.push_back(task_ptr); + } + + engine::SearchContextPtr context_ptr = std::make_shared(1, 1, nullptr); + for(size_t i = 0; i < 20; i++) { + auto file = CreateTabileFileStruct(i, "tbl"); + context_ptr->AddIndexFile(file); + } + + ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list); + ASSERT_TRUE(ret); + ASSERT_EQ(task_list.size(), 30); +} + +TEST(DBSchedulerTest, DELETE_SCHEDULER_TEST) { + std::list task_list; + bool ret = engine::TaskDispatchStrategy::Schedule(nullptr, task_list); + ASSERT_FALSE(ret); + + const std::string table_id = "to_delete_table"; + for(size_t i = 0; i < 10; i++) { + engine::IndexLoadTaskPtr task_ptr = std::make_shared(); + task_ptr->file_ = CreateTabileFileStruct(i, table_id); + task_list.push_back(task_ptr); + } + + for(size_t i = 0; i < 10; i++) { + engine::IndexLoadTaskPtr task_ptr = std::make_shared(); + task_ptr->file_ = CreateTabileFileStruct(i, "other_table"); + task_list.push_back(task_ptr); + } + + engine::meta::Meta::Ptr meta_ptr; + engine::DeleteContextPtr context_ptr = std::make_shared(table_id, meta_ptr); + ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list); + ASSERT_TRUE(ret); + ASSERT_EQ(task_list.size(), 21); + + auto temp_list = task_list; + for(size_t i = 0; ; i++) { + engine::ScheduleTaskPtr task_ptr = temp_list.front(); + temp_list.pop_front(); + if(task_ptr->type() == engine::ScheduleTaskType::kDelete) { + ASSERT_EQ(i, 10); + break; + } + } + + context_ptr = std::make_shared("no_task_table", meta_ptr); + ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list); + ASSERT_TRUE(ret); + ASSERT_EQ(task_list.size(), 22); + + engine::ScheduleTaskPtr task_ptr = task_list.front(); + ASSERT_EQ(task_ptr->type(), engine::ScheduleTaskType::kDelete); +}