update thread pool

This commit is contained in:
fasiondog 2019-09-25 02:25:04 +08:00
parent b2f3051fe4
commit e7899f6170
3 changed files with 45 additions and 10 deletions

View File

@ -22,11 +22,13 @@ namespace hku {
class ThreadPool {
public:
ThreadPool(): ThreadPool(std::thread::hardware_concurrency()) {}
ThreadPool(size_t n): m_done(false), m_init_finished(false), m_worker_num(n) {
ThreadPool(size_t n): m_done(false), m_init_finished(false), m_worker_num(n), m_current_index(0), m_count(0) {
try {
for (size_t i = 0; i < m_worker_num; i++) {
std::promise<bool *> promise;
m_queues.push_back(std::unique_ptr<WorkStealQueue>(new WorkStealQueue));
m_threads.push_back(std::thread(&ThreadPool::worker_thread, this, i));
m_threads.push_back(std::thread(&ThreadPool::worker_thread, this, i, std::ref(promise)));
m_finish_until_empty.push_back(promise.get_future().get());
}
} catch (...) {
m_done = true;
@ -59,14 +61,36 @@ public:
if (m_local_work_queue) {
m_local_work_queue->push(std::move(task));
} else {
m_pool_work_queue.push(std::move(task));
m_queues[m_current_index++]->push(std::move(task));
if (m_current_index >= m_worker_num) {
m_current_index = 0;
}
//m_pool_work_queue.push(std::move(task));
}
return res;
}
void run_pending_task() {
task_type task;
if (pop_task_from_local_queue(task) || pop_task_from_other_thread_queue(task)) {
if (pop_task_from_local_queue(task)
|| pop_task_from_pool_queue(task)
|| pop_task_from_other_thread_queue(task)) {
task();
} else {
//auto now = std::chrono::steady_clock::now();
if (m_local_finish_until_empty) {
//auto duration = std::chrono::duration<double, std::ratio<1, 1000>>(now - m_pre_time).count();
//if (duration >= 10) {
m_thread_need_stop = true;
//} else {
// std::this_thread::yield();
//}
} else {
//m_pre_time = now;
std::this_thread::yield();
}
}
/*if (pop_task_from_local_queue(task) || pop_task_from_other_thread_queue(task)) {
task();
} else if (pop_task_from_pool_queue(task)) {
if (task.is_stop_task()) {
@ -76,12 +100,13 @@ public:
}
} else {
std::this_thread::yield();
}
}*/
}
void join() {
for (size_t i = 0; i < m_worker_num; i++) {
m_pool_work_queue.push(FuncWrapper());
//m_pool_work_queue.push(FuncWrapper());
(*m_finish_until_empty[i]) = true;
}
for (size_t i = 0; i < m_worker_num; i++) {
@ -97,20 +122,28 @@ private:
bool m_init_finished;
size_t m_worker_num;
ThreadSafeQueue<task_type> m_pool_work_queue;
std::vector<bool *> m_finish_until_empty;
std::vector<std::unique_ptr<WorkStealQueue> > m_queues;
std::vector<std::thread> m_threads;
size_t m_current_index;
size_t m_count;
inline static thread_local WorkStealQueue* m_local_work_queue = nullptr;
inline static thread_local size_t m_index = 0;
inline static thread_local bool m_local_finish_until_empty = false;
inline static thread_local bool m_thread_need_stop = false;
inline static thread_local auto m_pre_time = std::chrono::steady_clock::now();
void worker_thread(size_t index) {
void worker_thread(size_t index, std::promise<bool *>& promise) {
m_thread_need_stop = false;
m_index = index;
m_local_work_queue = m_queues[m_index].get();
m_local_finish_until_empty = false;
promise.set_value(&m_local_finish_until_empty);
while (!m_thread_need_stop && !m_done) {
run_pending_task();
}
std::cout << fmt::format("finished!\n");
}
bool pop_task_from_local_queue(task_type& task) {
@ -127,6 +160,7 @@ private:
}
for (size_t i = 0; i < m_worker_num; ++i) {
size_t index = (m_index+i+1) % m_worker_num;
//std::cout << fmt::format("steal({})\n", m_count++);
if (m_queues[index]->try_steal(task)) {
return true;
}

View File

@ -31,7 +31,7 @@ public:
void push(data_type data) {
std::lock_guard<std::mutex> lock(m_mutex);
m_queue.push_front(std::move(data));
m_queue.push_back(std::move(data));
}
bool empty() const {

View File

@ -36,10 +36,11 @@ BOOST_AUTO_TEST_CASE( test_temp ) {
ThreadPool tg;
for (int i = 0; i < 40; i++) {
tg.submit([=]() {
int x = Fibon(i+1);
std::cout << fmt::format("{}---------------------\n", i);
/*int x = Fibon(i+1);
std::stringstream buf;
buf << i+1 << ": " << x << std::endl;
std::cout << buf.str();
std::cout << buf.str();*/
});
}
tg.join();