优化线程池

This commit is contained in:
fasiondog 2020-04-01 00:48:20 +08:00
parent 1b2ea289bb
commit ff43b318cd

View File

@ -80,6 +80,7 @@ public:
} else { } else {
m_master_work_queue.push(std::move(task)); m_master_work_queue.push(std::move(task));
} }
m_cv.notify_one();
return res; return res;
} }
@ -88,6 +89,7 @@ public:
*/ */
void stop() { void stop() {
m_done = true; m_done = true;
m_cv.notify_all(); // 唤醒所有工作线程
for (size_t i = 0; i < m_worker_num; i++) { for (size_t i = 0; i < m_worker_num; i++) {
if (m_threads[i].joinable()) { if (m_threads[i].joinable()) {
m_threads[i].join(); m_threads[i].join();
@ -105,6 +107,9 @@ public:
m_master_work_queue.push(std::move(FuncWrapper())); m_master_work_queue.push(std::move(FuncWrapper()));
} }
// 唤醒所有工作线程
m_cv.notify_all();
// 等待线程结束 // 等待线程结束
for (size_t i = 0; i < m_worker_num; i++) { for (size_t i = 0; i < m_worker_num; i++) {
if (m_threads[i].joinable()) { if (m_threads[i].joinable()) {
@ -117,9 +122,11 @@ public:
private: private:
typedef FuncWrapper task_type; typedef FuncWrapper task_type;
std::atomic_bool m_done; // 线程池全局需终止指示 std::atomic_bool m_done; // 线程池全局需终止指示
bool m_init_finished; // 线程池是否初始化完毕 bool m_init_finished; // 线程池是否初始化完毕
size_t m_worker_num; // 工作线程数量 size_t m_worker_num; // 工作线程数量
std::condition_variable m_cv; // 信号量,无任务时阻塞线程并等待
std::mutex m_cv_mutex; // 配合信号量的互斥量
ThreadSafeQueue<task_type> m_master_work_queue; // 主线程任务队列 ThreadSafeQueue<task_type> m_master_work_queue; // 主线程任务队列
std::vector<std::unique_ptr<WorkStealQueue> > m_queues; // 任务队列(每个工作线程一个) std::vector<std::unique_ptr<WorkStealQueue> > m_queues; // 任务队列(每个工作线程一个)
@ -127,8 +134,8 @@ private:
// 线程本地变量 // 线程本地变量
inline static thread_local WorkStealQueue* m_local_work_queue = nullptr; // 本地任务队列 inline static thread_local WorkStealQueue* m_local_work_queue = nullptr; // 本地任务队列
inline static thread_local size_t m_index = 0; //在线程池中的序号 inline static thread_local size_t m_index = 0; //在线程池中的序号
inline static thread_local bool m_thread_need_stop = false; // 线程停止运行指示 inline static thread_local bool m_thread_need_stop = false; // 线程停止运行指示
void worker_thread(size_t index) { void worker_thread(size_t index) {
m_thread_need_stop = false; m_thread_need_stop = false;
@ -145,16 +152,21 @@ private:
task_type task; task_type task;
if (pop_task_from_local_queue(task)) { if (pop_task_from_local_queue(task)) {
task(); task();
std::this_thread::yield();
} else if (pop_task_from_master_queue(task)) { } else if (pop_task_from_master_queue(task)) {
if (!task.isNullTask()) { if (!task.isNullTask()) {
task(); task();
std::this_thread::yield();
} else { } else {
m_thread_need_stop = true; m_thread_need_stop = true;
} }
} else if (pop_task_from_other_thread_queue(task)) { } else if (pop_task_from_other_thread_queue(task)) {
task(); task();
} else {
std::this_thread::yield(); std::this_thread::yield();
} else {
// std::this_thread::yield();
std::unique_lock<std::mutex> lk(m_cv_mutex);
m_cv.wait(lk);
} }
} }