MS-346 add .cpp to solve compile error

Former-commit-id: 3db400a1bd346ca72e591fbd6b3557387f435b0c
This commit is contained in:
wxyu 2019-08-13 17:55:12 +08:00
parent 46a184f1e3
commit 7b83d2a62f
11 changed files with 274 additions and 85 deletions

View File

@ -6,6 +6,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-336 - Scheduler interface
- MS-344 - Add TaskTable Test
- MS-345 - Add Node Test
- MS-346 - Add some implementation of scheduler to solve compile error
## Bug

View File

@ -0,0 +1,36 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Cost.h"
namespace zilliz {
namespace milvus {
namespace engine {
std::vector<uint64_t>
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) {
std::vector<uint64_t> indexes;
return indexes;
}
std::vector<uint64_t>
PickToLoad(const TaskTable &task_table, uint64_t limit) {
std::vector<uint64_t> indexes;
return indexes;
}
std::vector<uint64_t>
PickToExecute(const TaskTable &task_table, uint64_t limit) {
std::vector<uint64_t> indexes;
return indexes;
}
}
}
}

View File

@ -7,8 +7,10 @@
#include <vector>
#include "Task.h"
#include "TaskTable.h"
#include "CacheMgr.h"
namespace zilliz {
namespace milvus {
namespace engine {
@ -20,8 +22,8 @@ namespace engine {
* select tasks to move;
* call from scheduler;
*/
std::vector<TaskPtr>
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) {}
std::vector<uint64_t>
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit);
/*
@ -29,16 +31,16 @@ PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit)
* call from resource;
* I DONT SURE NEED THIS;
*/
std::vector<TaskPtr>
PickToLoad(TaskTable task_table, uint64_t limit) {}
std::vector<uint64_t>
PickToLoad(const TaskTable &task_table, uint64_t limit);
/*
* select task to execute;
* call from resource;
* I DONT SURE NEED THIS;
*/
std::vector<TaskPtr>
PickToExecute(TaskTable task_table, uint64_t limit) {}
std::vector<uint64_t>
PickToExecute(const TaskTable &task_table, uint64_t limit);
}

View File

@ -0,0 +1,29 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "ResourceFactory.h"
namespace zilliz {
namespace milvus {
namespace engine {
std::shared_ptr<Resource>
ResourceFactory::Create(const std::string &name, const std::string &alias) {
if (name == "disk") {
return std::make_shared<CpuResource>(alias);
} else if (name == "cpu") {
return std::make_shared<CpuResource>(alias);
} else if (name == "gpu") {
return std::make_shared<CpuResource>(alias);
} else {
return nullptr;
}
}
}
}
}

View File

@ -21,17 +21,7 @@ namespace engine {
class ResourceFactory {
public:
static std::shared_ptr<Resource>
Create(const std::string &name, const std::string &alias = "") {
if (name == "disk") {
return std::make_shared<CpuResource>(alias);
} else if (name == "cpu") {
return std::make_shared<CpuResource>(alias);
} else if (name == "gpu") {
return std::make_shared<CpuResource>(alias);
} else {
return nullptr;
}
}
Create(const std::string &name, const std::string &alias = "");
};

View File

@ -0,0 +1,59 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Scheduler.h"
namespace zilliz {
namespace milvus {
namespace engine {
void
StartUpEvent::Process() {
}
void
FinishTaskEvent::Process() {
// for (nei : res->neighbours) {
// tasks = cost(nei->task_table(), nei->connection, limit = 3)
// res->task_table()->PutTasks(tasks);
// }
// res->WakeUpExec();
}
void
CopyCompletedEvent::Process() {
}
void
TaskTableUpdatedEvent::Process() {
}
void
Scheduler::Start() {
worker_thread_ = std::thread(&Scheduler::worker_thread_, this);
}
std::string
Scheduler::Dump() {
return std::string();
}
void
Scheduler::worker_function() {
while (running_) {
auto event = event_queue_.front();
event->Process();
}
}
}
}
}

View File

@ -10,6 +10,9 @@
#include <thread>
#include <queue>
#include "resource/Resource.h"
#include "ResourceMgr.h"
namespace zilliz {
namespace milvus {
@ -18,8 +21,7 @@ namespace engine {
class Event {
public:
explicit
Event(ResourceWPtr &resource)
: resource_(resource) {}
Event(ResourceWPtr &resource) : resource_(resource) {}
public:
virtual void
@ -34,8 +36,7 @@ using EventPtr = std::shared_ptr<Event>;
class StartUpEvent : public Event {
public:
explicit
StartUpEvent(ResourceWPtr &resource)
: Event(resource) {}
StartUpEvent(ResourceWPtr &resource) : Event(resource) {}
public:
void
@ -45,25 +46,17 @@ public:
class FinishTaskEvent : public Event {
public:
explicit
FinishTaskEvent(ResourceWPtr &resource)
: Event(resource) {}
FinishTaskEvent(ResourceWPtr &resource) : Event(resource) {}
public:
void
Process() override {
// for (nei : res->neighbours) {
// tasks = cost(nei->task_table(), nei->connection, limit = 3)
// res->task_table()->PutTasks(tasks);
// }
// res->WakeUpExec();
}
Process() override;
};
class CopyCompletedEvent : public Event {
public:
explicit
CopyCompletedEvent(ResourceWPtr &resource)
: Event(resource) {}
CopyCompletedEvent(ResourceWPtr &resource) : Event(resource) {}
public:
void
@ -73,8 +66,7 @@ public:
class TaskTableUpdatedEvent : public Event {
public:
explicit
TaskTableUpdatedEvent(ResourceWPtr &resource)
: Event(resource) {}
TaskTableUpdatedEvent(ResourceWPtr &resource) : Event(resource) {}
public:
void
@ -94,16 +86,16 @@ public:
}
void
Start() {}
Start();
public:
/******** Events ********/
/*
* Process start up events;
*/
void
inline void
OnStartUp(ResourceWPtr &resource) {
// call from res_mgr, non-blocking, if queue size over limit, exception!
auto event = std::make_shared<StartUpEvent>(resource);
event_queue_.push(event);
}
@ -111,20 +103,29 @@ public:
/*
* Process finish task events;
*/
void
OnFinishTask(ResourceWPtr);
inline void
OnFinishTask(ResourceWPtr &resource) {
auto event = std::make_shared<FinishTaskEvent>(resource);
event_queue_.push(event);
}
/*
* Process copy completed events;
*/
void
OnCopyCompleted(ResourceWPtr);
inline void
OnCopyCompleted(ResourceWPtr &resource) {
auto event = std::make_shared<CopyCompletedEvent>(resource);
event_queue_.push(event);
}
/*
* Process task table updated events;
*/
void
OnTaskTableUpdated(ResourceWPtr);
inline void
OnTaskTableUpdated(ResourceWPtr &resource) {
auto event = std::make_shared<TaskTableUpdatedEvent>(resource);
event_queue_.push(event);
}
public:
@ -133,13 +134,11 @@ public:
private:
/*
* Called by worker_thread_;
*/
void
worker_function() {
while (running_) {
auto event = event_queue_.front();
event->Process();
}
}
worker_function();
private:
bool running_;

View File

@ -0,0 +1,89 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "TaskTable.h"
#include <vector>
namespace zilliz {
namespace milvus {
namespace engine {
TaskTable::TaskTable(std::vector<TaskPtr> &&tasks) {
}
void
TaskTable::Put(TaskPtr task) {
}
void
TaskTable::Put(std::vector<TaskPtr> &tasks) {
}
TaskTableItem &
TaskTable::Get(uint64_t index) {
return table_[index];
}
void
TaskTable::Clear() {
// find first task is NOT (done or moved), erase from begin to it;
// auto iterator = table_.begin();
// while (iterator->state == TaskTableItemState::EXECUTED or
// iterator->state == TaskTableItemState::MOVED)
// iterator++;
// table_.erase(table_.begin(), iterator);
}
bool
TaskTable::Move(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task.mutex);
if (task.state == TaskTableItemState::START) {
task.state = TaskTableItemState::LOADING;
return true;
}
return false;
}
bool
TaskTable::Moved(uint64_t index) {
return false;
}
bool
TaskTable::Load(uint64_t index) {
return false;
}
bool
TaskTable::Loaded(uint64_t index) {
return false;
}
bool
TaskTable::Execute(uint64_t index) {
return false;
}
bool
TaskTable::Executed(uint64_t index) {
return false;
}
std::string
TaskTable::Dump() {
return std::string();
}
}
}
}

View File

@ -47,26 +47,26 @@ public:
TaskTable() = default;
explicit
TaskTable(std::vector<TaskPtr> &&tasks) {}
TaskTable(std::vector<TaskPtr> &&tasks);
/*
* Put one task;
*/
void
Put(TaskPtr task) {}
Put(TaskPtr task);
/*
* Put tasks back of task table;
* Called by DBImpl;
*/
void
Put(std::vector<TaskPtr> &tasks) {}
Put(std::vector<TaskPtr> &tasks);
/*
* Return task table item reference;
*/
TaskTableItem &
Get(uint64_t index) {}
Get(uint64_t index);
/*
* TODO
@ -74,14 +74,7 @@ public:
* Called by ?
*/
void
Clear() {
// find first task is NOT (done or moved), erase from begin to it;
// auto iterator = table_.begin();
// while (iterator->state == TaskTableItemState::EXECUTED or
// iterator->state == TaskTableItemState::MOVED)
// iterator++;
// table_.erase(table_.begin(), iterator);
}
Clear();
public:
@ -95,16 +88,7 @@ public:
// TODO: bool to Status
bool
Move(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task.mutex);
if (task.state == TaskTableItemState::START) {
task.state = TaskTableItemState::LOADING;
return true;
}
return false;
}
Move(uint64_t index);
/*
* Move task finished;
@ -112,7 +96,7 @@ public:
* Called by scheduler;
*/
bool
Moved(uint64_t index) {}
Moved(uint64_t index);
/*
* Load a task;
@ -120,7 +104,7 @@ public:
* Called by loader;
*/
bool
Load(uint64_t index) {}
Load(uint64_t index);
/*
* Load task finished;
@ -128,7 +112,7 @@ public:
* Called by loader;
*/
bool
Loaded(uint64_t index) {}
Loaded(uint64_t index);
/*
* Execute a task;
@ -136,7 +120,7 @@ public:
* Called by executor;
*/
bool
Execute(uint64_t index) {}
Execute(uint64_t index);
/*
* Execute task finished;
@ -144,7 +128,7 @@ public:
* Called by executor;
*/
bool
Executed(uint64_t index) {}
Executed(uint64_t index);
public:
/*

View File

@ -143,11 +143,11 @@ private:
*/
TaskPtr
pick_task_load() {
auto tasks = PickToLoad(task_table_, 3);
for (uint64_t i = 0; i < tasks.size(); ++i) {
auto indexes = PickToLoad(task_table_, 3);
for (auto index : indexes) {
// try to set one task loading, then return
if (task_table_.Load(i))
return task_table_.Get(i).task;
if (task_table_.Load(index))
return task_table_.Get(index).task;
// else try next
}
return nullptr;
@ -159,11 +159,11 @@ private:
*/
TaskPtr
pick_task_execute() {
auto tasks = PickToExecute(task_table_, 3);
for (uint64_t i = 0; i < tasks.size(); ++i) {
auto indexes = PickToExecute(task_table_, 3);
for (auto index : indexes) {
// try to set one task executing, then return
if (task_table_.Execute(i))
return task_table_.Get(i).task;
if (task_table_.Execute(index))
return task_table_.Get(index).task;
// else try next
}
return nullptr;

View File

@ -21,7 +21,7 @@ TEST(normal_test, DISABLED_test1) {
res_mgr->Connect(cpu, gpu1, PCIE);
res_mgr->Connect(cpu, gpu2, PCIE);
res_mgr->StartAll();
res_mgr->Start();
auto task1 = std::make_shared<Task>("123456789");
auto task2 = std::make_shared<Task>("222222222");