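* The modified code described in this article has been uploaded to the Apollo project on GitHub.
{ 1 }
Introduction to Thread Pools
1 Definition of a thread pool
A thread pool is a form of multithreading: a fixed number of background worker threads is started up front, pending tasks are added to a task queue, and the queued tasks are handed one by one to idle worker threads for execution (see the figure below).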

2 Why use a thread pool
Creating and destroying threads costs operating-system resources, and doing so too often significantly hurts throughput. If creating a thread takes time T1, running the task takes T2, and destroying the thread takes T3, then whenever T1 + T3 > T2 it is poor value to start a thread for a single task. A thread pool keeps its threads alive and reuses idle ones for new tasks, which avoids the T1 + T3 overhead.
Too many concurrent threads compete for system resources and cause stalls. Threads share system resources, so running too many at once can exhaust them and make the system sluggish or even unresponsive. A thread pool caps the maximum number of concurrent threads and so avoids this problem.
Threads become easier to manage. Policies such as delayed execution or periodic execution are easier to implement on top of a thread pool.
3 Using a thread pool in C++
The C++ standard library does not provide a thread pool, so you have to write your own class. GitHub hosts several implementations, and the Apollo project based its version on one of them [https://github.com/vit-vit/CTPL].
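To make the definition above concrete, here is a minimal sketch of a fixed-size pool written from scratch. It is not the CTPL or Apollo class: `TinyPool`, `Submit`, and `SumOfSquares` are names invented for this illustration, and the design (one mutex guarding both the queue and the shutdown flag) is only one of several reasonable choices.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool for illustration; not the CTPL or Apollo class.
class TinyPool {
 public:
  explicit TinyPool(std::size_t n_threads) {
    assert(n_threads > 0);
    for (std::size_t i = 0; i < n_threads; ++i) {
      workers_.emplace_back([this] { WorkerLoop(); });
    }
  }

  // Lets the workers drain the queue, then joins them.
  ~TinyPool() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      done_ = true;
    }
    cv_.notify_all();
    for (auto &w : workers_) w.join();
  }

  TinyPool(const TinyPool &) = delete;
  TinyPool &operator=(const TinyPool &) = delete;

  // Queues a callable and returns a future for its result.
  template <typename F>
  auto Submit(F f) -> std::future<decltype(f())> {
    auto task =
        std::make_shared<std::packaged_task<decltype(f())()>>(std::move(f));
    auto result = task->get_future();
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push([task] { (*task)(); });
    }
    cv_.notify_one();
    return result;
  }

 private:
  void WorkerLoop() {
    for (;;) {
      std::function<void()> job;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return done_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // only reached once done_ is set
        job = std::move(tasks_.front());
        tasks_.pop();
      }
      job();  // run the task without holding the mutex
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> tasks_;
  bool done_ = false;
  std::vector<std::thread> workers_;
};

// Sums the squares of 1..n, running one small task per term on the pool.
int SumOfSquares(int n, std::size_t n_threads) {
  TinyPool pool(n_threads);
  std::vector<std::future<int>> futures;
  for (int i = 1; i <= n; ++i) {
    futures.push_back(pool.Submit([i] { return i * i; }));
  }
  int sum = 0;
  for (auto &f : futures) sum += f.get();
  return sum;
}
```

Calling `SumOfSquares(10, 4)` queues ten tasks on four workers and adds up the results. Because the queue and the `done_` flag are only ever changed while `mutex_` is held, the notification can safely be sent after the lock is released.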
{ 2 }
Source Code Analysis of the Apollo Thread Pool Class
The Apollo thread pool lives in [your_apollo_root_dir]/modules/common/util/ctpl_stl.h. The file contains the task queue class Queue and the thread pool class ThreadPool; Queue is in the namespace apollo::common::util::detail, and ThreadPool is in the namespace apollo::common::util.
1 The task queue class Queue
Queue is built on std::queue from the C++ standard library; it only adds locking around the three functions push, pop, and empty.

    template <typename T>
    class Queue {
     public:
      bool push(T const &value) {
        // std::lock_guard would be more efficient here
        std::unique_lock<std::mutex> lock(mutex_);
        q_.push(value);
        return true;
      }

      // deletes the retrieved element, do not use for non integral types
      bool pop(T &v) {  // NOLINT
        // std::lock_guard would be more efficient here
        std::unique_lock<std::mutex> lock(mutex_);
        if (q_.empty()) {
          return false;
        }
        v = q_.front();
        q_.pop();
        return true;
      }

      bool empty() {
        // std::lock_guard would be more efficient here
        std::unique_lock<std::mutex> lock(mutex_);
        return q_.empty();
      }

     private:
      std::queue<T> q_;
      std::mutex mutex_;
    };
According to this blog post [https://blog.csdn.net/tgxallen/article/details/73522233], both std::lock_guard and std::unique_lock provide RAII-style locking (RAII: Resource Acquisition Is Initialization, see [https://blog.csdn.net/doc_sgl/article/details/43028009]). std::lock_guard has the lower overhead, while std::unique_lock is more flexible (it can be unlocked early). Queue does not need that flexibility, so std::lock_guard is the better fit. I also added a push overload that takes an rvalue reference, for use by ThreadPool below. The modified class is:

    template <typename T>
    class Queue {
     public:
      bool push(const T &value) {
        std::lock_guard<std::mutex> lock(mutex_);
        q_.push(value);
        return true;
      }

      // added: a push overload that takes an rvalue reference
      bool push(T &&value) {
        std::lock_guard<std::mutex> lock(mutex_);
        q_.emplace(std::move(value));
        return true;
      }

      // deletes the retrieved element, do not use for non integral types
      bool pop(T &v) {  // NOLINT
        std::lock_guard<std::mutex> lock(mutex_);
        if (q_.empty()) {
          return false;
        }
        v = q_.front();
        q_.pop();
        return true;
      }

      bool empty() {
        std::lock_guard<std::mutex> lock(mutex_);
        return q_.empty();
      }

     private:
      std::queue<T> q_;
      std::mutex mutex_;
    };
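The difference between the two lock wrappers is easiest to see side by side. The sketch below uses a hypothetical `Counter` class (not part of Apollo): one method holds the mutex for its whole scope with std::lock_guard, the other uses std::unique_lock to release the mutex early before doing unrelated work.

```cpp
#include <cassert>
#include <mutex>
#include <vector>

// Hypothetical counter used only to contrast the two lock wrappers.
class Counter {
 public:
  // std::lock_guard: locked for the whole scope, no manual control.
  void Add(int delta) {
    std::lock_guard<std::mutex> lock(mutex_);
    value_ += delta;
  }

  // std::unique_lock: can be released before the end of the scope.
  int AddAndSnapshot(int delta, std::vector<int> *log) {
    std::unique_lock<std::mutex> lock(mutex_);
    value_ += delta;
    const int snapshot = value_;
    lock.unlock();             // shared state is no longer needed
    log->push_back(snapshot);  // runs without holding the mutex
    return snapshot;
  }

 private:
  std::mutex mutex_;
  int value_ = 0;
};

// Adds 2, then 3, and returns the final snapshot plus the log length.
int DemoCounter() {
  Counter c;
  std::vector<int> log;
  c.Add(2);
  const int snapshot = c.AddAndSnapshot(3, &log);
  assert(log.size() == 1);
  return snapshot + static_cast<int>(log.size());
}
```

Queue never needs to unlock before the end of a function, which is why std::lock_guard is enough there.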
2 The thread pool class ThreadPool
ThreadPool creates n_threads background worker threads, wraps each task function f in a std::function<void(int)> and stores it in the task queue q_, and lets idle worker threads take tasks from q_ and run them. Note that the copy constructor ThreadPool(const ThreadPool &), the move constructor ThreadPool(ThreadPool &&), the copy assignment operator ThreadPool &operator=(const ThreadPool &), and the move assignment operator ThreadPool &operator=(ThreadPool &&) are all declared private, which prevents callers from using them. In C++11 the same effect can be achieved by appending = delete; to each declaration, and the source code shows this form in comments.
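As a small illustration of the = delete form mentioned above, the following sketch declares a hypothetical class `NonCopyable` (the name is made up for this example) and checks at compile time that copying and moving are indeed disabled.

```cpp
#include <type_traits>

// Hypothetical class; shows the "= delete" way of disabling copy and move.
class NonCopyable {
 public:
  NonCopyable() = default;
  NonCopyable(const NonCopyable &) = delete;
  NonCopyable(NonCopyable &&) = delete;
  NonCopyable &operator=(const NonCopyable &) = delete;
  NonCopyable &operator=(NonCopyable &&) = delete;
};

static_assert(!std::is_copy_constructible<NonCopyable>::value,
              "copy construction is disabled");
static_assert(!std::is_move_constructible<NonCopyable>::value,
              "move construction is disabled");
static_assert(!std::is_copy_assignable<NonCopyable>::value,
              "copy assignment is disabled");
static_assert(!std::is_move_assignable<NonCopyable>::value,
              "move assignment is disabled");
```

Compared with private declarations, deleted functions produce a clearer compiler error at the call site and also stop member functions and friends from calling them.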
The most important member functions of the class are analyzed below.
2.2.1 The Push function
Push wraps the task function f in a std::function<void(int)> and stores it in the task queue q_. There are two versions: one allows f to take variadic arguments Rest &&... rest, the other does not allow extra arguments. The bodies are nearly identical, so the variadic version is analyzed here:

    template <typename F, typename... Rest>
    auto Push(F &&f, Rest &&... rest) -> std::future<decltype(f(0, rest...))> {
      // std::placeholders::_1 marks the first argument of the bound task
      // object as a free parameter
      auto pck =
          std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
              std::bind(std::forward<F>(f), std::placeholders::_1,
                        std::forward<Rest>(rest)...));
      // Better to create a smart pointer with std::make_shared, so the memory
      // does not have to be released by hand later:
      // auto _f = std::make_shared<std::function<void(int id)>>(
      //     [pck](int id) { (*pck)(id); });
      auto _f = new std::function<void(int id)>([pck](int id) { (*pck)(id); });
      q_.push(_f);
      // Author's note: the modified version below drops this lock
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.notify_one();
      return pck->get_future();
    }
Push returns a std::future whose value type is the return type of f(0, rest...); decltype(f(0, rest...)) yields that type. std::future gives access to the result of an asynchronous operation. The name literally means "the future", which I find apt: the result of an asynchronous operation cannot be obtained immediately, only at some later point. This blog post [https://blog.csdn.net/yockie/article/details/50595958] covers std::future well and is worth reading.
Task functions come with all kinds of signatures: some take no parameters, some take one, some take two, and so on, so they cannot be stored in the task queue q_ directly. std::bind first turns f into a std::packaged_task object pck that takes one int parameter and returns whatever f(0, rest...) returns; a lambda then wraps pck in a std::function<void(int)>, which can be stored in q_. The original author creates the raw pointer _f with new, so the memory has to be released somewhere later, which I consider fragile. A smart pointer created with std::make_shared manages the memory automatically and is less work. To let the std::shared_ptr<std::function<void(int)>> be moved into the queue instead of being copied through Queue::push(const T &value), I added the rvalue-reference overload Queue::push(T &&value) to Queue.
Next, cv_.notify_one() on the std::condition_variable object cv_ wakes one waiting worker thread so that it can pull the new task from the queue and run it. Finally, pck->get_future() returns a std::future from which the caller can retrieve the function's return value once it has run.
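The three wrapping steps described above can be tried in isolation, without a pool. In this sketch `Describe` and `RunWrapped` are hypothetical names, and the task is run inline on the calling thread where a pool worker would normally make the call.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <string>

// Hypothetical task: the first parameter is the worker id, as in the pool.
std::string Describe(int id, const std::string &name, int count) {
  return name + "#" + std::to_string(count) + "@" + std::to_string(id);
}

// Mirrors the wrapping steps of Push, but runs the task inline.
std::string RunWrapped(int worker_id) {
  // Step 1: bind the extra arguments and leave the id as a free parameter.
  auto pck = std::make_shared<std::packaged_task<std::string(int)>>(
      std::bind(Describe, std::placeholders::_1, std::string("task"), 7));
  // Step 2: erase the return type so every task has the same signature.
  std::function<void(int)> job = [pck](int id) { (*pck)(id); };
  // Step 3: obtain the future before the task runs.
  std::future<std::string> result = pck->get_future();
  assert(result.valid());
  job(worker_id);  // a pool worker would make this call
  return result.get();
}
```

Because the lambda captures the shared_ptr by value, the packaged_task stays alive for as long as either the queued job or the caller still needs it.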
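The books on C++ multithreading that I have read ("C++ Concurrency in Action" is a classic) generally do not hold a lock while calling cv_.notify_one(): the lock is not required for the call, and holding it can make the woken thread block again immediately on the mutex. The modified version therefore drops it; see [https://stackoverflow.com/questions/17101922/do-i-have-to-acquire-lock-before-calling-condition-variable-notify-one] and [http://en.cppreference.com/w/cpp/thread/condition_variable/notify_one]. One caveat: that advice assumes the state checked by the waiter is modified under the same mutex the waiter holds. Here q_ is guarded by its own internal mutex rather than by mutex_, so without the lock a notification can arrive between a worker's predicate check and its going to sleep and be missed, leaving the task queued until the next notification.
The modified version is:

    template <typename F, typename... Rest>
    auto Push(F &&f, Rest &&... rest) -> std::future<decltype(f(0, rest...))> {
      auto pck =
          std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
              std::bind(std::forward<F>(f), std::placeholders::_1,
                        std::forward<Rest>(rest)...));
      auto _f = std::make_shared<std::function<void(int id)>>(
          [pck](int id) { (*pck)(id); });
      // It is not necessary to lock q_ because it is locked in the Queue class.
      q_.push(std::move(_f));
      cv_.notify_one();
      return pck->get_future();
    }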
2.2.2 The Pop function
Pop takes one task out of the task queue q_ and returns it:

    std::function<void(int)> Pop() {
      std::function<void(int id)> *_f = nullptr;
      q_.pop(_f);
      // If q_ stored smart pointers, this trick for releasing the memory
      // would not be needed.
      // at return, delete the function even if an exception occurred
      std::unique_ptr<std::function<void(int id)>> func(_f);
      std::function<void(int)> f;
      if (_f) f = *_f;
      return f;
    }
Pop first takes a raw pointer _f to a task function object from q_; if the pointer is non-null, the pointee is copied into std::function<void(int)> f and returned. The function uses a small trick: it creates the smart pointer std::unique_ptr<std::function<void(int id)>> func(_f), whose destructor calls delete on the pointer when func goes out of scope. If q_ stored smart pointers, this trick would not be needed.
The modified version is:

    std::shared_ptr<std::function<void(int)>> Pop() {
      std::shared_ptr<std::function<void(int)>> f;
      q_.pop(f);
      return f;
    }
2.2.3 The Stop function
Stop shuts the thread pool down. If waiting is not requested (is_wait == false), it tells every worker thread to stop once its current task finishes and clears the task queue without running the remaining tasks. If waiting is requested, all queued tasks are run before the workers exit. In both cases Stop then joins the worker threads. The code is:

    void Stop(bool is_wait = false) {
      if (!is_wait) {
        if (is_stop_) {
          return;
        }
        is_stop_ = true;
        for (int i = 0, n = size(); i < n; ++i) {
          *(flags_[i]) = true;  // command the threads to stop
        }
        ClearQueue();  // empty the queue
      } else {
        if (is_done_ || is_stop_) return;
        is_done_ = true;  // give the waiting threads a command to finish
      }
      {
        // Author's note: the modified version below drops this lock
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.notify_all();  // stop all waiting threads
      }
      for (int i = 0; i < static_cast<int>(threads_.size()); ++i) {
        // wait for the computing threads to finish
        if (threads_[i]->joinable()) {
          threads_[i]->join();
        }
      }
      // if there were no threads in the pool but some functions in the queue, the
      // functions are not deleted by the threads
      // therefore delete them here
      ClearQueue();
      threads_.clear();
      flags_.clear();
    }
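Why do the Boolean variables is_stop_, is_done_, and flags_[i] need no lock? Because they are all of the atomic type std::atomic<bool>. Reads and writes of an atomic object are indivisible and do not constitute a data race, so another thread never observes a half-written value. The C++ standard guarantees a lock-free implementation only for std::atomic_flag; std::atomic<bool> is lock-free on mainstream platforms, while other specializations of std::atomic may fall back to an internal lock on some architectures. Either way, a std::atomic<bool> variable can be read and written from multiple threads without an external lock.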
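A short sketch of the same idea outside the pool: several threads spin on a std::atomic<bool> stop flag and record that they saw it, with no mutex anywhere. `CountStopped` and `AtomicBoolIsLockFree` are hypothetical helper names; whether the flag is lock-free is platform-dependent, so the second function queries it rather than assuming an answer.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Counts how many of n_threads workers observed the stop flag as set.
int CountStopped(int n_threads) {
  assert(n_threads >= 0);
  std::atomic<bool> stop(false);
  std::atomic<int> stopped(0);
  std::vector<std::thread> workers;
  for (int i = 0; i < n_threads; ++i) {
    workers.emplace_back([&stop, &stopped] {
      while (!stop.load()) std::this_thread::yield();  // no mutex needed
      ++stopped;                                       // atomic increment
    });
  }
  stop.store(true);
  for (auto &w : workers) w.join();
  return stopped.load();
}

// Reports whether std::atomic<bool> is lock-free on the current platform.
bool AtomicBoolIsLockFree() {
  std::atomic<bool> flag(false);
  return flag.is_lock_free();
}
```

The atomic flag makes the individual reads and writes safe; it does not by itself coordinate with a condition variable, which is a separate concern.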
Following the reasoning in Section 2.2.1, the lock around cv_.notify_all(); is removed. Because is_done_ and the flags are set without holding mutex_, a worker that has just evaluated its wait predicate can miss this notification, so the change should be applied with care.
The modified code is:

    void Stop(bool is_wait = false) {
      if (!is_wait) {
        if (is_stop_) {
          return;
        }
        is_stop_ = true;
        for (int i = 0, n = size(); i < n; ++i) {
          *(flags_[i]) = true;  // command the threads to stop
        }
        ClearQueue();  // empty the queue
      } else {
        if (is_done_ || is_stop_) return;
        is_done_ = true;  // give the waiting threads a command to finish
      }
      cv_.notify_all();  // stop all waiting threads
      for (int i = 0; i < static_cast<int>(threads_.size()); ++i) {
        // wait for the computing threads to finish
        if (threads_[i]->joinable()) {
          threads_[i]->join();
        }
      }
      // if there were no threads in the pool but some functions in the queue, the
      // functions are not deleted by the threads
      // therefore delete them here
      ClearQueue();
      threads_.clear();
      flags_.clear();
    }
2.2.4 The ClearQueue function
ClearQueue empties the task queue q_:

    void ClearQueue() {
      std::function<void(int id)> *_f;
      // empty the queue
      while (q_.pop(_f)) {
        delete _f;
      }
    }
A while loop pops the task function pointers _f from q_ one at a time. Because each _f was created with new, it must be released with delete. If q_ stored smart pointers, the objects would not have to be deleted by hand.
The smart-pointer version is:

    void ClearQueue() {
      std::shared_ptr<std::function<void(int id)>> f;
      // empty the queue
      while (q_.pop(f)) {
        // do nothing
      }
    }
2.2.5 The Resize function
Resize changes the number of worker threads in the pool:

    void Resize(const int n_threads) {
      if (!is_stop_ && !is_done_) {
        int old_n_threads = static_cast<int>(threads_.size());
        if (old_n_threads <= n_threads) {
          // if the number of threads is increased
          threads_.resize(n_threads);
          flags_.resize(n_threads);
          for (int i = old_n_threads; i < n_threads; ++i) {
            flags_[i] = std::make_shared<std::atomic<bool>>(false);
            SetThread(i);
          }
        } else {
          // the number of threads is decreased
          for (int i = old_n_threads - 1; i >= n_threads; --i) {
            *(flags_[i]) = true;  // this thread will finish
            threads_[i]->detach();
          }
          {
            // stop the detached threads that were waiting
            // Author's note: the modified version below drops this lock
            std::unique_lock<std::mutex> lock(mutex_);
            cv_.notify_all();
          }
          // safe to delete because the threads are detached
          threads_.resize(n_threads);
          // safe to delete because the threads have copies of shared_ptr of the
          // flags, not originals
          flags_.resize(n_threads);
        }
      }
    }
If neither is_stop_ nor is_done_ is true, the pool is still in use and the number of worker threads can be changed; there is no point in resizing a pool that has already been stopped. If the new count n_threads is greater than or equal to the current count old_n_threads, the thread array threads_ and the flag array flags_ are resized to the new count, and a for loop calls SetThread(i) to create each additional worker thread. If n_threads is smaller than old_n_threads, the surplus old_n_threads - n_threads threads are flagged to stop and detached, so each one finishes the task it is running and then exits; threads_ and flags_ are then shrunk to the new count.
Following the reasoning in Section 2.2.1, the lock around cv_.notify_all(); is removed here as well.
Note: Resize is dangerous and should be called as rarely as possible. If it must be called, call it from the thread that created the pool, not from other threads.
The modified code is:

    void Resize(const int n_threads) {
      if (!is_stop_ && !is_done_) {
        int old_n_threads = static_cast<int>(threads_.size());
        if (old_n_threads <= n_threads) {
          // if the number of threads is increased
          threads_.resize(n_threads);
          flags_.resize(n_threads);
          for (int i = old_n_threads; i < n_threads; ++i) {
            flags_[i] = std::make_shared<std::atomic<bool>>(false);
            SetThread(i);
          }
        } else {
          // the number of threads is decreased
          for (int i = old_n_threads - 1; i >= n_threads; --i) {
            *(flags_[i]) = true;  // this thread will finish
            threads_[i]->detach();
          }
          // stop the detached threads that were waiting
          cv_.notify_all();
          // safe to delete because the threads are detached
          threads_.resize(n_threads);
          // safe to delete because the threads have copies of shared_ptr of the
          // flags, not originals
          flags_.resize(n_threads);
        }
      }
    }
2.2.6 The SetThread function
SetThread creates (or re-creates) the worker thread with index i:

    void SetThread(int i) {
      // a copy of the shared ptr to the flag
      std::shared_ptr<std::atomic<bool>> flag(flags_[i]);
      auto f = [this, i, flag /* a copy of the shared ptr to the flag */]() {
        std::atomic<bool> &_flag = *flag;
        std::function<void(int id)> *_f;
        bool is_pop_ = q_.pop(_f);
        while (true) {
          while (is_pop_) {  // if there is anything in the queue
            // If q_ stored smart pointers, this trick for releasing the
            // memory would not be needed.
            // at return, delete the function even if an exception occurred
            std::unique_ptr<std::function<void(int id)>> func(_f);
            // run the task function
            (*_f)(i);
            if (_flag) {
              // the thread is wanted to stop, return even if the queue is not
              // empty yet
              return;
            } else {
              is_pop_ = q_.pop(_f);
            }
          }
          // the queue is empty here, wait for the next command
          // std::unique_lock is required here because cv_ has to unlock the
          // mutex while it waits
          std::unique_lock<std::mutex> lock(mutex_);
          ++n_waiting_;
          // wait for a new task to arrive in the queue
          cv_.wait(lock, [this, &_f, &is_pop_, &_flag]() {
            is_pop_ = q_.pop(_f);
            return is_pop_ || is_done_ || _flag;
          });
          --n_waiting_;
          if (!is_pop_) {
            // if the queue is empty and is_done_ == true or *flag
            // then return
            return;
          }
        }
      };
      // compiler may not support std::make_unique()
      threads_[i].reset(new std::thread(f));
    }
The code looks complicated but consists of only three statements. The first is std::shared_ptr<std::atomic<bool>> flag(flags_[i]);, which initializes the flag variable flag from flags_[i]. The second looks long but simply creates the lambda f. The third is threads_[i].reset(new std::thread(f));, which creates worker thread i with the lambda f as its thread function.
When does the lambda f do its work? It starts running as soon as the thread is created. Whenever q_.pop(_f) returns true, a new task has been taken from q_, so (*_f)(i); runs it. When the queue currently holds no task, the thread calls:

    cv_.wait(lock, [this, &_f, &is_pop_, &_flag]() {
      is_pop_ = q_.pop(_f);
      return is_pop_ || is_done_ || _flag;
    });

to wait for a new task; until one arrives, the worker thread sleeps.
This function uses the same small trick as Pop: it creates the smart pointer std::unique_ptr<std::function<void(int id)>> func(_f), whose destructor calls delete on the pointer when func goes out of scope. If q_ stored smart pointers, this trick would not be needed.
2.2.7 The modified ThreadPool class
For completeness, here is the modified ThreadPool class.

    class ThreadPool {
     public:
      ThreadPool() { Init(); }
      explicit ThreadPool(int n_threads) {
        Init();
        Resize(n_threads);
      }

      // the destructor waits for all the functions in the queue to be finished
      ~ThreadPool() { Stop(true); }

      // get the number of running threads in the pool
      int size() { return static_cast<int>(threads_.size()); }

      // number of idle threads
      int NumIdle() { return n_waiting_; }

      std::thread &GetThread(const int i) { return *(threads_[i]); }

      // change the number of threads in the pool
      // should be called from one thread, otherwise be careful to not interleave,
      // also with stop()
      // n_threads must be >= 0
      void Resize(const int n_threads) {
        if (!is_stop_ && !is_done_) {
          int old_n_threads = static_cast<int>(threads_.size());
          if (old_n_threads <= n_threads) {
            // if the number of threads is increased
            threads_.resize(n_threads);
            flags_.resize(n_threads);
            for (int i = old_n_threads; i < n_threads; ++i) {
              flags_[i] = std::make_shared<std::atomic<bool>>(false);
              SetThread(i);
            }
          } else {
            // the number of threads is decreased
            for (int i = old_n_threads - 1; i >= n_threads; --i) {
              *(flags_[i]) = true;  // this thread will finish
              threads_[i]->detach();
            }
            // stop the detached threads that were waiting
            cv_.notify_all();
            // safe to delete because the threads are detached
            threads_.resize(n_threads);
            // safe to delete because the threads have copies of shared_ptr of
            // the flags, not originals
            flags_.resize(n_threads);
          }
        }
      }

      // empty the queue
      void ClearQueue() {
        std::shared_ptr<std::function<void(int id)>> f;
        while (q_.pop(f)) {
          // do nothing
        }
      }

      // pops a functional wrapper to the original function
      std::shared_ptr<std::function<void(int)>> Pop() {
        std::shared_ptr<std::function<void(int)>> f;
        q_.pop(f);
        return f;
      }

      // wait for all computing threads to finish and stop all threads
      // may be called asynchronously to not pause the calling thread while waiting
      // if is_wait == true, all the functions in the queue are run, otherwise the
      // queue is cleared without running the functions
      void Stop(bool is_wait = false) {
        if (!is_wait) {
          if (is_stop_) {
            return;
          }
          is_stop_ = true;
          for (int i = 0, n = size(); i < n; ++i) {
            *(flags_[i]) = true;  // command the threads to stop
          }
          ClearQueue();  // empty the queue
        } else {
          if (is_done_ || is_stop_) return;
          is_done_ = true;  // give the waiting threads a command to finish
        }
        cv_.notify_all();  // stop all waiting threads
        for (int i = 0; i < static_cast<int>(threads_.size()); ++i) {
          // wait for the computing threads to finish
          if (threads_[i]->joinable()) {
            threads_[i]->join();
          }
        }
        // if there were no threads in the pool but some functions in the queue,
        // the functions are not deleted by the threads
        // therefore delete them here
        ClearQueue();
        threads_.clear();
        flags_.clear();
      }

      template <typename F, typename... Rest>
      auto Push(F &&f, Rest &&... rest) -> std::future<decltype(f(0, rest...))> {
        auto pck =
            std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
                std::bind(std::forward<F>(f), std::placeholders::_1,
                          std::forward<Rest>(rest)...));
        auto _f = std::make_shared<std::function<void(int id)>>(
            [pck](int id) { (*pck)(id); });
        // It is not necessary to lock q_ because it is locked in the Queue class.
        q_.push(std::move(_f));
        cv_.notify_one();
        return pck->get_future();
      }

      // run the user's function that accepts argument int - id of the running
      // thread. returned value is templatized
      // operator returns std::future, where the user can get the result and
      // rethrow the caught exceptions
      template <typename F>
      auto Push(F &&f) -> std::future<decltype(f(0))> {
        auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(
            std::forward<F>(f));
        auto _f = std::make_shared<std::function<void(int id)>>(
            [pck](int id) { (*pck)(id); });
        // It is not necessary to lock q_ because it is locked in the Queue class.
        q_.push(std::move(_f));
        cv_.notify_one();
        return pck->get_future();
      }

     private:
      // deleted
      ThreadPool(const ThreadPool &);             // = delete;
      ThreadPool(ThreadPool &&);                  // = delete;
      ThreadPool &operator=(const ThreadPool &);  // = delete;
      ThreadPool &operator=(ThreadPool &&);       // = delete;

      void SetThread(int i) {
        // a copy of the shared ptr to the flag
        std::shared_ptr<std::atomic<bool>> flag(flags_[i]);
        auto f = [this, i, flag /* a copy of the shared ptr to the flag */]() {
          std::atomic<bool> &_flag = *flag;
          std::shared_ptr<std::function<void(int id)>> _f;
          bool is_pop_ = q_.pop(_f);
          while (true) {
            while (is_pop_) {  // if there is anything in the queue
              (*_f)(i);
              if (_flag) {
                // the thread is wanted to stop, return even if the queue is not
                // empty yet
                return;
              } else {
                is_pop_ = q_.pop(_f);
              }
            }
            // the queue is empty here, wait for the next command
            {
              std::unique_lock<std::mutex> lock(mutex_);
              ++n_waiting_;
              cv_.wait(lock, [this, &_f, &is_pop_, &_flag]() {
                is_pop_ = q_.pop(_f);
                return is_pop_ || is_done_ || _flag;
              });
              --n_waiting_;
              if (!is_pop_) {
                // if the queue is empty and is_done_ == true or *flag
                // then return
                return;
              }
            }
          }
        };
        // compiler may not support std::make_unique()
        threads_[i].reset(new std::thread(f));
      }

      void Init() {
        is_stop_ = false;
        is_done_ = false;
        n_waiting_ = 0;
      }

      std::vector<std::unique_ptr<std::thread>> threads_;
      std::vector<std::shared_ptr<std::atomic<bool>>> flags_;
      detail::Queue<std::shared_ptr<std::function<void(int id)>>> q_;
      std::atomic<bool> is_done_;
      std::atomic<bool> is_stop_;
      std::atomic<int> n_waiting_;  // how many threads are waiting
      std::mutex mutex_;
      std::condition_variable cv_;
    };
2.2.8 The added unit test
To check the modified code, I added the following unit test. The first function under test, filter_duplicates_str, takes an integer ID as its first parameter, which the test treats as a placeholder and never uses, followed by four C-style strings. It removes duplicate words from the four strings, sorts the remaining words in ascending alphabetical order, and returns the result as a std::string. The second function under test, filter_duplicates, takes only the integer ID, again unused, and does the same for a fixed string. Because the C++ compiler cannot deduce which overload is meant when an overloaded function name is passed to Push, the second function is given a different name rather than written as an overload. Both functions are run 1000 times on the thread pool, and the results are compared with the expected output.

    #include "modules/common/util/ctpl_stl.h"

    #include <algorithm>
    #include <future>
    #include <iterator>
    #include <set>
    #include <sstream>
    #include <string>
    #include <thread>
    #include <vector>

    #include "gtest/gtest.h"

    namespace apollo {
    namespace common {
    namespace util {

    namespace {
    // ...

    // Attention: don't use overloaded functions, otherwise the compiler can't
    // deduce the correct edition.
    std::string filter_duplicates_str(int id, const char* str1, const char* str2,
                                      const char* str3, const char* str4) {
      // id is unused.
      std::stringstream ss_in;
      ss_in << str1 << " " << str2 << " " << str3 << " " << str4;
      std::set<std::string> string_set;
      std::istream_iterator<std::string> beg(ss_in);
      std::istream_iterator<std::string> end;
      std::copy(beg, end, std::inserter(string_set, string_set.end()));
      std::stringstream ss_out;
      std::copy(std::begin(string_set), std::end(string_set),
                std::ostream_iterator<std::string>(ss_out, " "));
      return ss_out.str();
    }

    std::string filter_duplicates(int id) {
      // id is unused.
      std::stringstream ss_in;
      ss_in << "a a b b b c foo foo bar foobar foobar hello world hello hello "
               "world";
      std::set<std::string> string_set;
      std::istream_iterator<std::string> beg(ss_in);
      std::istream_iterator<std::string> end;
      std::copy(beg, end, std::inserter(string_set, string_set.end()));
      std::stringstream ss_out;
      std::copy(std::begin(string_set), std::end(string_set),
                std::ostream_iterator<std::string>(ss_out, " "));
      return ss_out.str();
    }
    }  // namespace

    TEST(ThreadPool, filter_duplicates) {
      const unsigned int hardware_threads = std::thread::hardware_concurrency();
      const unsigned int threads =
          std::min(hardware_threads != 0 ? hardware_threads : 2, 50U);
      ThreadPool p(threads);
      std::vector<std::future<std::string>> futures1, futures2;
      for (int i = 0; i < 1000; ++i) {
        futures1.push_back(std::move(p.Push(
            filter_duplicates_str, "thread pthread", "pthread thread good news",
            "today is a good day", "she is a six years old girl")));
        futures2.push_back(std::move(p.Push(filter_duplicates)));
      }
      for (int i = 0; i < 1000; ++i) {
        std::string result1 = futures1[i].get();
        std::string result2 = futures2[i].get();
        EXPECT_STREQ(
            result1.c_str(),
            "a day girl good is news old pthread she six thread today years ");
        EXPECT_STREQ(result2.c_str(), "a b bar c foo foobar hello world ");
      }
    }

    }  // namespace util
    }  // namespace common
    }  // namespace apollo
{ 3 }
How the Apollo Planning Module Uses the Thread Pool
The Apollo Planning module wraps ThreadPool in the class PlanningThreadPool. PlanningThreadPool is declared in the header [your_apollo_root_dir]/modules/planning/common/planning_thread_pool.h and implemented in [your_apollo_root_dir]/modules/planning/common/planning_thread_pool.cc, in the namespace apollo::planning.
1 The PlanningThreadPool class
PlanningThreadPool is declared as follows:

    class PlanningThreadPool {
     public:
      void Init();
      void Stop() {
        if (thread_pool_) {
          thread_pool_->Stop(true);
        }
      }
      template <typename F, typename... Rest>
      void Push(F &&f, Rest &&... rest) {
        func_.push_back(std::move(thread_pool_->Push(f, rest...)));
      }
      template <typename F>
      void Push(F &&f) {
        func_.push_back(std::move(thread_pool_->Push(f)));
      }
      void Synchronize();

     private:
      std::unique_ptr<common::util::ThreadPool> thread_pool_;
      bool is_initialized = false;
      // The name func_ is a poor choice: it stores std::future objects, not
      // std::function objects, so renaming it to futures_ is well worth doing.
      std::vector<std::future<void>> func_;
      DECLARE_SINGLETON(PlanningThreadPool);
    };
PlanningThreadPool is declared as a singleton through the DECLARE_SINGLETON macro, so objects of the class cannot be created directly on the stack or the heap; the only instance is obtained through PlanningThreadPool::instance(). The member name func_ is misleading: it is a dynamic array of std::future objects, not std::function objects. In other words, it holds the asynchronous results of the functions rather than the asynchronous functions themselves, so renaming it to futures_ is well worth doing.
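For readers unfamiliar with macro-declared singletons, here is a sketch of how such a macro can work. `SKETCH_DECLARE_SINGLETON` and `Registry` are hypothetical names written for this illustration; Apollo's actual DECLARE_SINGLETON definition may differ in its details.

```cpp
#include <cassert>

// Hypothetical macro in the spirit of DECLARE_SINGLETON; an assumption for
// illustration, not Apollo's definition.
#define SKETCH_DECLARE_SINGLETON(classname) \
 public: \
  static classname *instance() { \
    static classname the_instance; \
    return &the_instance; \
  } \
  classname(const classname &) = delete; \
  classname &operator=(const classname &) = delete; \
 private: \
  classname() = default;

// Hypothetical class that can only be reached through instance().
class Registry {
 public:
  int Next() { return ++counter_; }

 private:
  int counter_ = 0;
  SKETCH_DECLARE_SINGLETON(Registry)
};
```

With the constructor private and copying deleted, `Registry r;` and `new Registry` do not compile outside the class, and every call to `Registry::instance()` returns the same address.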
2 Using the PlanningThreadPool class
The Planning module uses PlanningThreadPool in the following steps:
3.2.1 Initializing the thread pool
The following statement in Planning::Init() (in [your_apollo_root_dir]/modules/planning/planning.cc) initializes the PlanningThreadPool object:

    // initialize planning thread pool
    PlanningThreadPool::instance()->Init();
3.2.2 Running work concurrently on the thread pool
Call the thread pool wherever some piece of work should be processed concurrently, typically inside a loop. Note: tasks that run concurrently must not depend on one another's order, because with a thread pool there is no way to know which task will run first and which will run later.
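The push-then-wait pattern and the independence requirement can be sketched without Apollo. The example below uses std::async in place of a thread pool, so it starts a thread per task rather than reusing workers; `SquareInto` and `SquareAll` are hypothetical names. Each task writes only to its own output slot, which is what makes the execution order irrelevant.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <vector>

// Hypothetical per-item work; each call touches only its own output slot.
void SquareInto(int input, int *output) { *output = input * input; }

// Launches one independent task per element, then waits for all of them,
// analogous to a series of Push calls followed by Synchronize().
std::vector<int> SquareAll(const std::vector<int> &inputs) {
  std::vector<int> outputs(inputs.size(), 0);
  std::vector<std::future<void>> futures;
  futures.reserve(inputs.size());
  for (std::size_t i = 0; i < inputs.size(); ++i) {
    futures.push_back(
        std::async(std::launch::async, SquareInto, inputs[i], &outputs[i]));
  }
  for (auto &f : futures) f.wait();  // the "synchronize" step
  assert(futures.size() == inputs.size());
  return outputs;
}
```

`outputs` is sized before any task starts, so the pointers handed to the tasks stay valid; resizing the vector while tasks are running would not be safe.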
The Planning module currently uses the thread pool in the following places:
The ReferenceLineInfo::AddObstacles function
ReferenceLineInfo::AddObstacles (in [your_apollo_root_dir]/modules/planning/common/reference_line_info.cc) calls PlanningThreadPool::instance()->Push inside a for loop to add one pool task per obstacle, each of which adds that obstacle's information, and then calls PlanningThreadPool::instance()->Synchronize() to wait until all pool tasks have finished.

    bool ReferenceLineInfo::AddObstacles(
        const std::vector<const Obstacle*>& obstacles) {
      if (FLAGS_use_multi_thread_to_add_obstacles) {
        std::vector<int> ret(obstacles.size(), 0);
        for (size_t i = 0; i < obstacles.size(); ++i) {
          const auto* obstacle = obstacles.at(i);
          PlanningThreadPool::instance()->Push(
              std::bind(&ReferenceLineInfo::AddObstacleHelper, this, obstacle,
                        &(ret[i])));
        }
        PlanningThreadPool::instance()->Synchronize();
        if (std::find(ret.begin(), ret.end(), 0) != ret.end()) {
          return false;
        }
      } else {
        // ...
      }
      return true;
    }
The DPRoadGraph::GenerateMinCostPath function
DPRoadGraph::GenerateMinCostPath (in [your_apollo_root_dir]/modules/planning/tasks/dp_poly_path/dp_road_graph.cc) loops over the lateral sample points of each level of waypoints. Inside that for loop it calls PlanningThreadPool::instance()->Push to add pool tasks that compute the minimum cost for the current level, and then calls PlanningThreadPool::instance()->Synchronize() to wait until all pool tasks have finished.

    bool DPRoadGraph::GenerateMinCostPath(
        const std::vector<const PathObstacle *> &obstacles,
        std::vector<DPRoadGraphNode> *min_cost_path) {
      // ...
      for (std::size_t level = 1; level < path_waypoints.size(); ++level) {
        const auto &prev_dp_nodes = graph_nodes.back();
        const auto &level_points = path_waypoints[level];
        graph_nodes.emplace_back();
        for (size_t i = 0; i < level_points.size(); ++i) {
          const auto &cur_point = level_points[i];
          graph_nodes.back().emplace_back(cur_point, nullptr);
          auto &cur_node = graph_nodes.back().back();
          if (FLAGS_enable_multi_thread_in_dp_poly_path) {
            PlanningThreadPool::instance()->Push(std::bind(
                &DPRoadGraph::UpdateNode, this, std::ref(prev_dp_nodes), level,
                total_level, &trajectory_cost, &(front), &(cur_node)));
          } else {
            UpdateNode(prev_dp_nodes, level, total_level, &trajectory_cost,
                       &front, &cur_node);
          }
        }
        if (FLAGS_enable_multi_thread_in_dp_poly_path) {
          PlanningThreadPool::instance()->Synchronize();
        }
      }
      // ...
    }
The DpStGraph::CalculateTotalCost function
DpStGraph::CalculateTotalCost (in [your_apollo_root_dir]/modules/planning/tasks/dp_st_speed/dp_st_graph.cc) calls PlanningThreadPool::instance()->Push inside a for loop to add pool tasks. For each time sample c, one task per distance sample r with next_lowest_row <= r <= next_highest_row computes the minimum total cost of reaching node (c, r); PlanningThreadPool::instance()->Synchronize() then waits until all pool tasks have finished.

    Status DpStGraph::CalculateTotalCost() {
      // col and row are for STGraph
      // t corresponding to col
      // s corresponding to row
      uint32_t next_highest_row = 0;
      uint32_t next_lowest_row = 0;
      for (size_t c = 0; c < cost_table_.size(); ++c) {
        int highest_row = 0;
        int lowest_row = cost_table_.back().size() - 1;
        for (uint32_t r = next_lowest_row; r <= next_highest_row; ++r) {
          if (FLAGS_enable_multi_thread_in_dp_st_graph) {
            PlanningThreadPool::instance()->Push(
                std::bind(&DpStGraph::CalculateCostAt, this, c, r));
          } else {
            CalculateCostAt(c, r);
          }
        }
        if (FLAGS_enable_multi_thread_in_dp_st_graph) {
          PlanningThreadPool::instance()->Synchronize();
        }
        for (uint32_t r = next_lowest_row; r <= next_highest_row; ++r) {
          const auto& cost_cr = cost_table_[c][r];
          if (cost_cr.total_cost() < std::numeric_limits<float>::infinity()) {
            int h_r = 0;
            int l_r = 0;
            GetRowRange(cost_cr, &h_r, &l_r);
            highest_row = std::max(highest_row, h_r);
            lowest_row = std::min(lowest_row, l_r);
          }
        }
        next_highest_row = highest_row;
        next_lowest_row = lowest_row;
      }
      return Status::OK();
    }
3.2.3 Destroying the thread pool
The following statement in Planning::Stop() (in [your_apollo_root_dir]/modules/planning/planning.cc) shuts the thread pool down:

    PlanningThreadPool::instance()->Stop();