mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-05 05:18:52 +08:00
rename functions tasktable, make it accessing likes standard structure
Former-commit-id: c0ba41635e710e0807af0fe07d0b6a266f60d044
This commit is contained in:
parent
37e4b0a934
commit
5e504b3435
@ -291,11 +291,6 @@ TaskTable::Put(std::vector<TaskPtr>& tasks) {
|
||||
}
|
||||
}
|
||||
|
||||
TaskTableItemPtr
|
||||
TaskTable::Get(uint64_t index) {
|
||||
return table_[index];
|
||||
}
|
||||
|
||||
size_t
|
||||
TaskTable::TaskToExecute() {
|
||||
size_t count = 0;
|
||||
|
@ -106,6 +106,11 @@ class TaskTable : public interface::dumpable {
|
||||
TaskTable(const TaskTable&) = delete;
|
||||
TaskTable(TaskTable&&) = delete;
|
||||
|
||||
public:
|
||||
json
|
||||
Dump() const override;
|
||||
|
||||
public:
|
||||
inline void
|
||||
RegisterSubscriber(std::function<void(void)> subscriber) {
|
||||
subscriber_ = std::move(subscriber);
|
||||
@ -124,40 +129,35 @@ class TaskTable : public interface::dumpable {
|
||||
void
|
||||
Put(std::vector<TaskPtr>& tasks);
|
||||
|
||||
/*
|
||||
* Return task table item reference;
|
||||
*/
|
||||
TaskTableItemPtr
|
||||
Get(uint64_t index);
|
||||
|
||||
inline size_t
|
||||
Capacity() {
|
||||
return table_.capacity();
|
||||
}
|
||||
|
||||
/*
|
||||
* Return size of task table;
|
||||
*/
|
||||
inline size_t
|
||||
Size() {
|
||||
return table_.size();
|
||||
}
|
||||
|
||||
size_t
|
||||
TaskToExecute();
|
||||
|
||||
public:
|
||||
const TaskTableItemPtr& operator[](uint64_t index) {
|
||||
return table_[index];
|
||||
}
|
||||
|
||||
public:
|
||||
std::vector<uint64_t>
|
||||
PickToLoad(uint64_t limit);
|
||||
|
||||
std::vector<uint64_t>
|
||||
PickToExecute(uint64_t limit);
|
||||
|
||||
public:
|
||||
inline const TaskTableItemPtr& operator[](uint64_t index) {
|
||||
return table_[index];
|
||||
}
|
||||
|
||||
inline const TaskTableItemPtr&
|
||||
at(uint64_t index) {
|
||||
return table_[index];
|
||||
}
|
||||
|
||||
inline size_t
|
||||
capacity() {
|
||||
return table_.capacity();
|
||||
}
|
||||
|
||||
inline size_t
|
||||
size() {
|
||||
return table_.size();
|
||||
}
|
||||
|
||||
public:
|
||||
/******** Action ********/
|
||||
|
||||
@ -223,13 +223,6 @@ class TaskTable : public interface::dumpable {
|
||||
return table_[index]->Moved();
|
||||
}
|
||||
|
||||
public:
|
||||
/*
|
||||
* Dump;
|
||||
*/
|
||||
json
|
||||
Dump() const override;
|
||||
|
||||
private:
|
||||
std::uint64_t id_ = 0;
|
||||
CircleQueue<TaskTableItemPtr> table_;
|
||||
|
@ -132,7 +132,7 @@ Resource::pick_task_load() {
|
||||
for (auto index : indexes) {
|
||||
// try to set one task loading, then return
|
||||
if (task_table_.Load(index))
|
||||
return task_table_.Get(index);
|
||||
return task_table_.at(index);
|
||||
// else try next
|
||||
}
|
||||
return nullptr;
|
||||
@ -150,7 +150,7 @@ Resource::pick_task_execute() {
|
||||
}
|
||||
|
||||
if (task_table_.Execute(index)) {
|
||||
return task_table_.Get(index);
|
||||
return task_table_.at(index);
|
||||
}
|
||||
// if (task_table_[index]->task->label()->Type() == TaskLabelType::SPECIFIED_RESOURCE) {
|
||||
// if (task_table_.Get(index)->task->path().Current() == task_table_.Get(index)->task->path().Last()
|
||||
|
@ -165,7 +165,7 @@ TEST_F(SchedulerTest, ON_LOAD_COMPLETED) {
|
||||
}
|
||||
|
||||
sleep(3);
|
||||
ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
|
||||
ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().size(), NUM);
|
||||
}
|
||||
|
||||
TEST_F(SchedulerTest, PUSH_TASK_TO_NEIGHBOUR_RANDOMLY_TEST) {
|
||||
|
@ -183,19 +183,19 @@ TEST_F(TaskTableBaseTest, SUBSCRIBER) {
|
||||
|
||||
TEST_F(TaskTableBaseTest, PUT_TASK) {
|
||||
empty_table_.Put(task1_);
|
||||
ASSERT_EQ(empty_table_.Get(0)->task, task1_);
|
||||
ASSERT_EQ(empty_table_.at(0)->task, task1_);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PUT_INVALID_TEST) {
|
||||
empty_table_.Put(invalid_task_);
|
||||
ASSERT_EQ(empty_table_.Get(0)->task, invalid_task_);
|
||||
ASSERT_EQ(empty_table_.at(0)->task, invalid_task_);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PUT_BATCH) {
|
||||
std::vector<milvus::scheduler::TaskPtr> tasks{task1_, task2_};
|
||||
empty_table_.Put(tasks);
|
||||
ASSERT_EQ(empty_table_.Get(0)->task, task1_);
|
||||
ASSERT_EQ(empty_table_.Get(1)->task, task2_);
|
||||
ASSERT_EQ(empty_table_.at(0)->task, task1_);
|
||||
ASSERT_EQ(empty_table_.at(1)->task, task2_);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) {
|
||||
@ -204,14 +204,14 @@ TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) {
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, SIZE) {
|
||||
ASSERT_EQ(empty_table_.Size(), 0);
|
||||
ASSERT_EQ(empty_table_.size(), 0);
|
||||
empty_table_.Put(task1_);
|
||||
ASSERT_EQ(empty_table_.Size(), 1);
|
||||
ASSERT_EQ(empty_table_.size(), 1);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, OPERATOR) {
|
||||
empty_table_.Put(task1_);
|
||||
ASSERT_EQ(empty_table_.Get(0), empty_table_[0]);
|
||||
ASSERT_EQ(empty_table_.at(0), empty_table_[0]);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PICK_TO_LOAD) {
|
||||
@ -224,7 +224,7 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD) {
|
||||
|
||||
auto indexes = empty_table_.PickToLoad(1);
|
||||
ASSERT_EQ(indexes.size(), 1);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PICK_TO_LOAD_LIMIT) {
|
||||
@ -237,9 +237,9 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD_LIMIT) {
|
||||
|
||||
auto indexes = empty_table_.PickToLoad(3);
|
||||
ASSERT_EQ(indexes.size(), 3);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
ASSERT_EQ(indexes[1] % empty_table_.Capacity(), 3);
|
||||
ASSERT_EQ(indexes[2] % empty_table_.Capacity(), 4);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
|
||||
ASSERT_EQ(indexes[1] % empty_table_.capacity(), 3);
|
||||
ASSERT_EQ(indexes[2] % empty_table_.capacity(), 4);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PICK_TO_LOAD_CACHE) {
|
||||
@ -253,14 +253,14 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD_CACHE) {
|
||||
// first pick, non-cache
|
||||
auto indexes = empty_table_.PickToLoad(1);
|
||||
ASSERT_EQ(indexes.size(), 1);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
|
||||
|
||||
// second pick, iterate from 2
|
||||
// invalid state change
|
||||
empty_table_[1]->state = milvus::scheduler::TaskTableItemState::START;
|
||||
indexes = empty_table_.PickToLoad(1);
|
||||
ASSERT_EQ(indexes.size(), 1);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE) {
|
||||
@ -274,7 +274,7 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE) {
|
||||
|
||||
auto indexes = empty_table_.PickToExecute(1);
|
||||
ASSERT_EQ(indexes.size(), 1);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_LIMIT) {
|
||||
@ -289,8 +289,8 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_LIMIT) {
|
||||
|
||||
auto indexes = empty_table_.PickToExecute(3);
|
||||
ASSERT_EQ(indexes.size(), 2);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
ASSERT_EQ(indexes[1] % empty_table_.Capacity(), 3);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
|
||||
ASSERT_EQ(indexes[1] % empty_table_.capacity(), 3);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_CACHE) {
|
||||
@ -305,14 +305,14 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_CACHE) {
|
||||
// first pick, non-cache
|
||||
auto indexes = empty_table_.PickToExecute(1);
|
||||
ASSERT_EQ(indexes.size(), 1);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
|
||||
|
||||
// second pick, iterate from 2
|
||||
// invalid state change
|
||||
empty_table_[1]->state = milvus::scheduler::TaskTableItemState::START;
|
||||
indexes = empty_table_.PickToExecute(1);
|
||||
ASSERT_EQ(indexes.size(), 1);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
|
||||
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
|
||||
}
|
||||
|
||||
/************ TaskTableAdvanceTest ************/
|
||||
@ -328,14 +328,14 @@ class TaskTableAdvanceTest : public ::testing::Test {
|
||||
table1_.Put(task);
|
||||
}
|
||||
|
||||
table1_.Get(0)->state = milvus::scheduler::TaskTableItemState::INVALID;
|
||||
table1_.Get(1)->state = milvus::scheduler::TaskTableItemState::START;
|
||||
table1_.Get(2)->state = milvus::scheduler::TaskTableItemState::LOADING;
|
||||
table1_.Get(3)->state = milvus::scheduler::TaskTableItemState::LOADED;
|
||||
table1_.Get(4)->state = milvus::scheduler::TaskTableItemState::EXECUTING;
|
||||
table1_.Get(5)->state = milvus::scheduler::TaskTableItemState::EXECUTED;
|
||||
table1_.Get(6)->state = milvus::scheduler::TaskTableItemState::MOVING;
|
||||
table1_.Get(7)->state = milvus::scheduler::TaskTableItemState::MOVED;
|
||||
table1_.at(0)->state = milvus::scheduler::TaskTableItemState::INVALID;
|
||||
table1_.at(1)->state = milvus::scheduler::TaskTableItemState::START;
|
||||
table1_.at(2)->state = milvus::scheduler::TaskTableItemState::LOADING;
|
||||
table1_.at(3)->state = milvus::scheduler::TaskTableItemState::LOADED;
|
||||
table1_.at(4)->state = milvus::scheduler::TaskTableItemState::EXECUTING;
|
||||
table1_.at(5)->state = milvus::scheduler::TaskTableItemState::EXECUTED;
|
||||
table1_.at(6)->state = milvus::scheduler::TaskTableItemState::MOVING;
|
||||
table1_.at(7)->state = milvus::scheduler::TaskTableItemState::MOVED;
|
||||
}
|
||||
|
||||
milvus::scheduler::TaskTable table1_;
|
||||
@ -343,114 +343,114 @@ class TaskTableAdvanceTest : public ::testing::Test {
|
||||
|
||||
TEST_F(TaskTableAdvanceTest, LOAD) {
|
||||
std::vector<milvus::scheduler::TaskTableItemState> before_state;
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
before_state.push_back(table1_[i]->state);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
table1_.Load(i);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
if (before_state[i] == milvus::scheduler::TaskTableItemState::START) {
|
||||
ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::LOADING);
|
||||
ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::LOADING);
|
||||
} else {
|
||||
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
|
||||
ASSERT_EQ(table1_.at(i)->state, before_state[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TaskTableAdvanceTest, LOADED) {
|
||||
std::vector<milvus::scheduler::TaskTableItemState> before_state;
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
before_state.push_back(table1_[i]->state);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
table1_.Loaded(i);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
if (before_state[i] == milvus::scheduler::TaskTableItemState::LOADING) {
|
||||
ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::LOADED);
|
||||
ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::LOADED);
|
||||
} else {
|
||||
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
|
||||
ASSERT_EQ(table1_.at(i)->state, before_state[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TaskTableAdvanceTest, EXECUTE) {
|
||||
std::vector<milvus::scheduler::TaskTableItemState> before_state;
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
before_state.push_back(table1_[i]->state);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
table1_.Execute(i);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
if (before_state[i] == milvus::scheduler::TaskTableItemState::LOADED) {
|
||||
ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::EXECUTING);
|
||||
ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::EXECUTING);
|
||||
} else {
|
||||
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
|
||||
ASSERT_EQ(table1_.at(i)->state, before_state[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TaskTableAdvanceTest, EXECUTED) {
|
||||
std::vector<milvus::scheduler::TaskTableItemState> before_state;
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
before_state.push_back(table1_[i]->state);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
table1_.Executed(i);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
if (before_state[i] == milvus::scheduler::TaskTableItemState::EXECUTING) {
|
||||
ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::EXECUTED);
|
||||
ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::EXECUTED);
|
||||
} else {
|
||||
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
|
||||
ASSERT_EQ(table1_.at(i)->state, before_state[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TaskTableAdvanceTest, MOVE) {
|
||||
std::vector<milvus::scheduler::TaskTableItemState> before_state;
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
before_state.push_back(table1_[i]->state);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
table1_.Move(i);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
if (before_state[i] == milvus::scheduler::TaskTableItemState::LOADED) {
|
||||
ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::MOVING);
|
||||
ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::MOVING);
|
||||
} else {
|
||||
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
|
||||
ASSERT_EQ(table1_.at(i)->state, before_state[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TaskTableAdvanceTest, MOVED) {
|
||||
std::vector<milvus::scheduler::TaskTableItemState> before_state;
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
before_state.push_back(table1_[i]->state);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
table1_.Moved(i);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < table1_.Size(); ++i) {
|
||||
for (size_t i = 0; i < table1_.size(); ++i) {
|
||||
if (before_state[i] == milvus::scheduler::TaskTableItemState::MOVING) {
|
||||
ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::MOVED);
|
||||
ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::MOVED);
|
||||
} else {
|
||||
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
|
||||
ASSERT_EQ(table1_.at(i)->state, before_state[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user