作者:Goland貓?
對于大型的互聯(lián)網(wǎng)應(yīng)用程序,如電商平臺(tái)、社交網(wǎng)絡(luò)、金融交易平臺(tái)等,每秒鐘都會(huì)收到大量的請求。在這些應(yīng)用程序中,需要使用高效的技術(shù)來應(yīng)對高并發(fā)的請求,尤其是在短時(shí)間內(nèi)處理大量的請求,如1分鐘百萬請求。
同時(shí),為了降低用戶的使用門檻和提升用戶體驗(yàn),前端需要實(shí)現(xiàn)參數(shù)的無感知傳遞。這樣用戶在使用時(shí),無需擔(dān)心參數(shù)傳遞的問題,能夠輕松地享受應(yīng)用程序的服務(wù)。
在處理1分鐘百萬請求時(shí),需要使用高效的技術(shù)和算法,以提高請求的響應(yīng)速度和處理能力。Go語言以其高效性和并發(fā)性而聞名,因此成為處理高并發(fā)請求的優(yōu)秀選擇。Go中有多種模式可供選擇,如基于goroutine和channel的并發(fā)模型、使用池技術(shù)的協(xié)程模型等,以便根據(jù)具體應(yīng)用的需要來選擇適合的技術(shù)模式。
本文代碼參考搬至
W1
W1 結(jié)構(gòu)體類型,它有五個(gè)成員:
WgSend 用于等待任務(wù)發(fā)送的 goroutine 完成。
Wg 用于等待任務(wù)處理的 goroutine 完成。
MaxNum 表示 goroutine 池的大小。
Ch 是一個(gè)字符串類型的通道,用于傳遞任務(wù)。
DispatchStop 是一個(gè)空結(jié)構(gòu)體類型的通道,用于停止任務(wù)分發(fā)。
?
type?W1?struct?{ ?WgSend???????*sync.WaitGroup ?Wg???????????*sync.WaitGroup ?MaxNum???????int ?Ch???????????chan?string ?DispatchStop?chan?struct{} }
?
接下來是 Dispatch 方法,它將任務(wù)發(fā)送到通道 Ch 中。它通過 for 循環(huán)來發(fā)送 10 倍于 MaxNum 的任務(wù),每個(gè)任務(wù)都是一個(gè) goroutine。defer 語句用于在任務(wù)完成時(shí)減少 WgSend 的計(jì)數(shù)。select 語句用于在任務(wù)分發(fā)被中止時(shí)退出任務(wù)發(fā)送。
Dispatch
?
func?(w?*W1)?Dispatch(job?string)?{ ?w.WgSend.Add(10?*?w.MaxNum) ?for?i?:=?0;?i?10*w.MaxNum;?i++?{ ??go?func(i?int)?{ ???defer?w.WgSend.Done() ???select?{ ???case?w.Ch?<-?fmt.Sprintf("%d",?i): ????return ???case?<-w.DispatchStop: ????fmt.Println("退出發(fā)送?job:?",?fmt.Sprintf("%d",?i)) ????return ???} ??}(i) ?} }
?
StartPool
然后是 StartPool 方法,它創(chuàng)建了一個(gè) goroutine 池來處理從通道 Ch 中讀取到的任務(wù)。
如果通道 Ch 還沒有被創(chuàng)建,那么它將被創(chuàng)建。如果計(jì)數(shù)器 WgSend 還沒有被創(chuàng)建,那么它也將被創(chuàng)建。如果計(jì)數(shù)器 Wg 還沒有被創(chuàng)建,那么它也將被創(chuàng)建。
如果通道 DispatchStop 還沒有被創(chuàng)建,那么它也將被創(chuàng)建。
for 循環(huán)用于創(chuàng)建 MaxNum 個(gè) goroutine 來處理從通道中讀取到的任務(wù)。defer 語句用于在任務(wù)完成時(shí)減少 Wg 的計(jì)數(shù)。
?
func?(w?*W1)?StartPool()?{ ?if?w.Ch?==?nil?{ ??w.Ch?=?make(chan?string,?w.MaxNum) ?} ?if?w.WgSend?==?nil?{ ??w.WgSend?=?&sync.WaitGroup{} ?} ?if?w.Wg?==?nil?{ ??w.Wg?=?&sync.WaitGroup{} ?} ?if?w.DispatchStop?==?nil?{ ??w.DispatchStop?=?make(chan?struct{}) ?} ?w.Wg.Add(w.MaxNum) ?for?i?:=?0;?i??
Stop
最后是 Stop 方法,它停止任務(wù)分發(fā)并等待所有任務(wù)完成。
它關(guān)閉了通道 DispatchStop,等待 WgSend 中的任務(wù)發(fā)送 goroutine 完成,然后關(guān)閉通道 Ch,等待 Wg 中的任務(wù)處理 goroutine 完成。
?
func?(w?*W1)?Stop()?{ ?close(w.DispatchStop) ?w.WgSend.Wait() ?close(w.Ch) ?w.Wg.Wait() }?
W2
SubWorker
?
type?SubWorker?struct?{ ?JobChan?chan?string }?
子協(xié)程,它有一個(gè) JobChan,用于接收任務(wù)。
Run:SubWorker 的方法,用于啟動(dòng)一個(gè)子協(xié)程,從 JobChan 中讀取任務(wù)并執(zhí)行。
?
func?(sw?*SubWorker)?Run(wg?*sync.WaitGroup,?poolCh?chan?chan?string,?quitCh?chan?struct{})?{ ?if?sw.JobChan?==?nil?{ ??sw.JobChan?=?make(chan?string) ?} ?wg.Add(1) ?go?func()?{ ??defer?wg.Done() ??for?{ ???poolCh?<-?sw.JobChan ???select?{ ???case?res?:=?<-sw.JobChan: ????fmt.Printf("完成工作:?%s? ",?res) ???case?<-quitCh: ????fmt.Printf("消費(fèi)者結(jié)束......? ") ????return ???} ??} ?}() }?
W2
?
type?W2?struct?{ ?SubWorkers?[]SubWorker ?Wg?????????*sync.WaitGroup ?MaxNum?????int ?ChPool?????chan?chan?string ?QuitChan???chan?struct{} }?
Dispatch
Dispatch:W2 的方法,用于從 ChPool 中獲取 TaskChan,將任務(wù)發(fā)送給一個(gè) SubWorker 執(zhí)行。
?
func?(w?*W2)?Dispatch(job?string)?{ ?jobChan?:=?<-w.ChPool ?select?{ ?case?jobChan?<-?job: ??fmt.Printf("發(fā)送任務(wù)?:?%s?完成? ",?job) ??return ?case?<-w.QuitChan: ??fmt.Printf("發(fā)送者(%s)結(jié)束? ",?job) ??return ?} }?
StartPool
StartPool:W2 的方法,用于初始化協(xié)程池,啟動(dòng)所有子協(xié)程并把 TaskChan 存儲(chǔ)在 ChPool 中。
?
func?(w?*W2)?StartPool()?{ ?if?w.ChPool?==?nil?{ ??w.ChPool?=?make(chan?chan?string,?w.MaxNum) ?} ?if?w.SubWorkers?==?nil?{ ??w.SubWorkers?=?make([]SubWorker,?w.MaxNum) ?} ?if?w.Wg?==?nil?{ ??w.Wg?=?&sync.WaitGroup{} ?} ?for?i?:=?0;?i??
Stop
Stop:W2 的方法,用于停止協(xié)程的工作,并等待所有協(xié)程結(jié)束。
?
func?(w?*W2)?Stop()?{ ?close(w.QuitChan) ?w.Wg.Wait() ?close(w.ChPool) }?
DealW2 函數(shù)則是整個(gè)協(xié)程池的入口,它通過 NewWorker 方法創(chuàng)建一個(gè) W2 實(shí)例,然后調(diào)用 StartPool 啟動(dòng)協(xié)程池,并通過 Dispatch 發(fā)送任務(wù),最后調(diào)用 Stop 停止協(xié)程池。
?
func?DealW2(max?int)?{ ?w?:=?NewWorker(w2,?max) ?w.StartPool() ?for?i?:=?0;?i?10*max;?i++?{ ??go?w.Dispatch(fmt.Sprintf("%d",?i)) ?} ?w.Stop() }?
個(gè)人見解
看到這里對于w2我已經(jīng)有點(diǎn)迷糊了,還能傳遞w.Wg, w.ChPool, w.QuitChan?
?
原來是golang里如果方法傳遞的不是地址,那么就會(huì)做一個(gè)拷貝,所以這里調(diào)用的wg根本就不是一個(gè)對象。 傳遞的地方傳遞地址就可以了,如果不傳遞地址,將會(huì)出現(xiàn)死鎖 go?doSomething(i,?&wg,?ch) func?doSomething(index?int,?wg?*sync.WaitGroup,?ch?chan?int)?{?
w1也有一個(gè)比較大的問題。在處理請求時(shí),每個(gè) Goroutine 都會(huì)占用一定的系統(tǒng)資源,如果請求量過大,會(huì)造成 Goroutine 數(shù)量的劇增,消耗過多系統(tǒng)資源,程序可能會(huì)崩潰
探究原文
在這段代碼中,poolCh代表工作者池,sw.JobChan代表工作者的工作通道。當(dāng)一個(gè)工作者完成了工作后,它會(huì)將工作結(jié)果發(fā)送到sw.JobChan,此時(shí)可以通過case res := <-sw.JobChan:來接收該工作的結(jié)果。
在這個(gè)代碼塊中,還需要處理一個(gè)退出信號(hào)quitCh。因此,第二個(gè)case <-quitCh:用于檢測是否接收到了退出信號(hào)。如果接收到了退出信號(hào),程序?qū)⒋蛴〕鱿⒉⒔Y(jié)束。
需要注意的是,這兩個(gè)case語句是互斥的,只有當(dāng)工作者完成工作或收到退出信號(hào)時(shí),才會(huì)進(jìn)入其中一個(gè)語句。因此,這個(gè)循環(huán)可以保證在工作者完成工作或收到退出信號(hào)時(shí)退出。
需要讀取兩次sw.JobChan的原因是:第一次讀取用于將工作者的工作通道放回工作者池中,這樣其他工作者就可以使用該通道。第二次讀取用于接收工作者的工作結(jié)果或退出信號(hào)。因此,這兩次讀取是為了確保能夠在正確的時(shí)刻將工作者的工作通道放回工作者池中并正確地處理工作結(jié)果或退出信號(hào)。
根據(jù)w2的特點(diǎn) 我自己寫了一個(gè)w2
?
import?( ???"fmt" ???"sync" ) type?SubWorkerNew?struct?{ ???JobChan?chan?string } type?W2New?struct?{ ???SubWorkers?[]SubWorkerNew ???Wg?????????*sync.WaitGroup ???MaxNum?????int ???ChPool?????chan?chan?string ???QuitChan???chan?struct{} } func?NewW2(maxNum?int)?*W2New?{ ???subWorkers?:=?make([]SubWorkerNew,?maxNum) ???for?i?:=?0;?i??
但是有幾個(gè)點(diǎn)需要注意
1.沒有考慮JobChan通道的緩沖區(qū)大小,如果有大量任務(wù)被并發(fā)分配,容易導(dǎo)致內(nèi)存占用過高;
2.每個(gè)線程都會(huì)執(zhí)行無限循環(huán),此時(shí)線程退出的條件是接收到QuitChan通道的信號(hào),可能導(dǎo)致線程的阻塞等問題;
3.Dispatch函數(shù)的默認(rèn)情況下只會(huì)輸出"All workers busy",而不是阻塞,這意味著當(dāng)所有線程都處于忙碌狀態(tài)時(shí),任務(wù)會(huì)丟失
4.線程池啟動(dòng)后無法動(dòng)態(tài)擴(kuò)展或縮小。
優(yōu)化
這個(gè)優(yōu)化版本改了很多次。有一些需要注意的點(diǎn)是,不然會(huì)一直死鎖
?
1.使用sync.WaitGroup來確保線程池中所有線程都能夠啟動(dòng)并運(yùn)行; 2.在Stop函數(shù)中,先向SubWorker的JobChan中發(fā)送一個(gè)關(guān)閉信號(hào),再等待所有SubWorker線程退出; 3.在Dispatch函數(shù)中,將默認(rèn)情況下的輸出改為阻塞等待可用通道;?
w2new
?
package?handle_million_requests import?( ?"fmt" ?"sync" ?"time" ) type?SubWorkerNew?struct?{ ?Id??????int ?JobChan?chan?string } type?W2New?struct?{ ?SubWorkers?[]SubWorkerNew ?MaxNum?????int ?ChPool?????chan?chan?string ?QuitChan???chan?struct{} ?Wg?????????*sync.WaitGroup } func?NewW2(maxNum?int)?*W2New?{ ?chPool?:=?make(chan?chan?string,?maxNum) ?subWorkers?:=?make([]SubWorkerNew,?maxNum) ?for?i?:=?0;?i??1?{ ??worker?:=?w.SubWorkers[w.MaxNum-1] ??close(worker.JobChan) ??w.MaxNum-- ??w.SubWorkers?=?w.SubWorkers[:w.MaxNum] ?} }?
AddWorker和RemoveWorker,用于動(dòng)態(tài)擴(kuò)展/縮小線程池。
在AddWorker函數(shù)中,我們首先將MaxNum增加了1,然后創(chuàng)建一個(gè)新的SubWorkerNew結(jié)構(gòu)體,將其添加到SubWorkers中,并將其JobChan通道添加到ChPool通道中。最后,我們創(chuàng)建一個(gè)新的協(xié)程來處理新添加的SubWorkerNew并讓它進(jìn)入無限循環(huán),等待接收任務(wù)。
在RemoveWorker函數(shù)中,我們首先將MaxNum減少1,然后獲取最后一個(gè)SubWorkerNew結(jié)構(gòu)體,將它的JobChan通道發(fā)送到ChPool通道中,并從其通道中讀取任何待處理的任務(wù),最后創(chuàng)建一個(gè)新的協(xié)程來處理SubWorkerNew,繼續(xù)處理任務(wù)。
測試用例
?
func?TestW2New(t?*testing.T)?{?? ????pool?:=?NewW2(3)?? ????pool.StartPool()?? ????pool.Dispatch("task?1")?? ????pool.Dispatch("task?2")?? ????pool.Dispatch("task?3")?? ????pool.AddWorker()?? ????pool.AddWorker()?? ????pool.RemoveWorker()?? ????pool.Stop()?? }?
當(dāng)Dispatch函數(shù)向ChPool通道獲取可用通道時(shí),會(huì)從通道中取出一個(gè)SubWorker的JobChan通道,并將任務(wù)發(fā)送到該通道中。而對于SubWorker來說,并沒有進(jìn)行任務(wù)的使用次數(shù)限制,所以它可以處理多個(gè)任務(wù)。
在這個(gè)例子中,當(dāng)任務(wù)數(shù)量比SubWorker數(shù)量多時(shí),一個(gè)SubWorker的JobChan通道會(huì)接收到多個(gè)任務(wù),它們會(huì)在SubWorker的循環(huán)中按順序依次處理,直到JobChan中沒有未處理的任務(wù)為止。因此,如果任務(wù)數(shù)量特別大,可能會(huì)導(dǎo)致某些SubWorker的JobChan通道暫時(shí)處于未處理任務(wù)狀態(tài),而其他的SubWorker在執(zhí)行任務(wù)。
在測試結(jié)果中,最后三行中出現(xiàn)了多個(gè)"SubWorker 0 processing job",說明SubWorker 0的JobChan通道接收了多個(gè)任務(wù),并且在其循環(huán)中處理這些任務(wù)。下面的代碼片段顯示了這個(gè)過程:
// SubWorker 0 的循環(huán)部分
?
for?{ ????select?{ ????case?job?:=?<-subWorker.JobChan: ????????fmt.Printf("SubWorker?%d?processing?job?%s ",?subWorker.Id,?job) ????case?<-w.QuitChan: ????????return ????} }審核編輯:湯梓紅?
評論