mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 10:59:32 +08:00
Merge remote-tracking branch 'upstream/branch-0.4.0' into branch-0.4.0
Former-commit-id: 5bdebcd74b5a5d7665029f727ac452807d7f564f
This commit is contained in:
commit
dcf8d3e838
@ -20,3 +20,4 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- MS-215 - Add Milvus cluster CI/CD groovy file
|
||||
- MS-277 - Update CUDA Version to V10.1
|
||||
- MS-336 - Scheduler interface
|
||||
- MS-344 - Add TaskTable Test
|
||||
|
@ -16,7 +16,6 @@ aux_source_directory(db/insert db_insert_files)
|
||||
aux_source_directory(db/meta db_meta_files)
|
||||
aux_source_directory(metrics metrics_files)
|
||||
aux_source_directory(wrapper/knowhere knowhere_files)
|
||||
aux_source_directory(scheduler new_scheduler_files)
|
||||
|
||||
aux_source_directory(db/scheduler scheduler_files)
|
||||
aux_source_directory(db/scheduler/context scheduler_context_files)
|
||||
@ -65,7 +64,6 @@ set(db_files
|
||||
${db_insert_files}
|
||||
${db_meta_files}
|
||||
${db_scheduler_files}
|
||||
${new_scheduler_files}
|
||||
${metrics_files}
|
||||
${knowhere_files}
|
||||
)
|
||||
|
@ -37,6 +37,14 @@ ResourceMgr::Add(ResourcePtr &&resource) {
|
||||
finish_task_event_[index] = true;
|
||||
event_cv_.notify_one();
|
||||
});
|
||||
resource->RegisterOnCopyCompleted([&] {
|
||||
copy_completed_event_[index] = true;
|
||||
event_cv_.notify_one();
|
||||
});
|
||||
resource->RegisterOnTaskTableUpdated([&] {
|
||||
task_table_updated_event_[index] = true;
|
||||
event_cv_.notify_one();
|
||||
});
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -110,7 +118,7 @@ ResourceMgr::Dump() {
|
||||
|
||||
for (uint64_t i = 0; i < resources_.size(); ++i) {
|
||||
str += "Resource No." + std::to_string(i) + ":\n";
|
||||
str += resources_[i]->Dump();
|
||||
//str += resources_[i]->Dump();
|
||||
}
|
||||
|
||||
return str;
|
||||
|
@ -75,25 +75,33 @@ public:
|
||||
* Register on start up event;
|
||||
*/
|
||||
void
|
||||
RegisterOnStartUp(std::function<void(void)> func);
|
||||
RegisterOnStartUp(std::function<void(void)> func) {
|
||||
on_start_up_ = func;
|
||||
}
|
||||
|
||||
/*
|
||||
* Register on finish one task event;
|
||||
*/
|
||||
void
|
||||
RegisterOnFinishTask(std::function<void(void)> func);
|
||||
RegisterOnFinishTask(std::function<void(void)> func) {
|
||||
on_finish_task_ = func;
|
||||
}
|
||||
|
||||
/*
|
||||
* Register on copy task data completed event;
|
||||
*/
|
||||
void
|
||||
RegisterOnCopyCompleted(std::function<void(void)> func);
|
||||
RegisterOnCopyCompleted(std::function<void(void)> func) {
|
||||
on_copy_completed_ = func;
|
||||
}
|
||||
|
||||
/*
|
||||
* Register on task table updated event;
|
||||
*/
|
||||
void
|
||||
RegisterOnTaskTableUpdated(std::function<void(void)> func);
|
||||
RegisterOnTaskTableUpdated(std::function<void(void)> func) {
|
||||
on_task_table_updated_ = func;
|
||||
}
|
||||
|
||||
protected:
|
||||
Resource(std::string name, ResourceType type)
|
||||
|
@ -42,5 +42,5 @@ add_subdirectory(server)
|
||||
add_subdirectory(db)
|
||||
add_subdirectory(knowhere)
|
||||
add_subdirectory(metrics)
|
||||
add_subdirectory(scheduler)
|
||||
#add_subdirectory(scheduler)
|
||||
#add_subdirectory(storage)
|
@ -6,7 +6,7 @@
|
||||
|
||||
using namespace zilliz::milvus::engine;
|
||||
|
||||
int main() {
|
||||
TEST(normal_test, DISABLED_test1) {
|
||||
|
||||
// ResourceMgr only compose resources, provide unified event
|
||||
auto res_mgr = std::make_shared<ResourceMgr>();
|
||||
|
127
cpp/unittest/scheduler/tasktable_test.cpp
Normal file
127
cpp/unittest/scheduler/tasktable_test.cpp
Normal file
@ -0,0 +1,127 @@
|
||||
#include "scheduler/TaskTable.h"
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
|
||||
using namespace zilliz::milvus::engine;
|
||||
|
||||
class TaskTableItemTest : public ::testing::Test {
|
||||
protected:
|
||||
void
|
||||
SetUp() override {
|
||||
item1_.id = 0;
|
||||
item1_.state = TaskTableItemState::MOVED;
|
||||
item1_.priority = 10;
|
||||
}
|
||||
|
||||
TaskTableItem default_;
|
||||
TaskTableItem item1_;
|
||||
};
|
||||
|
||||
TEST_F(TaskTableItemTest, construct) {
|
||||
ASSERT_EQ(default_.id, 0);
|
||||
ASSERT_EQ(default_.state, TaskTableItemState::INVALID);
|
||||
ASSERT_EQ(default_.priority, 0);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableItemTest, copy) {
|
||||
TaskTableItem another(item1_);
|
||||
ASSERT_EQ(another.id, item1_.id);
|
||||
ASSERT_EQ(another.state, item1_.state);
|
||||
ASSERT_EQ(another.priority, item1_.priority);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableItemTest, destruct) {
|
||||
auto p_item = new TaskTableItem();
|
||||
delete p_item;
|
||||
}
|
||||
|
||||
|
||||
/************ TaskTableBaseTest ************/
|
||||
|
||||
class TaskTableBaseTest : public ::testing::Test {
|
||||
protected:
|
||||
void
|
||||
SetUp() override {
|
||||
invalid_task_ = nullptr;
|
||||
task1_ = std::make_shared<Task>("1");
|
||||
task2_ = std::make_shared<Task>("2");
|
||||
|
||||
empty_table_ = TaskTable();
|
||||
}
|
||||
|
||||
TaskPtr invalid_task_;
|
||||
TaskPtr task1_;
|
||||
TaskPtr task2_;
|
||||
TaskTable empty_table_;
|
||||
};
|
||||
|
||||
|
||||
TEST_F(TaskTableBaseTest, put_task) {
|
||||
empty_table_.Put(task1_);
|
||||
ASSERT_EQ(empty_table_.Get(0).task, task1_);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, put_invalid_test) {
|
||||
empty_table_.Put(invalid_task_);
|
||||
ASSERT_EQ(empty_table_.Get(0).task, invalid_task_);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, put_batch) {
|
||||
std::vector<TaskPtr> tasks{task1_, task2_};
|
||||
empty_table_.Put(tasks);
|
||||
ASSERT_EQ(empty_table_.Get(0).task, task1_);
|
||||
ASSERT_EQ(empty_table_.Get(1).task, task2_);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, put_empty_batch) {
|
||||
std::vector<TaskPtr> tasks{};
|
||||
empty_table_.Put(tasks);
|
||||
}
|
||||
|
||||
/************ TaskTableAdvanceTest ************/
|
||||
|
||||
class TaskTableAdvanceTest : public ::testing::Test {
|
||||
protected:
|
||||
void
|
||||
SetUp() override {
|
||||
for (uint64_t i = 0; i < 8; ++i) {
|
||||
auto task = std::make_shared<Task>(std::to_string(i));
|
||||
table1_.Put(task);
|
||||
}
|
||||
|
||||
table1_.Get(0).state = TaskTableItemState::INVALID;
|
||||
table1_.Get(1).state = TaskTableItemState::START;
|
||||
table1_.Get(2).state = TaskTableItemState::LOADING;
|
||||
table1_.Get(3).state = TaskTableItemState::LOADED;
|
||||
table1_.Get(4).state = TaskTableItemState::EXECUTING;
|
||||
table1_.Get(5).state = TaskTableItemState::EXECUTED;
|
||||
table1_.Get(6).state = TaskTableItemState::MOVING;
|
||||
table1_.Get(7).state = TaskTableItemState::MOVED;
|
||||
}
|
||||
|
||||
TaskTable table1_;
|
||||
};
|
||||
|
||||
TEST_F(TaskTableAdvanceTest, load) {
|
||||
table1_.Load(1);
|
||||
table1_.Loaded(2);
|
||||
|
||||
ASSERT_EQ(table1_.Get(1).state, TaskTableItemState::LOADING);
|
||||
ASSERT_EQ(table1_.Get(2).state, TaskTableItemState::LOADED);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableAdvanceTest, execute) {
|
||||
table1_.Execute(3);
|
||||
table1_.Executed(4);
|
||||
|
||||
ASSERT_EQ(table1_.Get(3).state, TaskTableItemState::EXECUTING);
|
||||
ASSERT_EQ(table1_.Get(4).state, TaskTableItemState::EXECUTED);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableAdvanceTest, move) {
|
||||
table1_.Move(3);
|
||||
table1_.Moved(6);
|
||||
|
||||
ASSERT_EQ(table1_.Get(3).state, TaskTableItemState::MOVING);
|
||||
ASSERT_EQ(table1_.Get(6).state, TaskTableItemState::MOVED);
|
||||
}
|
Loading…
Reference in New Issue
Block a user