外包項(xiàng)目刷seo快速排名
文章目錄
- 前言
- Code
- ThreadPool.hpp
- main.cpp
- 簡(jiǎn)單講解
- 所需頭文件
- using
- 成員變量
- 構(gòu)造
- 析構(gòu)
- 添加任務(wù)
- PS
- 測(cè)試效果
- END
前言
線程池_百度百科 (baidu.com)
線程池是一種多線程處理形式,處理過程中將任務(wù)添加到隊(duì)列,然后在創(chuàng)建線程后自動(dòng)啟動(dòng)這些任務(wù)。線程池線程都是后臺(tái)線程。每個(gè)線程都使用默認(rèn)的堆棧大小,以默認(rèn)的優(yōu)先級(jí)運(yùn)行,并處于多線程單元中。如果某個(gè)線程在托管代碼中空閑(如正在等待某個(gè)事件),則線程池將插入另一個(gè)輔助線程來使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊(duì)列中包含掛起的工作,則線程池將在一段時(shí)間后創(chuàng)建另一個(gè)輔助線程但線程的數(shù)目永遠(yuǎn)不會(huì)超過最大值。超過最大值的線程可以排隊(duì),但他們要等到其他線程完成后才啟動(dòng)。
線程池主要有fixed模式
和cached模式
。
其中fixed模式實(shí)現(xiàn)起來比較簡(jiǎn)單。本文就是以此來編寫。
多線程基礎(chǔ)請(qǐng)看:(C++) 多線程之生產(chǎn)者消費(fèi)者問題_c++ 多線程 生產(chǎn)者消費(fèi)者_(dá)天賜細(xì)蓮的博客-CSDN博客
cached模式的線程池,可以參考程序喵達(dá)人的代碼:C++線程池的實(shí)現(xiàn) - 掘金 (juejin.cn)
Code
ThreadPool.hpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
#include <vector>namespace lotus {
std::string get_threadID() {std::stringstream ss;ss << std::this_thread::get_id();return ss.str();
}class ThreadPool {
public:// 統(tǒng)一將任務(wù)bind為 void(*)() 的函數(shù)形式using Task_Type = std::function<void()>;private: // config// 后期根據(jù)這個(gè)將 fixed 模式改為 cached模式const size_t TASK_COUNT;private: // taskstd::vector<std::unique_ptr<std::thread>> m_threadControlList;std::queue<Task_Type> m_taskQueue;private: // thread helperstd::atomic<bool> m_canRun;std::condition_variable m_condVar;std::mutex m_mutex;public:/*** @brief Construct a new Thread Pool object* 盡量不要讓線程數(shù) > cpu內(nèi)核數(shù)* @param taskCnt*/ThreadPool(const size_t taskCnt) : TASK_COUNT(taskCnt) {m_canRun = (TASK_COUNT <= std::thread::hardware_concurrency());open_pool();}// copy prohibitedThreadPool(const ThreadPool&) = delete;ThreadPool& operator=(const ThreadPool&) = delete;/*** @brief Destroy the Thread Pool object*/~ThreadPool() {close_pool();}public:/*** @brief* 添加任務(wù),并讓條件變量通知一次* 目前不處理返回值* @tparam Fun* @tparam Args* @param fun* @param args*/template <typename Fun, typename... Args>void Add_task(Fun&& fun, Args&&... args) {std::lock_guard<std::mutex> lock(m_mutex);auto task =std::bind(std::forward<Fun>(fun), std::forward<Args>(args)...);m_taskQueue.push(std::move(task));m_condVar.notify_one();}private:/*** @brief Create a thread object*/void create_thread() {std::lock_guard<std::mutex> lock(m_mutex);auto createTaskThread = [this]() {for (;;) {std::unique_lock<std::mutex> lock(m_mutex);while (m_taskQueue.empty() && m_canRun) {m_condVar.wait(lock);}if (false == m_canRun) {break;}auto task = std::move(m_taskQueue.front());m_taskQueue.pop();lock.unlock();task();} // while 1};auto thPtr = std::make_unique<std::thread>(createTaskThread);m_threadControlList.emplace_back(std::move(thPtr));}private:/*** @brief* 創(chuàng)建一定數(shù)量的線程* @param taskCnt*/void open_pool() {for (size_t i = 0; i < TASK_COUNT; i += 1) {create_thread();}}/*** @brief* 運(yùn)行標(biāo)志改為 false* 并通知所有線程* 確保所有線程都join完*/void close_pool() {m_canRun = false;m_condVar.notify_all();for (auto&& thPtr : m_threadControlList) {if (thPtr->joinable()) {thPtr->join();}}}
}; // class} // namespace lotu
main.cpp
#include <iostream>#include "ThreadPool.hpp"
namespace my = lotus;#ifdef _MSC_VER
#define __FUNC_NAME__ __FUNCSIG__
#elif defined(__GNUC__) || defined(__clang__)
#define __FUNC_NAME__ __PRETTY_FUNCTION__
#else
#define __FUNC_NAME__ __func__
#endif/*** @brief* test fun* @param waitTime* @param str*/
void fun(int waitTime, const char* str) {const int N = 5;printf("[%s]time{%d}start\n", __FUNC_NAME__, waitTime);for (int i = 0; i < N; i += 1) {std::this_thread::sleep_for(std::chrono::seconds(waitTime));auto threadId = my::get_threadID();const char* idStr = threadId.c_str();printf("[%s]%s\n", idStr, str);}printf("[%s]time{%d}end\n", __FUNC_NAME__, waitTime);
}/*** @brief* main fun* @param argc* @param argv* @return int*/
int main(int argc, const char** argv) {srand(time(0));printf("[%s] start\n", __FUNC_NAME__);{my::ThreadPool pool(2);pool.Add_task(fun, 1, "11111111111111111111");pool.Add_task(fun, 2, "22222222222222222222");pool.Add_task(fun, 3, "33333333333333333333");system("pause");}printf("[%s] end\n", __FUNC_NAME__);// getchar();
}
簡(jiǎn)單講解
所需頭文件
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
using
// 將所有任務(wù)bind成void(*)()的形式
using Task_Type = std::function<void()>;
成員變量
private: // config// 后期根據(jù)這個(gè)將 fixed 模式改為 cached模式const size_t TASK_COUNT;private: // taskstd::vector<std::unique_ptr<std::thread>> m_threadControlList;// 任務(wù)隊(duì)列std::queue<Task_Type> m_taskQueue;private: // thread helper// 終止標(biāo)志std::atomic<bool> m_canRun;// 條件變量std::condition_variable m_condVar;// 互斥量std::mutex m_mutex;
構(gòu)造
傳入fixed的線程數(shù)量, 盡量不要 > cpu核數(shù)。禁止拷貝操作。
public:// 傳入fixed的線程數(shù)量ThreadPool(const size_t taskCnt) : TASK_COUNT(taskCnt) {// 盡量不要 > cpu核數(shù)m_canRun = (TASK_COUNT <= std::thread::hardware_concurrency());open_pool();}// 禁止拷貝ThreadPool(const ThreadPool&) = delete;ThreadPool& operator=(const ThreadPool&) = delete;
初始化TASK_COUNT
數(shù)量的線程。
每個(gè)線程寫死循環(huán),不斷等待和執(zhí)行任務(wù)隊(duì)列的任務(wù)。
基于條件變量std::condition_variable
來進(jìn)行阻塞。
private: void create_thread() {std::lock_guard<std::mutex> lock(m_mutex);auto createTaskThread = [this]() {for (;;) {std::unique_lock<std::mutex> lock(m_mutex);while (m_taskQueue.empty() && m_canRun) {m_condVar.wait(lock);}if (false == m_canRun) {break;}auto task = std::move(m_taskQueue.front());m_taskQueue.pop();lock.unlock();task();} // while 1};auto thPtr = std::make_unique<std::thread>(createTaskThread);m_threadControlList.emplace_back(std::move(thPtr));}private:void open_pool() {for (size_t i = 0; i < TASK_COUNT; i += 1) {create_thread();}}
析構(gòu)
public:~ThreadPool() {close_pool();}
將運(yùn)行標(biāo)志改為false,由于這里的標(biāo)志是std::atomic<>
。并且其余變量不涉及資源競(jìng)爭(zhēng),因此無需加鎖。
注意,這里使用join()
來保證每個(gè)線程是正常退出的。
private:void close_pool() {m_canRun = false;m_condVar.notify_all();for (auto&& thPtr : m_threadControlList) {if (thPtr->joinable()) {thPtr->join();}}}
添加任務(wù)
這里使用變參模板技術(shù),將任務(wù)包裝成std::function<void()>
的一個(gè)可調(diào)用對(duì)象。
注意這里傳入的是萬能引用
要使用std::forward<>
。
每加入一個(gè)任務(wù),就讓條件變量進(jìn)行一次通知notify_one()
。
public:template <typename Fun, typename... Args>void Add_task(Fun&& fun, Args&&... args) {std::lock_guard<std::mutex> lock(m_mutex);auto task =std::bind(std::forward<Fun>(fun), std::forward<Args>(args)...);m_taskQueue.push(std::move(task));m_condVar.notify_one();}
PS
由于std::this_thread::get_id()
返回的是thread::id
。對(duì)于cpp可以使用std::cout來進(jìn)行輸出。但是并沒有對(duì)應(yīng)的類型轉(zhuǎn)化操作。且,id的大小和數(shù)值類型在不同平臺(tái),不同編譯器中不一致。更不能用于printf
雖然有的編譯器支持。
因此用std::stringstream
流進(jìn)行一次間接的轉(zhuǎn)化。
std::string get_threadID() {std::stringstream ss;ss << std::this_thread::get_id();return ss.str();
}
測(cè)試效果
g++ (x86_64-posix-seh-rev0, Built by MinGW-W64 project) 7.3.0
[int main(int, const char**)] start
[void fun(int, const char*)]time{1}start
[void fun(int, const char*)]time{2}start
請(qǐng)按任意鍵繼續(xù). . . [2]11111111111111111111
[3]22222222222222222222
[2]11111111111111111111
[2]11111111111111111111
[3]22222222222222222222
[2]11111111111111111111
[2]11111111111111111111
[void fun(int, const char*)]time{1}end
[void fun(int, const char*)]time{3}start
[3]22222222222222222222
[3]22222222222222222222
[2]33333333333333333333
[3]22222222222222222222
[void fun(int, const char*)]time{2}end
[2]33333333333333333333
[2]33333333333333333333
[2]33333333333333333333
[2]33333333333333333333
[void fun(int, const char*)]time{3}end[int main(int, const char**)] end
最后的空行是,對(duì)應(yīng)system("pause")
。由于在線程池的析構(gòu)中使用了join()
,因此提前鍵入也可以。