修正ThreadPool实现

This commit is contained in:
fasiondog 2021-07-01 01:19:07 +08:00
parent f04c8dbec3
commit 485fa879b4
5 changed files with 40 additions and 20 deletions

View File

@ -8,8 +8,6 @@
*/
#pragma once
#ifndef HIKYUU_UTILITIES_MQTHREAD_THREADPOOL_H
#define HIKYUU_UTILITIES_MQTHREAD_THREADPOOL_H
//#include <fmt/format.h>
#include <future>
@ -141,8 +139,15 @@ public:
// 指示各工作线程在未获取到工作任务时,停止运行
if (m_runnging_util_empty) {
for (size_t i = 0; i < m_worker_num; i++) {
m_queues[i]->push(FuncWrapper());
while (m_queues[i]->size() != 0) {
std::this_thread::yield();
}
}
m_done = true;
}
for (size_t i = 0; i < m_worker_num; i++) {
m_queues[i]->push(std::move(FuncWrapper()));
}
// 等待线程结束
@ -187,6 +192,7 @@ private:
if (m_local_work_queue->try_pop(task)) {
if (task.isNullTask()) {
m_thread_need_stop = true;
return;
} else {
task();
std::this_thread::yield();
@ -203,12 +209,6 @@ private:
std::this_thread::yield();
}
}
/*m_local_work_queue->wait_and_pop(task);
if (task.isNullTask()) {
m_thread_need_stop = true;
} else {
task();
}*/
}
bool pop_task_from_other_thread_queue(task_type& task) {
@ -229,5 +229,3 @@ private:
}; // namespace hku
} /* namespace hku */
#endif /* HIKYUU_UTILITIES_MQTHREAD_THREADPOOL_H */

View File

@ -8,8 +8,6 @@
*/
#pragma once
#ifndef HIKYUU_UTILITIES_MQTHREAD_THREADPOOL_H
#define HIKYUU_UTILITIES_MQTHREAD_THREADPOOL_H
//#include <fmt/format.h>
#include <future>
@ -136,8 +134,15 @@ public:
// 指示各工作线程在未获取到工作任务时,停止运行
if (m_runnging_util_empty) {
for (size_t i = 0; i < m_worker_num; i++) {
m_queues[i]->push(std::move(FuncWrapper()));
while (m_queues[i]->size() != 0) {
std::this_thread::yield();
}
}
m_done = true;
}
for (size_t i = 0; i < m_worker_num; i++) {
m_queues[i]->push(std::move(FuncWrapper()));
}
// 等待线程结束
@ -189,5 +194,3 @@ private:
}; // namespace hku
} /* namespace hku */
#endif /* HIKYUU_UTILITIES_MQTHREAD_THREADPOOL_H */

View File

@ -113,11 +113,14 @@ public:
}
m_cv.notify_all(); // 唤醒所有工作线程
for (size_t i = 0; i < m_worker_num; i++) {
if (m_threads[i].joinable()) {
m_threads[i].join();
}
}
// printf("Quit StealThreadPool\n");
}
/**
@ -127,9 +130,19 @@ public:
void join() {
// 指示各工作线程在未获取到工作任务时,停止运行
if (m_runnging_util_empty) {
for (size_t i = 0; i < m_worker_num; i++) {
m_master_work_queue.push(std::move(FuncWrapper()));
while (m_master_work_queue.size() != 0) {
std::this_thread::yield();
}
for (size_t i = 0; i < m_worker_num; i++) {
while (m_queues.size() != 0) {
std::this_thread::yield();
}
}
m_done = true;
}
for (size_t i = 0; i < m_worker_num; i++) {
m_master_work_queue.push(std::move(FuncWrapper()));
}
// 唤醒所有工作线程

View File

@ -119,8 +119,15 @@ public:
// 指示各工作线程在未获取到工作任务时,停止运行
if (m_runnging_util_empty) {
for (size_t i = 0; i < m_worker_num; i++) {
m_master_work_queue.push(std::move(FuncWrapper()));
while (m_master_work_queue.size() != 0) {
std::this_thread::yield();
}
}
m_done = true;
}
for (size_t i = 0; i < m_worker_num; i++) {
m_master_work_queue.push(std::move(FuncWrapper()));
}
// 等待线程结束

View File

@ -10,7 +10,6 @@
#include <string>
#include <unordered_set>
#include <hikyuu/utilities/thread/MQStealThreadPool.h>
#include <hikyuu/utilities/thread/ThreadPool.h>
#include <hikyuu/utilities/thread/FuncWrapper.h>
#include "HttpHandle.h"