add more unitest

Former-commit-id: a97158e5a0c0a719fa4d688f2cc2ef010a15838b
This commit is contained in:
groot 2019-06-30 11:54:57 +08:00
parent f8205b4958
commit 60cd1729d3
4 changed files with 138 additions and 23 deletions

View File

@ -24,14 +24,6 @@ TaskDispatchQueue::Put(const ScheduleContextPtr &context) {
return;
}
if (queue_.size() >= capacity_) {
std::string error_msg =
"blocking queue is full, capacity: " + std::to_string(capacity_) + " queue_size: " +
std::to_string(queue_.size());
SERVER_LOG_ERROR << error_msg;
throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}
TaskDispatchStrategy::Schedule(context, queue_);
empty_.notify_all();
@ -42,12 +34,6 @@ TaskDispatchQueue::Take() {
std::unique_lock <std::mutex> lock(mtx);
empty_.wait(lock, [this] { return !queue_.empty(); });
if (queue_.empty()) {
std::string error_msg = "blocking queue empty";
SERVER_LOG_ERROR << error_msg;
throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}
ScheduleTaskPtr front(queue_.front());
queue_.pop_front();
full_.notify_all();

View File

@ -74,20 +74,26 @@ public:
}
std::string table_id = context->table_id();
for(auto iter = task_list.begin(); iter != task_list.end(); ++iter) {
//put delete task to proper position
//for example: task_list has 10 IndexLoadTask, only the No.5 IndexLoadTask is for table1
//if user want to delete table1, the DeleteTask will be insert into No.6 position
for(std::list<ScheduleTaskPtr>::reverse_iterator iter = task_list.rbegin(); iter != task_list.rend(); ++iter) {
if((*iter)->type() != ScheduleTaskType::kIndexLoad) {
continue;
}
//put delete task to proper position
IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(*iter);
if(loader->file_->table_id_ == table_id) {
task_list.insert(++iter, delete_task);
break;
if(loader->file_->table_id_ != table_id) {
continue;
}
task_list.insert(iter.base(), delete_task);
return true;
}
//no task is searching this table, put DeleteTask to front of list so that the table will be delete asap
task_list.push_front(delete_task);
return true;
}
};

View File

@ -7,6 +7,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db db_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src)
aux_source_directory(./ test_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files)
@ -30,9 +31,7 @@ set(db_test_src
${db_scheduler_srcs}
${wrapper_src}
${require_files}
utils.cpp
db_tests.cpp
meta_tests.cpp)
${test_srcs})
cuda_add_executable(db_test ${db_test_src})

View File

@ -0,0 +1,124 @@
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <thread>
#include <easylogging++.h>
#include <boost/filesystem.hpp>
#include "db/scheduler/TaskScheduler.h"
#include "db/scheduler/TaskDispatchStrategy.h"
#include "db/scheduler/TaskDispatchQueue.h"
#include "db/scheduler/task/SearchTask.h"
#include "db/scheduler/task/DeleteTask.h"
#include "db/scheduler/task/IndexLoadTask.h"
using namespace zilliz::milvus;
namespace {
engine::TableFileSchemaPtr CreateTabileFileStruct(size_t id, const std::string& table_id) {
auto file = std::make_shared<engine::meta::TableFileSchema>();
file->id_ = id;
file->table_id_ = table_id;
return file;
}
}
TEST(DBSchedulerTest, TASK_QUEUE_TEST) {
engine::TaskDispatchQueue queue;
queue.SetCapacity(1000);
queue.Put(nullptr);
ASSERT_EQ(queue.Size(), 1UL);
auto ptr = queue.Take();
ASSERT_EQ(ptr, nullptr);
ASSERT_TRUE(queue.Empty());
engine::SearchContextPtr context_ptr = std::make_shared<engine::SearchContext>(1, 1, nullptr);
for(size_t i = 0; i < 10; i++) {
auto file = CreateTabileFileStruct(i, "tbl");
context_ptr->AddIndexFile(file);
}
queue.Put(context_ptr);
ASSERT_EQ(queue.Size(), 10);
auto index_files = context_ptr->GetIndexMap();
ptr = queue.Front();
ASSERT_EQ(ptr->type(), engine::ScheduleTaskType::kIndexLoad);
engine::IndexLoadTaskPtr load_task = std::static_pointer_cast<engine::IndexLoadTask>(ptr);
ASSERT_EQ(load_task->file_->id_, index_files.begin()->first);
ptr = queue.Back();
ASSERT_EQ(ptr->type(), engine::ScheduleTaskType::kIndexLoad);
}
TEST(DBSchedulerTest, SEARCH_SCHEDULER_TEST) {
std::list<engine::ScheduleTaskPtr> task_list;
bool ret = engine::TaskDispatchStrategy::Schedule(nullptr, task_list);
ASSERT_FALSE(ret);
for(size_t i = 10; i < 30; i++) {
engine::IndexLoadTaskPtr task_ptr = std::make_shared<engine::IndexLoadTask>();
task_ptr->file_ = CreateTabileFileStruct(i, "tbl");
task_list.push_back(task_ptr);
}
engine::SearchContextPtr context_ptr = std::make_shared<engine::SearchContext>(1, 1, nullptr);
for(size_t i = 0; i < 20; i++) {
auto file = CreateTabileFileStruct(i, "tbl");
context_ptr->AddIndexFile(file);
}
ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list);
ASSERT_TRUE(ret);
ASSERT_EQ(task_list.size(), 30);
}
TEST(DBSchedulerTest, DELETE_SCHEDULER_TEST) {
std::list<engine::ScheduleTaskPtr> task_list;
bool ret = engine::TaskDispatchStrategy::Schedule(nullptr, task_list);
ASSERT_FALSE(ret);
const std::string table_id = "to_delete_table";
for(size_t i = 0; i < 10; i++) {
engine::IndexLoadTaskPtr task_ptr = std::make_shared<engine::IndexLoadTask>();
task_ptr->file_ = CreateTabileFileStruct(i, table_id);
task_list.push_back(task_ptr);
}
for(size_t i = 0; i < 10; i++) {
engine::IndexLoadTaskPtr task_ptr = std::make_shared<engine::IndexLoadTask>();
task_ptr->file_ = CreateTabileFileStruct(i, "other_table");
task_list.push_back(task_ptr);
}
engine::meta::Meta::Ptr meta_ptr;
engine::DeleteContextPtr context_ptr = std::make_shared<engine::DeleteContext>(table_id, meta_ptr);
ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list);
ASSERT_TRUE(ret);
ASSERT_EQ(task_list.size(), 21);
auto temp_list = task_list;
for(size_t i = 0; ; i++) {
engine::ScheduleTaskPtr task_ptr = temp_list.front();
temp_list.pop_front();
if(task_ptr->type() == engine::ScheduleTaskType::kDelete) {
ASSERT_EQ(i, 10);
break;
}
}
context_ptr = std::make_shared<engine::DeleteContext>("no_task_table", meta_ptr);
ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list);
ASSERT_TRUE(ret);
ASSERT_EQ(task_list.size(), 22);
engine::ScheduleTaskPtr task_ptr = task_list.front();
ASSERT_EQ(task_ptr->type(), engine::ScheduleTaskType::kDelete);
}