diff --git a/hikyuu_cpp/hikyuu/utilities/thread/ThreadPool.h b/hikyuu_cpp/hikyuu/utilities/thread/ThreadPool.h index 572d72e8..a8765be0 100644 --- a/hikyuu_cpp/hikyuu/utilities/thread/ThreadPool.h +++ b/hikyuu_cpp/hikyuu/utilities/thread/ThreadPool.h @@ -80,6 +80,7 @@ public: } else { m_master_work_queue.push(std::move(task)); } + m_cv.notify_one(); return res; } @@ -88,6 +89,7 @@ public: */ void stop() { m_done = true; + m_cv.notify_all(); // 唤醒所有工作线程 for (size_t i = 0; i < m_worker_num; i++) { if (m_threads[i].joinable()) { m_threads[i].join(); @@ -105,6 +107,9 @@ public: m_master_work_queue.push(std::move(FuncWrapper())); } + // 唤醒所有工作线程 + m_cv.notify_all(); + // 等待线程结束 for (size_t i = 0; i < m_worker_num; i++) { if (m_threads[i].joinable()) { @@ -117,9 +122,11 @@ public: private: typedef FuncWrapper task_type; - std::atomic_bool m_done; // 线程池全局需终止指示 - bool m_init_finished; // 线程池是否初始化完毕 - size_t m_worker_num; // 工作线程数量 + std::atomic_bool m_done; // 线程池全局需终止指示 + bool m_init_finished; // 线程池是否初始化完毕 + size_t m_worker_num; // 工作线程数量 + std::condition_variable m_cv; // 信号量,无任务时阻塞线程并等待 + std::mutex m_cv_mutex; // 配合信号量的互斥量 ThreadSafeQueue m_master_work_queue; // 主线程任务队列 std::vector > m_queues; // 任务队列(每个工作线程一个) @@ -127,8 +134,8 @@ private: // 线程本地变量 inline static thread_local WorkStealQueue* m_local_work_queue = nullptr; // 本地任务队列 - inline static thread_local size_t m_index = 0; //在线程池中的序号 - inline static thread_local bool m_thread_need_stop = false; // 线程停止运行指示 + inline static thread_local size_t m_index = 0; //在线程池中的序号 + inline static thread_local bool m_thread_need_stop = false; // 线程停止运行指示 void worker_thread(size_t index) { m_thread_need_stop = false; @@ -145,16 +152,21 @@ private: task_type task; if (pop_task_from_local_queue(task)) { task(); + std::this_thread::yield(); } else if (pop_task_from_master_queue(task)) { if (!task.isNullTask()) { task(); + std::this_thread::yield(); } else { m_thread_need_stop = true; } } else if (pop_task_from_other_thread_queue(task)) { task(); - } else { std::this_thread::yield(); + } else { + // std::this_thread::yield(); + std::unique_lock lk(m_cv_mutex); + m_cv.wait(lk); } }