C++20無棧協(xié)程超輕量高性能異步庫開發(fā)實戰(zhàn)

科技綠洲 · Source: Linux開發架構之路 · Author: Linux開發架構之路 · 2023-11-09 10:20

C++20 has been out for a while now, and one of its headline features is that the language finally supports coroutines. (C++ is the heavyweight of the industry, yet it took until C++20 to get them; it is hard not to grumble about how many people waited for how many years.) After endless anticipation they have arrived, and the standards committee's caution shaped what C++20 coroutines are: excellent performance, great flexibility, and an entry barrier high enough to scare people away.

Then again, C++ was born to carry the performance banner. It is not a language meant for simple application-level work (for that you can happily use Python, Java, Ruby, Lua and friends); it is a language you can build other languages with. So it chases performance and flexibility and gives up on a cozy low entry barrier; after all, C++ was never designed for everyone.

I had used Python's coroutines before and found them pleasant, so when C++20 arrived I wanted to try coroutines in C++ too. Once I dug in, the problem became obvious: C++20 ships only the infrastructure. The full machinery of stackless coroutines is implemented in the language, but nothing is wrapped up in the standard library for application use. At that point most people can only stare: given how intricate the coroutine machinery is, using coroutines without a library on top is not a serious option, and the consensus online is that C++20 is a half-finished product and that C++ coroutines will only be genuinely usable once the C++23 library support is complete.

In the spirit of learning the hard way, and because only using libraries without understanding the underlying machinery is not how C++ should be approached, I spent half a month studying C++20 coroutines in depth and wrote a simple coroutine library, gaining a solid grip on the rather involved coroutine mechanics along the way. Yes, asio and cppcoro already support C++20 coroutines, but both are large and complex; if your plan is to learn C++ coroutines by reading their source, I would advise against it, because without understanding how coroutines work you will not be able to follow the code at all. You may not believe that, but C++ coroutine syntax really does upend your intuition. Here is an example:

A func(int a){
co_await std::suspend_always{};
co_yield a;
co_return 0;
}
int main(){
auto co = func(1);
co.hd.resume();
int a = co.hd.resume();
int b = co.hd.resume();
}

有人說func是一個協(xié)程函數(shù),main中的func運行后會返回0,也就是 co是一個int變量值為0;

如果你按常規(guī)代碼理解,沒錯。但是在c++協(xié)程的世界,他完全不是上面說的情況。

正確的情況是: func在這里是一個協(xié)程生成器(這個概念很重要,他不是函數(shù))返回值co是一個協(xié)程管理類A關(guān)聯(lián)了具體協(xié)程執(zhí)行體后的協(xié)程實例的控制句柄(的包裝對象)。明確co不是協(xié)程實例(協(xié)程幀),是協(xié)程實例的控制句柄的包裝對象,在func(1)執(zhí)行之后他只是“動態(tài)”生成了一個協(xié)程實例,并把控制句柄返回給用戶,但此時這個協(xié)程是掛起的,協(xié)程體{}代碼塊還沒有被執(zhí)行過,所以不存在返回值。這非常的繞,讓人難以理解(后面還有更難理解的)。

在三次co.hd.resume();調(diào)用后協(xié)程才被完全執(zhí)行完畢,此時a=1,b=0;

返回值保存在協(xié)程的實例(協(xié)程幀)中,通過協(xié)程管理類A的內(nèi)部流程控制函數(shù)管理著返回值(A的promise_type定義了所有的協(xié)程控制行為)。

總結(jié)幾點 (重要,不要混淆):

1、“協(xié)程管理類A是包含協(xié)程行為控制的類定義 ,A不是協(xié)程,形如 A func(int a, …){ … } 才是一個完整的協(xié)程定義”;所以A func1(){}; A func2(){}; A func3(){}; 都可以與同一個協(xié)程控制A綁定,但他們是3個不同的協(xié)程定義,只是協(xié)程控制行為都為A。好處是,你可以用一個std::vector< A > 保存下這3個不同的協(xié)程,他們的主協(xié)程體(功能實現(xiàn))各不相同。要讓A為一個協(xié)程管理類,必須包含struct promise_type{}定義,和一個控制句柄對象std::coroutine_handle< promise_type > hd; 特別的,A可以不實現(xiàn)await_xxx接口,他可以不是可等待體。

2、代碼塊體中有co_await ,co_yield,co_return關(guān)鍵字,則為協(xié)程體代碼塊,運行到關(guān)鍵字位置會**“觸發(fā)協(xié)程掛起” ** ,此時原調(diào)用者代碼阻塞在resume函數(shù)位置,運行權(quán)重新回到調(diào)用者,此時resume會返回,調(diào)用者繼續(xù)執(zhí)行;

3、特別的:

co_await可以與可等待對象配合,形成更為復雜的協(xié)程掛起行為:一般異步IO操作,都是通過co_await + io可等待對象,完成異步操作后掛起協(xié)程,等待異步io完成后,再由**“調(diào)度器”**恢復協(xié)程繼續(xù)運行,從而發(fā)揮異步的意義,形成io復雜度向cpu復雜度的轉(zhuǎn)移。因此,協(xié)程解決的是問題是“異步”而不是“并行”,要實現(xiàn)并行只能考慮多線程或多進程,協(xié)程可以將單個線程cpu效率發(fā)揮到最大,而不會被io阻塞浪費掉當前線程的cpu算能,那問題來了,如果我們用 協(xié)程 + 多線程/多進程 結(jié)合模式呢,那恭喜你,世界都將是你的;

co_yield實現(xiàn)簡單掛起,簡單的立即放棄運行權(quán),返回調(diào)用者,可恢復(異步應(yīng)用場景相對較少,多用于循環(huán)生成器);

co_return實現(xiàn)最后一次簡單掛起,立即放棄運行權(quán),返回調(diào)用者,協(xié)程后續(xù)不再可恢復(應(yīng)用于協(xié)程退出);

4、可等待體(類形如 struct B{ await_ready();await_suspend();await_resume(); } 實現(xiàn) 三個await_xxx接口的類B是一個可等待體定義),他的實例是一個可等待對象;其中await_suspend()在執(zhí)行后(不是執(zhí)行前),會觸發(fā)當前協(xié)程掛起(記住,此處不是可等待對象掛起,是co_await 此可等待對象的當前協(xié)程掛起,不能混淆,由于概念不清,我在這個位置耽誤了很久的時間)

5、協(xié)程管理類A,和可等待體B,他們沒有直接關(guān)系,是兩個不同的東西??傻却wB控制掛起時點的行為是局部的,協(xié)程控制A控制協(xié)程整體創(chuàng)建,運行,掛起,銷毀,異常捕獲等過程的行為是整體的;協(xié)程只對應(yīng)有一個控制類A,但是內(nèi)部可以有多次掛起操作,每次操作對應(yīng)一個可等待對象;

Library development

The focus of this article is hands-on library development. The three core concepts of the coroutine framework, namely the coroutine definition class with its promise_type{}, the awaitable, and the control handle std::coroutine_handle<>, are not introduced here; please read up on them yourself.

What does need introducing is the runtime logic of coroutine scheduling, since it underpins everything in the library. With threads this is handled by the kernel and we rarely think about it, but with coroutines, and a library you write yourself, you have to implement the scheduling algorithm and the event-loop pattern on your own.

Here is an analogy:

A family has five sons of different abilities (the worker coroutines) and one mother (the scheduler coroutine). There is only one computer (a single thread's time slice), and at any moment the mother can give it to only one son (coroutine preemption). One son gets the computer first and starts working (coroutine resumed) while the others can only wait (waiting state). After working for a while he sends an e-mail (an awaitable object) but cannot continue until the reply arrives (waiting for I/O completion). Since the others are still waiting for the computer and he cannot make progress anyway, he sensibly gives up the computer and hands it back to his mother (the coroutine suspends and the execution right returns to the caller), then waits for her to give it to him again. The mother, now holding the computer (scheduler coroutine resumed), checks whether any replies have arrived (the scheduler polls for completion events, the IOCP/epoll event loop). If one has, she checks which son it is addressed to, calls him over (scheduling), and hands him the computer (resume). He opens the reply, reads the result (await_resume() returning the async I/O result), keeps working, and so on, round and round. That is one complete cycle of coroutine scheduling.

To implement a coroutine library you need several things:

1. Awaitables that implement the concrete asynchronous operations (the e-mail in the analogy: they define whether to hand the computer back, how to open the reply and read the result, and so on).

2. A coroutine control class A (a coroutine task). Its promise_type should record the task's state, in particular a pointer to the awaitable it is currently suspended on (this matters a lot). The awaitable can also serve as the medium through which the task and the scheduling coroutine exchange information: its pointer is passed to the task's promise via await_suspend() and stored there, and when the asynchronous operation completes the scheduling coroutine uses that pointer to hand the result back to the waiting task.

3. Most importantly, as the summary and the analogy suggest, a coroutine scheduler. First, it has a main scheduling coroutine with a set of scheduling policies; its job is to watch for I/O completion events and to hand the execution right to tasks. Second, it maintains a task queue (implementable in several ways) that records the handles of all coroutine instances; this queue exists purely for scheduling.

(Note: the reason C++20 cannot be used directly is precisely that none of these three building blocks ship as a ready-made library. For the sake of flexibility C++ expects users to implement them themselves, which is uncomfortable for those of us used to finished libraries, so we keep calling for the C++23 standard library. But even C++23 will not cover every need; for special requirements you will still end up writing your own.)

實現(xiàn)思路

調(diào)度器:

1 調(diào)度協(xié)程中的event loop本例是在Windows下采用的iocp模型(linux下可以使用epoll也很好改,原理一樣)

2、調(diào)度算法采用簡單的等權(quán)重調(diào)度,也就是掛入?yún)f(xié)程隊列的task,輪流調(diào)度,每個被調(diào)度的task被調(diào)度的機會相同;

3、完成事件標記和task恢復,業(yè)務(wù)分開,這樣目的是使得通過co_yield簡單掛起的任務(wù)有重新執(zhí)行的機會(因為co_yeild不會在后續(xù)觸發(fā)完成事件)

4、調(diào)度器中記錄著協(xié)程隊列的開始task和末尾task的handle,以便調(diào)度協(xié)程;

The awaitables:

1. Asynchronous read and write for files;

2. Asynchronous listen, accept, send and recv for TCP sockets;

3. Asynchronous sendto and recvfrom for UDP sockets;

4. Coroutine sleep, as sleepFor and sleepForEx, giving millisecond- and microsecond-level sleeps for coroutine tasks;

5. Under the IOCP model all of the APIs above issue overlapped I/O. When an overlapped call is issued successfully, a pointer to the corresponding awaitable is recorded in the current coroutine (a variable in its promise_type). Once the completion event arrives, the scheduling coroutine sets that awaitable's completion flag to true. The scheduling coroutine simply walks the tasks in its loop, checks the awaitable pointer each task has stored, and if the completion flag is true it resumes that coroutine; if false it skips the task and keeps polling the event loop.
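
Point 5 is the heart of the design, so here is a stripped-down, hypothetical model of it (invented names; the "I/O completion" is faked with a counter): when the task suspends it records the awaitable in its promise, and the polling loop resumes a task only once that awaitable's completion flag has been set.

#include <coroutine>
#include <deque>
#include <exception>
#include <iostream>

struct FakeIo;                                // forward declaration

struct PollTask {
    struct promise_type {
        FakeIo* pending = nullptr;            // awaitable this task is suspended on, if any
        PollTask get_return_object() { return { std::coroutine_handle<promise_type>::from_promise(*this) }; }
        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };
    std::coroutine_handle<promise_type> hd;
};

struct FakeIo {
    bool completed = false;
    int result = 0;
    bool await_ready() const { return completed; }
    void await_suspend(std::coroutine_handle<PollTask::promise_type> h) {
        h.promise().pending = this;           // the scheduler will poll this pointer's flag
    }
    int await_resume() { return result; }
};

FakeIo io;                                    // one pending "I/O" operation

PollTask worker() {
    std::cout << "issue io, suspend\n";
    int r = co_await io;
    std::cout << "io done, result = " << r << "\n";
}

int main() {
    auto t = worker();
    std::deque<std::coroutine_handle<PollTask::promise_type>> queue{ t.hd };
    int ticks = 0;
    while (!queue.empty()) {                  // the scheduler's polling loop
        auto h = queue.front(); queue.pop_front();
        FakeIo* p = h.promise().pending;
        if (p == nullptr || p->completed) {   // first run, or its I/O has completed
            h.promise().pending = nullptr;
            h.resume();
        }
        if (h.done()) { h.destroy(); continue; }
        queue.push_back(h);
        if (++ticks == 3) { io.result = 7; io.completed = true; }   // fake the completion event
    }
}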

任務(wù)定義(task協(xié)程):

1、task協(xié)程的promise_type中定義3個變量,

2、保存當前掛起的可等待提指針,如果當前協(xié)程不是io掛起或者是沒有掛起,該指針應(yīng)該為null

3、保存當前協(xié)程自身所屬調(diào)度器Scheduler的指針;

4、保存此刻協(xié)程隊列中的前一個協(xié)程task的handle和后一個協(xié)程task的handle;

5、若當前task的可等待對象完成標記為true,則調(diào)度協(xié)程會將該task的before task和behind task鏈接,將該task的handle移動到協(xié)程隊列尾部,并且resume task,完成調(diào)度和恢復;

啟動協(xié)程調(diào)度:

1、實例化調(diào)度器 CoScheduler;

2、通過lambda表達方式定義task協(xié)程,并加入到調(diào)度器的協(xié)程隊列;

3、通過run方法啟動調(diào)度器調(diào)度運行各協(xié)程任務(wù);

4、task協(xié)程中又可以動態(tài)嵌套生產(chǎn)新的task協(xié)程加入到調(diào)度隊列;

First, the test results (source code follows later):

Case 1: TCP server/client model test

Besides the scheduling coroutine, four tasks appear in the coroutine queue: a server listener task, a client generator task, a server task and a client task.

Main coro scheduler started ...
Main coro scheduler: Iocp loop started ... //0 scheduler coroutine running
Iocp: New task arrived and first run, tid=26064
Tcp server coro started ... //1 listener task running
Server listener accept wait ... --> suspended on async accept
Iocp: New task arrived and first run, tid=26064 //0 scheduler coroutine running (event loop)
Clients creater coro started. //2 client generator task running
Clients creater make client task 1. --> dynamically creates a client task and queues it
Clients creater yield run privilege to coro scheduler. --> returns to the scheduler via co_yield
Iocp: New task arrived and first run, tid=26064 //0 scheduler coroutine running
Iocp: New task arrived and first run, tid=26064 --> schedules the newly arrived task
Client[1] connect wait ... //3 client task running, suspended on async connect
Iocp: IoFileAwaitable[TConnect] completed 0 bytes, tid=26064 //0 scheduler detects the connect completion event
Clients creater fetch run privilege again. //2 client generator task running
Clients creater yield run privilege to coro scheduler.
Client[1] send wait ...
Iocp: IoFileAwaitable[TAccept] completed 47 bytes, tid=26064 //0 scheduler detects the accept completion event
Server listener accept wait ... //1 server listener task running, suspended on async accept
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 //0 scheduler coroutine running
Clients creater fetch run privilege again. //2 client generator task running
Clients creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=26064 //0 scheduler coroutine running
Server[1] send wait ... //4 server task running
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 //0 scheduler detects the send completion event
Client[1] recv wait ... //3 client task running
Iocp: IoFileAwaitable[TRecv] completed 47 bytes, tid=26064 //0 scheduler detects the recv completion event
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Server[1] recv wait ... //4 server task running, suspended on async recv
Client[1] recv server msg = //3 client task running
Hello client. this is server 1. 1st response. --> prints the message from the server
Client[1] send wait ... --> suspended on async send
Iocp: IoFileAwaitable[TRecv] completed 47 bytes, tid=26064 //0 scheduler detects the recv completion event
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 --> send completion event detected
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Server[1] recv client msg = //4 server task running
Helle server, this is client 1: 2st request. --> prints the message from the client
Server[1] send wait ...

多個協(xié)程任務(wù)的異步交替執(zhí)行,就是在一個協(xié)程遇到 可掛起的異步操作時,比如connect accept send recv等,把運行權(quán)限歸還給調(diào)度器,當完成事件到來,調(diào)度器又把執(zhí)行權(quán)返回給task,形成執(zhí)行權(quán)在調(diào)度器和task之間反復橫跳的情況,實現(xiàn)cpu的多任務(wù)復用;

案例2:udp 廣播模式測試

Main coro scheduler started ... // 同案例1 調(diào)度啟動,分別產(chǎn)生3個服務(wù)和3個客戶端
Main coro scheduler: Iocp loop started ...
Iocp: New task arrived and first run, tid=31188
Servers creater coro started.
Servers creater make server task 1.
Servers creater make server task 2.
Servers creater make server task 3.
Servers creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=31188
Clients creater coro started.
Clients creater make broadcastor client task 1.
Clients creater make broadcastor client task 2.
Clients creater make broadcastor client task 3.
Clients creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=31188
Iocp: New task arrived and first run, tid=31188
Udp server[1] coro started bind port = 33000...
Udp server[1] recvfrom wait ... //server 1 waits on async recvfrom
Iocp: New task arrived and first run, tid=31188
Udp server[2] coro started bind port = 33001...
Udp server[2] recvfrom wait ... //server 2 waits on async recvfrom
Iocp: New task arrived and first run, tid=31188
Udp server[3] coro started bind port = 33002...
Udp server[3] recvfrom wait ... //server 3 waits on async recvfrom
Iocp: New task arrived and first run, tid=31188
Broadcastor[1] send wait ... //client 1 waits on async sendto
Iocp: New task arrived and first run, tid=31188
Broadcastor[2] send wait ... //client 2 waits on async sendto
Iocp: New task arrived and first run, tid=31188
Broadcastor[3] send wait ... //client 3 waits on async sendto
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188 //scheduler: recvfrom completion event
Servers creater fetch run privilege again.
Servers creater yield run privilege to coro scheduler.
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188 //scheduler: sendto completion event
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Udp server[2] recvfrom 1st broadcast 75 bytes data, msg = //server 2 receives and prints the message
Helle server, this is broadcastor 1: 1st randon broadcast to port=33001.
Udp server[2] recvfrom wait ... --> suspended on async recvfrom
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Udp server[3] recvfrom 1st broadcast 75 bytes data, msg =
Helle server, this is broadcastor 2: 1st randon broadcast to port=33002.
Udp server[3] recvfrom wait ...
Broadcastor[1] sendto server msg =
Helle server, this is broadcastor 1: 1st randon broadcast to port=33001.
Broadcastor[1] send wait ...
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Broadcastor[2] sendto server msg =
Helle server, this is broadcastor 2: 1st randon broadcast to port=33002.
Broadcastor[2] send wait ...
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Broadcastor[3] sendto server msg =
Helle server, this is broadcastor 3: 1st randon broadcast to port=33001.

Now for the performance tests:

Using the same models as cases 1 and 2, but this time with 100 TCP server/client pairs (200 tasks) and 20 UDP broadcaster/receiver pairs (40 tasks) to measure concurrency and collect statistics. The results:

Tcp server coro started ...
Clients creater coro started.
Clients creater make client task 1.
...
Clients creater make client task 100.
Summary coro count 203: total handle 92752 times (spend time 3.06413s), 30902.3 times/per-second.
Summary coro count 203: total handle 185852 times (spend time 6.06633s), 31010.6 times/per-second.
Summary coro count 203: total handle 278601 times (spend time 9.06766s), 30902.6 times/per-second.
Summary coro count 203: total handle 371901 times (spend time 12.0696s), 31080.1 times/per-second.
Summary coro count 203: total handle 466752 times (spend time 15.0719s), 31592 times/per-second.

Counting one complete send plus recv on both server and client, i.e. four TCP transfers, as one effective exchange (one "time"),

the results show that with around 200 coroutines a single thread completes roughly 30,000 effective exchanges per second (admittedly it is talking to itself, but the coroutine machinery works end to end and the performance is respectable).

Servers creater coro started.
Servers creater make server task 1.
...
Servers creater make server task 20.
Clients creater coro started.
Clients creater make broadcastor client task 1.
...
Clients creater make broadcastor client task 20.
Udp server[1] coro started bind port = 33000...
...
Udp server[20] coro started bind port = 33019...
Summary coro count 43: total handle 541730 times (spend time 3.02587s), 180571 times/per-second.
Summary coro count 43: total handle 1082377 times (spend time 6.02621s), 180196 times/per-second.
Summary coro count 43: total handle 1623102 times (spend time 9.02651s), 180223 times/per-second.
Summary coro count 43: total handle 2165716 times (spend time 12.0268s), 180853 times/per-second.
Summary coro count 43: total handle 2731919 times (spend time 15.0271s), 188714 times/per-second.

由于udp是單向非鏈接協(xié)議,速度會比tcp快得多,按一次sendto和recvfrom記為一次有效通訊,則在coro=40時候,單線程每秒有效通訊18萬次。

最后

c++協(xié)程理解之后并不是很難,并且只要api提供異步方案,都可以實現(xiàn)協(xié)程庫的封裝,比如mysql,redis等異步操作,后續(xù)都可以依葫蘆畫瓢,很快實現(xiàn)c++協(xié)程庫的開發(fā)。

本庫開發(fā)只是為記錄c++協(xié)程學習的經(jīng)歷,很多功能后續(xù)還需完善。目前支持在windows下的各位file socket sleep的異步操作,后續(xù)可擴展支持linux的epoll模型。

代碼

頭文件CLCoroutine.h 其中的void test_coroutine_tcp_server()和void test_coroutine_udp_random_broadcast()就是案例1和案例2的測試代碼。
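
For completeness, a minimal driver for the two tests could simply be (assuming the header and the implementation file below are built into the same project):

#include "CLCoroutine.h"

int main() {
    test_coroutine_tcp_server(33100, 100, true);              // case 1: TCP server/client, 100 clients, verbose
    // test_coroutine_udp_random_broadcast(33000, 20, true);  // case 2: UDP broadcast
}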

#ifndef __CL_COROUTINE__
#define __CL_COROUTINE__

#if (defined(__cplusplus) && __cplusplus >= 202002L) || (defined(_HAS_CXX20) && _HAS_CXX20)
#ifndef CLUseCorotine
#define CLUseCorotine 1
#endif
#endif

#if (defined(CLUseCorotine) && CLUseCorotine)

#include <coroutine>
#include <thread>
#include <atomic>
#include <chrono>
#include <stdexcept>
#include <system_error>
#include "../_cl_common/CLCommon.h"

#ifdef _WIN32
#ifndef WIN32_LEAN_AND_MEAN //trim the size of the included Windows headers
#define WIN32_LEAN_AND_MEAN
#endif // !WIN32_LEAN_AND_MEAN
#include "Windows.h"
#include "Winsock2.h"
#include "WS2tcpip.h"
#include "MSWSock.h"
#pragma comment(lib, "ws2_32.lib")
#else
#endif

struct CoScheduler;

//(協(xié)程)任務(wù)單元
struct CoTask {

using return_type = void;
struct promise_type;
using handle = std::coroutine_handle<promise_type>;

struct promise_type {
CoTask get_return_object() { return { handle::from_promise(*this) }; }
void unhandled_exception() { std::terminate(); }
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() { }
template<typename U>
std::suspend_always yield_value(const U& val) {
pAwaitableFile = nullptr;
return {};
}

CoScheduler* sc = 0;
handle before = 0, behind = 0;
void* pAwaitableFile = 0;
};

bool resume();
handle hd;
};

//(協(xié)程)任務(wù)調(diào)度器。包含主調(diào)度協(xié)程和事件循環(huán),維護掛起的(協(xié)程)任務(wù)隊列
struct CoScheduler {

struct MainCoro {
using return_type = void;
struct promise_type;
using handle = std::coroutine_handle<promise_type>;
struct promise_type {
MainCoro get_return_object() { return { handle::from_promise(*this) }; }
void unhandled_exception() { std::terminate(); }
std::suspend_always initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() { }
CoScheduler* sc = 0;
};
constexpr bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<>) { }
auto await_resume() const { }
handle hd;
};

CoScheduler()
: m_curTid(std::this_thread::get_id())
, m_hIocp(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0))
{
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
}
bool registe(HANDLE hFile) {
if (!hFile || hFile == INVALID_HANDLE_VALUE || !m_hIocp || ::CreateIoCompletionPort(hFile, m_hIocp, 0, 0) != m_hIocp)
return false;
else
return true;
}
bool registe(SOCKET sock) {
return registe((HANDLE)sock);
}
// 創(chuàng)建task并等待后續(xù)調(diào)度執(zhí)行
template<typename F, typename... Args>
void gather(F&& func, Args&& ...args) {
if (m_curTid != std::this_thread::get_id())
throw std::logic_error("Scheduler thread is not match.");
CoTask coro = func(std::forward<Args>(args)...);
appendNewTaskToEnd(coro);
::PostQueuedCompletionStatus(m_hIocp, 0, (ULONG_PTR)coro.hd.address(), 0);
}
// 創(chuàng)建task并立即調(diào)度執(zhí)行
template<typename F, typename... Args>
void createTask(F&& func, Args&& ...args) {
if (m_curTid != std::this_thread::get_id())
throw std::logic_error("Scheduler thread is not match.");
CoTask coro = func(std::forward<Args>(args)...);
appendNewTaskToEnd(coro);
coro.resume();
}
size_t taskCount() const { return m_taskCount; }
// 執(zhí)行協(xié)程調(diào)度
void run();

private:
void appendNewTaskToEnd(CoTask& cur) {
auto& cprm = cur.hd.promise();
cprm.sc = this;
if (m_end.hd) {
cprm.before = m_end.hd;
cprm.behind = 0;
m_end.hd.promise().behind = cur.hd;
}
m_end.hd = cur.hd;
++m_taskCount;
if (m_begin.hd == 0) {
m_begin.hd = cur.hd;
cprm.before = 0;
}
}
void moveTaskToEnd(CoTask::handle h) {
if (removeDoneTask())
return;
if (!h)
return;
auto& cprm = h.promise();
if (h == m_begin.hd) {
m_begin.hd = cprm.behind;
if (m_begin.hd)
m_begin.hd.promise().before = 0;
if (m_end.hd)
m_end.hd.promise().behind = h;
cprm.behind = 0;
cprm.before = m_end.hd;
m_end.hd = h;
}
else if (h == m_end.hd) {
}
else {
cprm.behind.promise().before = cprm.before;
cprm.before.promise().behind = cprm.behind;
if (m_end.hd)
m_end.hd.promise().behind = h;
cprm.behind = 0;
cprm.before = m_end.hd;
m_end.hd = h;
}
}
bool removeDoneTask() {
bool ret = false;
while (m_begin.hd && m_begin.hd.done()) {
auto h = m_begin.hd;
m_begin.hd = h.promise().behind;
if (m_begin.hd)
m_begin.hd.promise().before = 0;
h.destroy();
--m_taskCount;
ret = true;
}
return ret;
}

HANDLE m_hIocp;
const std::thread::id m_curTid;
MainCoro m_main;
CoTask m_begin, m_end;
std::atomic<size_t> m_taskCount = 0;
};

// I/O operation types
enum IoFileType :int {
TUnknown = 0,
TRead,
TWrite,
TListen,
TAccept,
TConnect,
TSend,
TRecv,
TSendto,
TRecvfrom,
TSleep,
};

// IO文件調(diào)度優(yōu)先級
enum IoFilePriority : int {
WaitingForPolling = 0, // 等待順序輪詢調(diào)度
DispatchImmediately, // 立即調(diào)度
};

// Awaitable file object supporting asynchronous suspension (base class)
template<typename Ret = long long>
struct IoFileAwaitable : OVERLAPPED {
operator HANDLE() const { return m_hFile; }
operator SOCKET() const { return (SOCKET)m_hFile; }
bool isRegisted() const { return m_isRegisted; }
bool isCompleted() const { return m_isCompleted; }
void setCompleted() { m_isCompleted = true; }
void resetCompleted() {
memset(this, 0, sizeof(OVERLAPPED));
m_isCompleted = 0;
}
void setReturn(Ret ret) { m_ret = ret; }
Ret getReturn() const { return m_ret; }
IoFileType& type() { return m_fileType; }
const char* typeName() const {
#define _TypeNameItem( tp ) case tp: return #tp;
switch (m_fileType)
{
_TypeNameItem(TUnknown);
_TypeNameItem(TRead);
_TypeNameItem(TWrite);
_TypeNameItem(TListen);
_TypeNameItem(TAccept);
_TypeNameItem(TConnect);
_TypeNameItem(TSend);
_TypeNameItem(TRecv);
_TypeNameItem(TSendto);
_TypeNameItem(TRecvfrom);
_TypeNameItem(TSleep);
default:
return "TUnknown";
}
}
void* getTransferredBytesCountBuffer() const {
return m_transferredBytesCount;
}
void setTransferredBytesCountRecvBuffer(void* countBuf) {
m_transferredBytesCount = countBuf;
}
bool close() {
if (m_hFile) {
return CloseHandle(detach());
}
return true;
}
HANDLE detach() {
HANDLE ret = *this;
m_hFile = 0;
m_isRegisted = 0;
return ret;
}
HANDLE attach(CoScheduler& sc, HANDLE s) {
HANDLE ret = *this;
m_hFile = s;
m_isRegisted = sc.registe(m_hFile);
return ret;
}
int getLastError() const { return m_lastError; }
void setLastError(int err) { m_lastError = err; }
CoTask::handle& onwer() { return m_owner; }
auto getPriority() const { return m_priority; }
void setPriority(IoFilePriority priority) { m_priority = priority; }
// awaitable methed
bool await_ready() const { return isCompleted(); }
void await_suspend(CoTask::handle h) {
h.promise().pAwaitableFile = this;
m_owner = h;
}
Ret await_resume() {
setTransferredBytesCountRecvBuffer(nullptr);
return getReturn();
}
protected:
IoFileAwaitable()
: m_hFile((HANDLE)0)
, m_isRegisted(false)
{
resetCompleted();
}
IoFileAwaitable(CoScheduler& sc, HANDLE hFile)
: m_hFile(hFile)
, m_isRegisted(sc.registe(m_hFile))
{
resetCompleted();
}
IoFileAwaitable(CoScheduler& sc, SOCKET sock)
: m_hFile((HANDLE)sock)
, m_isRegisted(sc.registe(sock))
{
resetCompleted();
}
HANDLE m_hFile;
bool m_isRegisted;
bool m_isCompleted;
IoFileType m_fileType = IoFileType::TUnknown;
void* m_transferredBytesCount = nullptr;
int m_lastError = ERROR_SUCCESS;
IoFilePriority m_priority = IoFilePriority::WaitingForPolling;
CoTask::handle m_owner;
Ret m_ret = 0;
};

// Socket supporting asynchronous suspension (base class)
template<typename Ret = int>
struct AsyncSocket :public IoFileAwaitable<Ret> {
using base = IoFileAwaitable<Ret>;
~AsyncSocket() { close(); }
sockaddr_in localAddress() const { return m_local; }
sockaddr_in remoteAddress() const { return m_remote; }
sockaddr_in* localAddress() { return &m_local; }
sockaddr_in* remoteAddress() { return &m_remote; }
int close() {
int ret = 0;
if (base::m_hFile) {
if (base::m_hFile != (HANDLE)INVALID_SOCKET) {
ret = closesocket(detach());
}
else {
base::m_hFile = 0;
base::m_isRegisted = 0;
}
}
return ret;
}
SOCKET detach() {
return (SOCKET)base::detach();
}
SOCKET attach(CoScheduler& sc, SOCKET s) {
return (SOCKET)base::attach(sc, (HANDLE)s);
}
protected:
AsyncSocket(CoScheduler& sc, SOCKET sock)
:base(sc, sock)
{ }
sockaddr_in m_local = { 0 };
sockaddr_in m_remote = { 0 };
};

struct AsyncAcceptor;
// 支持異步掛起的服務(wù)端監(jiān)聽器,是一個等待連接到來的TCP監(jiān)聽套接字
struct AsyncListener :public AsyncSocket<bool> {
AsyncListener(CoScheduler& sc, unsigned long addr, unsigned short port, int backlog = SOMAXCONN)
:AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
{
type() = IoFileType::TListen;
m_local.sin_family = AF_INET;
m_local.sin_addr.s_addr = addr;
m_local.sin_port = htons(port);
char opt = 1;
if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener set reuse addr error.");
}
if (SOCKET_ERROR == ::bind(*this, (sockaddr*)&m_local, sizeof(m_local))
|| SOCKET_ERROR == ::listen(*this, backlog)
)
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener bind or listen error.");
}
}
AsyncListener(CoScheduler& sc, const char* ip, unsigned short port, int backlog = SOMAXCONN)
:AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
{
type() = IoFileType::TListen;
m_local.sin_family = AF_INET;
m_local.sin_port = htons(port);
InetPton(AF_INET, ip, &m_local.sin_addr);
char opt = 1;
if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener set reuse addr error.");
}
if (SOCKET_ERROR == ::bind(*this, (sockaddr*)&m_local, sizeof(m_local))
|| SOCKET_ERROR == ::listen(*this, backlog)
)
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener bind or listen error.");
}
}
sockaddr_in listenAddress() const { return localAddress(); }
// Returns true on success, false on failure
AsyncAcceptor& accept(AsyncAcceptor& sockAccept, void* lpAcceptBuffer
, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted);
};

// TCP connection supporting asynchronous suspension (base class)
struct AsyncTcp :public AsyncSocket<int> {
// Returns 0 on success, SOCKET_ERROR on failure
AsyncTcp& send(const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend);
// Returns 0 on success, SOCKET_ERROR on failure
AsyncTcp& recv(void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv);
protected:
AsyncTcp(CoScheduler& sc)
:AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
{ }
};

// 支持異步掛起的服務(wù)端接收器,是一個接受端TCP套接字
struct AsyncAcceptor : public AsyncTcp {
AsyncAcceptor(CoScheduler& sc)
: AsyncTcp(sc)
{
type() = IoFileType::TAccept;
}
// Parse the incoming connection's address info and store it in the internal address members
void perseAddress(void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer) {
if (lpAcceptBuffer == 0 || nNumberOfBytesAcceptBuffer == 0)
throw std::logic_error("perseAddress parm is invalid.");

static LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptSockAddrs = 0;
if (!lpfnGetAcceptSockAddrs) {
GUID GuidGetAcceptexSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
unsigned long dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
*this,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptexSockAddrs,
sizeof(GuidGetAcceptexSockAddrs),
&lpfnGetAcceptSockAddrs,
sizeof(lpfnGetAcceptSockAddrs),
&dwBytes, NULL, NULL))
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener GetAcceptexSockAddrs error.");
}
}
int localLen = 0, remoteLen = 0;
lpfnGetAcceptSockAddrs(
lpAcceptBuffer,
nNumberOfBytesAcceptBuffer - (sizeof(sockaddr_in) + 16) * 2,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
(LPSOCKADDR*)localAddress(),
&localLen,
(LPSOCKADDR*)remoteAddress(),
&remoteLen
);
}
// Returns true on success, false on failure
AsyncAcceptor& accept(AsyncListener& sockListener, void* lpAcceptBuffer
, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted);
int await_resume() {
setPriority(IoFilePriority::WaitingForPolling);
return AsyncTcp::await_resume();
}
};

// Client-side connector supporting asynchronous suspension: the initiating TCP socket
struct AsyncConnector : public AsyncTcp {
AsyncConnector(CoScheduler& sc)
: AsyncTcp(sc)
{
type() = IoFileType::TConnect;
}
AsyncConnector(CoScheduler& sc, const char* ip, unsigned short port)
: AsyncTcp(sc)
{
type() = IoFileType::TConnect;
setConnectRemoteAddress(ip, port);
bindConnectLocalPort(0);
}
void setConnectRemoteAddress(const char* ip, unsigned short port) {
memset(&m_remote, 0, sizeof(m_remote));
m_remote.sin_family = AF_INET;
m_remote.sin_port = htons(port);
InetPton(AF_INET, ip, &m_remote.sin_addr);
}
int bindConnectLocalPort(unsigned short port = 0) {
memset(&m_local, 0, sizeof(m_local));
m_local.sin_family = AF_INET;
m_local.sin_addr.s_addr = INADDR_ANY;
m_local.sin_port = htons(port);
return ::bind(*this, (const sockaddr*)&m_local, sizeof(m_local));
}
// Returns true on success, false on failure
AsyncConnector& connect(const sockaddr* name, int namelen, void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
// Returns true on success, false on failure
AsyncConnector& connect(const char* ip, unsigned short port
, void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
// Returns true on success, false on failure
AsyncConnector& connect(void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
};

// 作為服務(wù)端Acceptor應(yīng)該具有事件完成并立即調(diào)度優(yōu)先級,保證吞吐量
// 返回值true成功,false失敗
inline
AsyncAcceptor&
accept(AsyncListener& sockListener, AsyncAcceptor& sockAccept, void* lpAcceptBuffer
, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted) {
static LPFN_ACCEPTEX lpfnAcceptEx = 0;

sockListener.type() = IoFileType::TListen;
sockAccept.type() = IoFileType::TAccept;
sockAccept.resetCompleted();
sockAccept.setTransferredBytesCountRecvBuffer(lpNumberOfBytesAccepted);
sockAccept.setPriority(IoFilePriority::DispatchImmediately);//set immediate-dispatch priority
if (lpNumberOfBytesAccepted)
*lpNumberOfBytesAccepted = 0;

if (!lpfnAcceptEx) {
GUID GuidAcceptEx = WSAID_ACCEPTEX; // GUID needed to look up the AcceptEx extension function
unsigned long dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
sockListener,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&lpfnAcceptEx,
sizeof(lpfnAcceptEx),
&dwBytes, NULL, NULL))
{
lpfnAcceptEx = 0;
throw std::system_error(WSAGetLastError(), std::system_category(), "Accept get AcceptEx function address error.");
}
}

bool ret = lpfnAcceptEx(
sockListener,
sockAccept,
(char*)lpAcceptBuffer,
nNumberOfBytesAcceptBuffer - (sizeof(sockaddr_in) + 16) * 2,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
lpNumberOfBytesAccepted,
&sockAccept
);
if (ret == false) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = true;
}
if (ret) {
sockAccept.setReturn(ret);
return sockAccept;
}
sockAccept.setReturn(false);
sockAccept.setCompleted();
sockAccept.setPriority(IoFilePriority::WaitingForPolling);
return sockAccept;
}

// Returns true on success, false on failure
inline
AsyncConnector&
connect(AsyncConnector& sockCon, const sockaddr* name, int namelen, void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
static LPFN_CONNECTEX lpfnConnectEx = 0;
sockCon.type() = IoFileType::TConnect;
sockCon.resetCompleted();
if (lpdwBytesSent)
*lpdwBytesSent = 0;

if (!lpfnConnectEx) {
GUID GuidConnectEx = WSAID_CONNECTEX; // GUID needed to look up the ConnectEx extension function
unsigned long dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
sockCon,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidConnectEx,
sizeof(GuidConnectEx),
&lpfnConnectEx,
sizeof(lpfnConnectEx),
&dwBytes, NULL, NULL))
{
lpfnConnectEx = 0;
throw std::system_error(WSAGetLastError(), std::system_category(), "Connect get ConnectEx function address error.");
}
}
sockCon.setTransferredBytesCountRecvBuffer(lpdwBytesSent);
bool ret = lpfnConnectEx(
sockCon,
name,
namelen,
lpSendBuffer,
dwSendDataLength,
lpdwBytesSent,
&sockCon
);
if (ret == false) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = true;
}
if (ret) {
sockCon.setReturn(ret);
return sockCon;
}

sockCon.setReturn(false);
sockCon.setCompleted();
return sockCon;
}

// Returns true on success, false on failure
inline
AsyncConnector&
connect(AsyncConnector& sockCon, const char* ip, unsigned short port
, void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
sockCon.setConnectRemoteAddress(ip, port);
sockCon.bindConnectLocalPort(0);
return connect(sockCon, (const sockaddr*)sockCon.remoteAddress(), sizeof(*sockCon.remoteAddress()),
lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}

// Returns true on success, false on failure
inline
AsyncConnector&
connect(AsyncConnector& sockCon, void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
return connect(sockCon, (const sockaddr*)sockCon.remoteAddress(), sizeof(*sockCon.remoteAddress()),
lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}

// Returns 0 on success, SOCKET_ERROR on failure
inline
AsyncTcp&
send(AsyncTcp& sock, const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend) {
sock.type() = IoFileType::TSend;
sock.resetCompleted();
if (lpNumberOfBytesSend)
*lpNumberOfBytesSend = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesSend);
WSABUF wsaBuf{ nNumberOfBytesSendBuffer , (char*)lpSendBuffer };
auto ret = WSASend(sock, &wsaBuf, 1, lpNumberOfBytesSend, 0, &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}

// Returns 0 on success, SOCKET_ERROR on failure
inline
AsyncTcp&
recv(AsyncTcp& sock, void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv) {
sock.type() = IoFileType::TRecv;
sock.resetCompleted();
if (lpNumberOfBytesRecv)
*lpNumberOfBytesRecv = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRecv);
WSABUF wsaBuf{ nNumberOfBytesRecvBuffer , (char*)lpRecvBuffer };
unsigned long dwFlag = 0;
auto ret = WSARecv(sock, &wsaBuf, 1, NULL, &dwFlag, &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}

// UDP (connectionless) socket supporting asynchronous suspension
struct AsyncUdp : public AsyncSocket<int> {
// 設(shè)置失敗返回-1;返回1設(shè)置為廣播模式(client端),返回0則為接收端(server端)
int status() const { return m_isBroadCast; }
int* remoteLen() { return &m_remoteLen; }

protected:
//isBroadCast = true: sending-side UDP (client); uses sendTo, and the broadcast destination can be given dynamically at sendTo time
//isBroadCast = false: receiving-side UDP (server); uses recvFrom, and the bound receive address must be given at construction
AsyncUdp(CoScheduler& sc, bool isBroadCast = true, const char* ip = 0, unsigned short port = 0)
: AsyncSocket(sc, WSASocketW(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED))
{
setBroadCast(isBroadCast, ip, port);
}
// 設(shè)置失敗返回-1;返回1設(shè)置為廣播模式(client端),返回0則為接收端(server端)
int setBroadCast(bool isBroadCast, const char* ip, unsigned short port) {
if (*this && *this != INVALID_SOCKET)
{
m_isBroadCast = isBroadCast;
if (::setsockopt(*this, SOL_SOCKET, SO_BROADCAST, (char*)&m_isBroadCast, sizeof(m_isBroadCast)) == 0) {
if (isBroadCast) {
setBindAddress(0, 0);
setBroadcastAddress(ip, port);
}
else {
setBindAddress(ip, port);
}
return m_isBroadCast;
}
}
return m_isBroadCast = -1;
}
// 設(shè)置接收器綁定的收聽本地地址
bool setBindAddress(const char* ip, unsigned short port)
{
memset(&m_local, 0, sizeof(m_local));
m_local.sin_family = AF_INET;
m_local.sin_addr.S_un.S_addr = ip ? inet_addr(ip) : INADDR_ANY;
m_local.sin_port = htons(port);
char opt = 1;
if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncUdp set reuse address error.");
}
if (::bind(*this, (const sockaddr*)&m_local, sizeof(sockaddr_in)))
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncUdp bind address error.");
}
return true;
}
// 設(shè)置發(fā)送要廣播到的目標地址(遠端地址)
void setBroadcastAddress(const char* ip, unsigned short port)
{
memset(&m_remote, 0, sizeof(m_remote));
m_remote.sin_family = AF_INET;
m_remote.sin_addr.S_un.S_addr = ip ? inet_addr(ip) : INADDR_ANY;
m_remote.sin_port = htons(port);
}
int m_remoteLen = 0;
int m_isBroadCast = -1;
};

// 支持異步掛起的UDP協(xié)議廣播器套接字(發(fā)送端,client端)
struct AsyncBroadcastor :public AsyncUdp {
AsyncBroadcastor(CoScheduler& sc, const char* ip = 0, unsigned short port = 0)
:AsyncUdp(sc, true, ip, port)
{
type() = IoFileType::TSendto;
}
// 發(fā)送端udp(client端)向內(nèi)部已保存的指定的廣播地址發(fā)送數(shù)據(jù)(未設(shè)置廣播地址將失敗)
// 返回值0成功,SOCKET_ERROR失敗
AsyncBroadcastor& sendTo(const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent);
// 發(fā)送端udp(client端)向動態(tài)指定的廣播地址發(fā)送數(shù)據(jù)
// 返回值0成功,SOCKET_ERROR失敗
AsyncBroadcastor& sendTo(const char* ip, unsigned short port, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent);
bool isValidBroadcastor() const { return status() == 1; }
using AsyncUdp::setBroadcastAddress;
};

// 支持異步掛起的UDP協(xié)議接收器套接字(接收端,server端)
struct AsyncReceiver :public AsyncUdp {
AsyncReceiver(CoScheduler& sc, const char* ip, unsigned short port)
:AsyncUdp(sc, false, ip, port)
{
type() = IoFileType::TRecvfrom;
}
// Receiving-side UDP (server): receive broadcast data on the bound local address
// Returns 0 on success, SOCKET_ERROR on failure
AsyncReceiver& recvFrom(void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd);
bool isValidReceiver() const { return status() == 0; }
using AsyncUdp::setBindAddress;
};

// Returns 0 on success, SOCKET_ERROR on failure
inline
AsyncBroadcastor&
sendTo(AsyncBroadcastor& sock, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent) {
sock.type() = IoFileType::TSendto;
sock.resetCompleted();
if (lpNumberOfBytesSent)
*lpNumberOfBytesSent = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesSent);
WSABUF wsaBuf{ nNumberOfBytesSentBuffer , (char*)lpSentBuffer };
auto ret = WSASendTo(sock, &wsaBuf, 1, lpNumberOfBytesSent, 0,
(const sockaddr*)sock.remoteAddress(), (int)sizeof(sockaddr_in), &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}

// Returns 0 on success, SOCKET_ERROR on failure
inline
AsyncBroadcastor&
sendTo(AsyncBroadcastor& sock, const char* ip, unsigned short port,
const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent) {
sock.setBroadcastAddress(ip, port);
return ::sendTo(sock, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);
}

// Returns 0 on success, SOCKET_ERROR on failure
inline
AsyncReceiver&
recvFrom(AsyncReceiver& sock, void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd) {
sock.type() = IoFileType::TRecvfrom;
sock.resetCompleted();
if (lpNumberOfBytesRecvd)
*lpNumberOfBytesRecvd = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRecvd);
WSABUF wsaBuf{ nNumberOfBytesRecvdBuffer , (char*)lpRecvdBuffer };
DWORD dwFlag = 0;
*sock.remoteLen() = sizeof(sockaddr_in);
auto ret = WSARecvFrom(sock, &wsaBuf, 1, NULL, &dwFlag,
(sockaddr*)sock.remoteAddress(), sock.remoteLen(), &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}

struct AsyncFile : public IoFileAwaitable<bool> {
AsyncFile(CoScheduler& sc, const char* filename,
unsigned long dwDesiredAccess,
unsigned long dwShareMode,
LPSECURITY_ATTRIBUTES lpSecurityAttributes,
unsigned long dwCreationDisposition,
unsigned long dwFlagsAndAttributes,
HANDLE hTemplateFile
)
:IoFileAwaitable(sc, CreateFileA(filename, dwDesiredAccess, dwShareMode,
lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile))
{
}
AsyncFile(CoScheduler& sc, const wchar_t* filename,
unsigned long dwDesiredAccess,
unsigned long dwShareMode,
LPSECURITY_ATTRIBUTES lpSecurityAttributes,
unsigned long dwCreationDisposition,
unsigned long dwFlagsAndAttributes,
HANDLE hTemplateFile
)
:IoFileAwaitable(sc, CreateFileW(filename, dwDesiredAccess, dwShareMode,
lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile))
{
}
~AsyncFile() { close(); }
AsyncFile& read(void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead);
AsyncFile& write(const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten);
};

// Returns true on success, false on failure
inline
AsyncFile&
read(AsyncFile& file, void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead) {
file.type() = IoFileType::TRead;
file.resetCompleted();
if (lpNumberOfBytesRead)
*lpNumberOfBytesRead = 0;
file.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRead);
auto ret = ReadFile(file, lpBuffer, nNumberOfBytesToRead, lpNumberOfBytesRead, &file);
if (ret == false) {
auto lr = GetLastError();
if (lr == ERROR_IO_PENDING)
ret = true;
else
file.setCompleted();
}
file.setReturn(ret);
return file;
}

// Returns true on success, false on failure
inline
AsyncFile&
write(AsyncFile& file, const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten) {
file.type() = IoFileType::TWrite;
file.resetCompleted();
if (lpNumberOfBytesWritten)
*lpNumberOfBytesWritten = 0;
file.setTransferredBytesCountRecvBuffer(lpNumberOfBytesWritten);
auto ret = WriteFile(file, lpBuffer, nNumberOfBytesToWrite, lpNumberOfBytesWritten, &file);
if (ret == false) {
auto lr = GetLastError();
if (lr == ERROR_IO_PENDING)
ret = true;
else
file.setCompleted();
}
file.setReturn(ret);
return file;
}

struct AsyncSleepor :public IoFileAwaitable<long long> {
AsyncSleepor(long long microOrMilliSeconds = 0, bool useMicroSeconds = false)
: microOrMilliSeconds(microOrMilliSeconds)
, useMicroSeconds(useMicroSeconds)
{
type() = IoFileType::TSleep;
start();
}
void start()
{
tp = std::chrono::steady_clock::now();
}
auto getSpendMicroSeconds() const {
constexpr auto div = std::nano::den / std::micro::den;
std::chrono::nanoseconds delta(std::chrono::steady_clock::now() - tp);
return delta.count() / div;
}
auto getSpendMilliSeconds() const {
constexpr auto div = std::nano::den / std::milli::den;
std::chrono::nanoseconds delta(std::chrono::steady_clock::now() - tp);
return delta.count() / div;
}
bool isCompleted() {
setReturn(useMicroSeconds ? getSpendMicroSeconds() : getSpendMilliSeconds());
return (m_isCompleted = getReturn() >= microOrMilliSeconds);
}
protected:
long long microOrMilliSeconds;
bool useMicroSeconds;
std::chrono::steady_clock::time_point tp;
};

//Millisecond-level sleep; returns the number of milliseconds actually slept
inline
AsyncSleepor
sleepFor(long long milliSeconds) {
return AsyncSleepor{ milliSeconds };
}

//Microsecond-level sleep; returns the number of microseconds actually slept
inline
AsyncSleepor
sleepForEx(long long microSeconds) {
return AsyncSleepor{ microSeconds, true };
}

void test_coroutine_tcp_server(unsigned short serverPort = 33100, int totalClientCount = 100, bool dumpTestInfo = 0);

void test_coroutine_udp_random_broadcast(unsigned short broadCastPort = 33000, int totalClientBroadcastCount = 20, bool dumpTestInfo = 0);

#endif

#endif

實現(xiàn)文件

CLCoroutine.cpp

#include "CLCoroutine.h"

#if (defined(CLUseCorotine) && CLUseCorotine)

#include "../_cl_common/CLCommon.h"
#include "../_cl_string/CLString.h"
#include "../_cl_logger/CLLogger.h"

void CoScheduler::run() {
auto coro = [this]() ->MainCoro {
//logger.debug("nMain coro scheduler started ...");
#ifdef _WIN32
if (m_hIocp) {
CLString err;
DWORD dwMilliseconds = 0;
//logger.debug("nMain coro scheduler: Iocp loop started ...");
while (1) {
DWORD numberOfBytesTransferred = 0;
ULONG_PTR completionKey = 0;
OVERLAPPED* pOverlapped = 0;
while (GetQueuedCompletionStatus(m_hIocp, &numberOfBytesTransferred, &completionKey, &pOverlapped, dwMilliseconds))
{
if (pOverlapped) { //an I/O completion event
auto pFile = (IoFileAwaitable<>*)pOverlapped;
pFile->setCompleted();
pFile->setLastError(ERROR_SUCCESS);
auto saveBuf = (DWORD*)pFile->getTransferredBytesCountBuffer();
if (saveBuf) *saveBuf = numberOfBytesTransferred;

// 根據(jù)可等待對象的優(yōu)先級,決定是否立即調(diào)度或是輪流調(diào)度讓每個任務(wù)的權(quán)重相同
switch (pFile->getPriority())
{
case IoFilePriority::DispatchImmediately:
moveTaskToEnd(pFile->onwer()); //dispatch immediately
break;
default:
moveTaskToEnd(m_begin.hd); //round-robin dispatch
break;
}
m_end.resume();
}
else { //a new task has arrived, dispatch it immediately
if (numberOfBytesTransferred == 0 && completionKey) {
auto h = CoTask::handle::from_address((void*)completionKey);
moveTaskToEnd(h);
h.resume();
}
else {
auto lr = GetLastError();
logger.warning("Iocp: get status in event loop: ",err.getLastErrorString(lr));
CLString().getLastErrorMessageBoxExceptSucceed(lr);
}
}
}
auto lr = GetLastError();
if (lr == WSA_WAIT_TIMEOUT) {
moveTaskToEnd(m_begin.hd); //round-robin dispatch
m_end.resume(); //resume: tasks still waiting on incomplete I/O will not run, but coroutines that yielded get to run;

}
else if(pOverlapped) {
auto pFile = (IoFileAwaitable<>*)pOverlapped;
pFile->setCompleted();
pFile->setLastError(lr);
auto saveBuf = (DWORD*)pFile->getTransferredBytesCountBuffer();
if (saveBuf) *saveBuf = 0;
IoFileType fileType = pFile->type();
switch (fileType)
{
case TUnknown:
break;
case TRead:
case TWrite:
case TListen:
case TAccept:
case TConnect:
pFile->setReturn(false);
break;
case TSend:
case TRecv:
case TSendto:
case TRecvfrom:
pFile->setReturn(SOCKET_ERROR);
break;
case TSleep:
break;
default:
break;
}
switch (lr)
{
case ERROR_NETNAME_DELETED: //64 the specified network name is no longer available
break;
case ERROR_SEM_TIMEOUT://121 the semaphore timeout period has expired
break;
default:
logger.error("Iocp: get status out event loop: ", err.getLastErrorString(lr));
CLString().getLastErrorMessageBoxExceptSucceed(lr);
break;
}
// 根據(jù)可等待對象的優(yōu)先級,決定是否立即調(diào)度或是輪流調(diào)度讓每個任務(wù)的權(quán)重相同
switch (pFile->getPriority())
{
case IoFilePriority::DispatchImmediately:
moveTaskToEnd(pFile->onwer()); //dispatch immediately
break;
default:
moveTaskToEnd(m_begin.hd); //round-robin dispatch
break;
}
m_end.resume();
}
else {
logger.error("Iocp: get status out event loop no completed: ", err.getLastErrorString(lr));
CLString().getLastErrorMessageBoxExceptSucceed(lr);
}
if (taskCount() == 0)
break;
}
CloseHandle(m_hIocp);
m_hIocp = 0;
//logger.debug("nMain coro scheduler: Iocp loop has done ...");
}
#endif
//logger.debug("nMain coro scheduler quit ...");
co_return;
};
m_main = coro();
m_main.hd.promise().sc = this;
m_main.hd.resume();
m_main.hd.destroy();
}

bool CoTask::resume() {
if (!hd)
return true;
else if (hd.done()) {
return false;
}
else {
auto pFile = (IoFileAwaitable<>*) hd.promise().pAwaitableFile;
if (!pFile) //第一次調(diào)度或者yield的協(xié)程
hd.resume();
else {
if (pFile->type() == IoFileType::TSleep) { //sleep scheduling
if (((AsyncSleepor*)pFile)->isCompleted()) {
hd.promise().pAwaitableFile = nullptr;
hd.resume();
}
}
else if (pFile->isCompleted()) { //I/O-completion scheduling
hd.promise().pAwaitableFile = nullptr;
hd.resume();
}
}
return true;
}
}

#ifdef _WIN32
#else // Windows
#endif // Linux

AsyncAcceptor& AsyncListener::accept(AsyncAcceptor& sockAccept, void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted)
{
return ::accept(* this, sockAccept, lpAcceptBuffer, nNumberOfBytesAcceptBuffer, lpNumberOfBytesAccepted);
}

AsyncAcceptor& AsyncAcceptor::accept(AsyncListener& sListen, void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted)
{
return ::accept(sListen, *this, lpAcceptBuffer, nNumberOfBytesAcceptBuffer, lpNumberOfBytesAccepted);
}

AsyncConnector& AsyncConnector::connect(const sockaddr* name, int namelen, void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
return ::connect(*this, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}

AsyncConnector& AsyncConnector::connect(const char* ip, unsigned short port, void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
return ::connect(*this, ip, port, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}

AsyncConnector& AsyncConnector::connect(void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
return ::connect(*this, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}

AsyncTcp& AsyncTcp::send(const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend)
{
return ::send(*this, lpSendBuffer, nNumberOfBytesSendBuffer, lpNumberOfBytesSend);
}

AsyncTcp& AsyncTcp::recv(void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv)
{
return ::recv(*this, lpRecvBuffer, nNumberOfBytesRecvBuffer, lpNumberOfBytesRecv);
}

AsyncBroadcastor& AsyncBroadcastor::sendTo(const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent)
{
return ::sendTo(*this, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);
}

AsyncBroadcastor& AsyncBroadcastor::sendTo(const char* ip, unsigned short port, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent)
{
return ::sendTo(*this, ip, port, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);
}

AsyncReceiver& AsyncReceiver::recvFrom(void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd)
{
return ::recvFrom(*this, lpRecvdBuffer, nNumberOfBytesRecvdBuffer, lpNumberOfBytesRecvd);
}

AsyncFile& AsyncFile::read(void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead)
{
return ::read(*this, lpBuffer, nNumberOfBytesToRead, lpNumberOfBytesRead);
}

AsyncFile& AsyncFile::write(const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten)
{
return ::write(*this, lpBuffer, nNumberOfBytesToWrite, lpNumberOfBytesWritten);
}

#include <vector>
#include <cstdio>
#include <cstring>
#include <ctime>

void test_coroutine_tcp_server(unsigned short serverPort, int totalClientCount, bool dumpTestInfo)
{
logger.setLevel(dumpTestInfo ? logger.Debug : logger.Info);
logger.openHeadInfoFlag(false);

CoScheduler sc;
int servRun = 0;
int totals = 0;
CLTick tk;
// 服務(wù)端監(jiān)聽器task
sc.gather([&]()->CoTask {
logger.info("nTcp server coro started ...");
AsyncListener listener(sc, ADDR_ANY, serverPort);
// loop accept
std::vector<char> acceptbuf(260);
AsyncAcceptor* pAcceptor = 0;
int servId = 0;
while (true)
{
AsyncAcceptor& acceptor = pAcceptor ? *pAcceptor : *(pAcceptor = new AsyncAcceptor(sc));
DWORD nValidAccept;
logger.debug("nServer listener accept wait ...");
bool ret = co_await listener.accept(acceptor, acceptbuf.data(), acceptbuf.size(), &nValidAccept);
if (ret) {
//create server task
acceptor.perseAddress(acceptbuf.data(), acceptbuf.size());
servRun++;
// 服務(wù)端task
sc.gather([&](AsyncAcceptor* pAcceptor, int idx) ->CoTask {
AsyncAcceptor& acp = *pAcceptor;
std::vector<char> bufSend(260), bufRecv(260);
DWORD nbytesSend, nbytesRecv;
int total = 1;
while (1) {
std::sprintf(bufSend.data(), "nHello client. this is server %d. %dst response.", idx, total);
logger.debug("nServer[%d] send wait ...", idx);
int ret = co_await acp.send(bufSend.data(), std::strlen(bufSend.data()) + 1, &nbytesSend);
logger.debug("nServer[%d] recv wait ...", idx);
ret = co_await acp.recv(bufRecv.data(), bufRecv.size(), &nbytesRecv);
if (nbytesRecv == 0)
break;
logger.debug("nServer[%d] recv client msg = %s", idx, bufRecv.data());
total++;
totals++;
}
logger.debug("nServer[%d] recv client close msg", idx);
delete pAcceptor;
servRun--;
}, pAcceptor, ++servId);
pAcceptor = 0;
}
}
logger.info("nTcp server coro quit.%d", GetCurrentThreadId());
});
// Client generator
sc.gather([&]()->CoTask {
logger.info("nClients creater coro started.");
int nClient = 0;
for (int i = 0; 1; )
{
if (nClient < totalClientCount) {
i++;
logger.info("nClients creater make client task %d.", i);
nClient++;
// Client task
sc.gather([&](int idx)->CoTask {
AsyncConnector con(sc);
logger.debug("nClient[%d] connect wait ...", idx);
auto ret = co_await con.connect("127.0.0.1", serverPort);
if (!ret) {
logger.debug("nClinet[%d] connect server fail, %s", idx, CLString().getLastErrorString(GetLastError()));
co_return;
}
std::vector<char> bufSend(260), bufRecv(260);
DWORD nbytesSend, nbytesRecv;
int total = 1;
while (1) {
std::snprintf(bufSend.data(), bufSend.size(), "nHelle server, this is client %d: %dst request.", idx, total);
logger.debug("nClient[%d] send wait ...", idx);
auto ret = co_await con.send(bufSend.data(), std::strlen(bufSend.data()) + 1, &nbytesSend);
if (!(ret == SOCKET_ERROR || nbytesSend == 0)) {
logger.debug("nClient[%d] recv wait ...", idx);
ret = co_await con.recv(bufRecv.data(), bufRecv.size(), &nbytesRecv);
if (ret == SOCKET_ERROR || nbytesRecv == 0)
break;
logger.debug("nClient[%d] recv server msg = %s", idx, bufRecv.data());
}
total++;
}
logger.debug("nClient[%d] get server close msg and shutdown.", idx);
nClient--;
}, i);
}
else {
logger.debug("nClients creater yield run privilege to coro scheduler.");
co_yield 1;
logger.debug("nClients creater fetch run privilege again.");
}
}
logger.debug("nClients creater coro quit.");
});
// 統(tǒng)計協(xié)程
sc.gather([&]()->CoTask {
auto last = totals;
auto lastTime = tk.getSpendTime();
while (1)
{
co_await sleepFor(3000);
auto time = tk.getSpendTime();
logger.info("nSummary coro count %u: total handle %d times (spend time %gs), %g times/per-second.",
sc.taskCount(), totals, time, (totals - last) / (time - lastTime));
last = totals;
lastTime = time;
}
});

sc.run();

}

void test_coroutine_udp_random_broadcast(unsigned short broadCastPort, int totalClientBroadcastCount, bool dumpTestInfo)
{
logger.setLevel(dumpTestInfo ? logger.Debug : logger.Info);
logger.openHeadInfoFlag(false);
srand(time(0));
CoScheduler sc;
int servRun = 0;
int totalsRecvd = 0;
int totalsSendto = 0;
CLTick tk;
std::vector<int> portList(totalClientBroadcastCount);
for (int i = 0; i < totalClientBroadcastCount; i++)portList[i] = broadCastPort + i;
// 服務(wù)端生成器
sc.gather([&]()->CoTask {
logger.info("nServers creater coro started.");
int nServer = 0;
for (int i = 0; 1; )
{
if (nServer < totalClientBroadcastCount) {
i++;
logger.info("nServers creater make server task %d.", i);
nServer++;
// 服務(wù)端task (廣播接收端)
sc.gather([&](int i)->CoTask {
logger.info("nUdp server[%d] coro started bind port = %d...", i, portList[i - 1]);
AsyncReceiver serv(sc, "127.0.0.1", portList[i - 1]);
// recv
std::vector<char> recv(260);
int servId = 0;
int total = 1;
while (true)
{
DWORD nbytesRecv;
logger.debug("nUdp server[%d] recvfrom wait ...", i);
int ret = co_await serv.recvFrom(recv.data(), recv.size(), &nbytesRecv);
if (ret == SOCKET_ERROR || nbytesRecv == 0) {
CLString().getLastErrorMessageBoxExceptSucceed(WSAGetLastError());
break;
}
logger.debug("nUdp server[%d] recvfrom %dst broadcast %u bytes data, msg = %s", i, total, nbytesRecv, recv.data());
total++;
totalsRecvd++;
}
logger.info("nUdp server[%d] coro quit.%d", i, GetCurrentThreadId());
nServer--;
}, i);
}
else {
logger.debug("nServers creater yield run privilege to coro scheduler.");
co_yield 1;
logger.debug("nServers creater fetch run privilege again.");
}
}
logger.debug("nServers creater coro quit.");
});
// Client generator
sc.gather([&]()->CoTask {
logger.info("nClients creater coro started.");
int nClient = 0;
for (int i = 0; 1; )
{
if (nClient < totalClientBroadcastCount) {
i++;
logger.info("nClients creater make broadcastor client task %d.", i);
nClient++;
// 客戶端task (廣播發(fā)送端)
sc.gather([&](int idx)->CoTask {
AsyncBroadcastor broadcast(sc);
std::vector<char> bufSent(260);
DWORD nbytesSent;
int total = 1;
while (1) {
auto randPort = portList[rand() % totalClientBroadcastCount];
std::snprintf(bufSent.data(), bufSent.size(),
"nHelle server, this is broadcastor %d: %dst randon broadcast to port=%d."
, idx, total, randPort);
logger.debug("nBroadcastor[%d] send wait ...", idx);
auto ret = co_await broadcast.sendTo("127.0.0.1", randPort,
bufSent.data(), std::strlen(bufSent.data()) + 1, &nbytesSent);
if (ret == SOCKET_ERROR || nbytesSent == 0) {
break;
}
logger.debug("nBroadcastor[%d] sendto server msg = %s", idx, bufSent.data());
total++;
totalsSendto++;
}
logger.debug("nBroadcastor[%d] send 0 bytes and shutdown.", idx);
nClient--;
}, i);
}
else {
logger.debug("nClients creater yield run privilege to coro scheduler.");
co_yield 1;
logger.debug("nClients creater fetch run privilege again.");
}
}
logger.debug("nClients creater coro quit.");
});
// 統(tǒng)計協(xié)程
sc.gather([&]()->CoTask {
auto last = totalsRecvd + totalsSendto;
auto lastTime = tk.getSpendTime();
while (1)
{
co_await sleepFor(3000); // coroutine sleeps for 3000 ms
auto time = tk.getSpendTime();
logger.info("nSummary coro count %u: total handle %d times (spend time %gs), %g times/per-second.",
sc.taskCount(), totalsRecvd + totalsSendto, time, (totalsRecvd + totalsSendto - last) / (time - lastTime));
last = totalsRecvd + totalsSendto;
lastTime = time;
}
});

sc.run();
}

#endif

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學習之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 函數(shù)
    +關(guān)注

    關(guān)注

    3

    文章

    4381

    瀏覽量

    64921
  • C++
    C++
    +關(guān)注

    關(guān)注

    22

    文章

    2119

    瀏覽量

    75341
  • 源代碼
    +關(guān)注

    關(guān)注

    96

    文章

    2953

    瀏覽量

    68408
  • 生成器
    +關(guān)注

    關(guān)注

    7

    文章

    322

    瀏覽量

    21906
收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評論

    相關(guān)推薦
    熱點推薦

    C++20新特性解析

    C++之父都說過,C++20C++語言的一次重大變革,引入了大量的新特性。
    發(fā)表于 10-08 09:07 ?2389次閱讀

    談?wù)?b class='flag-5'>協(xié)的那些事兒

    隨著異步編程的發(fā)展以及各種并發(fā)框架的普及,協(xié)作為一種異步編程規(guī)范在各類語言中地位逐步提高。我們不單單會在自己的程序中使用協(xié)
    的頭像 發(fā)表于 01-26 11:36 ?1445次閱讀
    談?wù)?b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>的那些事兒

    關(guān)于C++ 20協(xié)最全面詳解

    花了一兩周的時間后,我想寫寫 C++20 協(xié)的基本用法,因為 C++ 的協(xié)讓我感到很奇怪,寫
    的頭像 發(fā)表于 04-12 11:10 ?1.4w次閱讀
    關(guān)于<b class='flag-5'>C</b>++ <b class='flag-5'>20</b><b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>最全面詳解

    Python后端項目的協(xié)是什么

    最近公司 Python 后端項目進行重構(gòu),整個后端邏輯基本都變更為采用“異步協(xié)的方式實現(xiàn)。看著滿屏幕經(jīng)過 async await(協(xié)
    的頭像 發(fā)表于 09-23 14:38 ?1534次閱讀

    現(xiàn)代C++20實戰(zhàn)手冊

    追其根源,C++ 為何如此受歡迎,除了它本身出色的性能,作為一種高級面向?qū)ο笳Z言,適用領(lǐng)域極其廣泛,小到嵌入式,大到分布式服務(wù)器,到處可以見到 C++ 的身影;另一個很重要的原因就是它“最近”不斷發(fā)布具有有趣功能的新語言標準,也
    的頭像 發(fā)表于 01-17 09:55 ?4187次閱讀

    詳解Linux線程、線程與異步編程、協(xié)異步

    協(xié)不是系統(tǒng)級線程,很多時候協(xié)被稱為“輕量級線程”、“微線程”、“纖(fiber)”等。簡單來說可以認為
    的頭像 發(fā)表于 03-16 15:49 ?1438次閱讀

    協(xié)的概念及協(xié)的掛起函數(shù)介紹

    協(xié)是一種輕量級的線程,它可以在單個線程中實現(xiàn)并發(fā)執(zhí)行。與線程不同,協(xié)不需要操作系統(tǒng)的上下文切換,因此可以更高效地使用系統(tǒng)資源。Kotlin 協(xié)
    的頭像 發(fā)表于 04-19 10:20 ?1196次閱讀

    C++20 modules入門

    以前一直有了解C++20的新特性,但是因為編譯器對此支持的比較少,所以很少實踐。
    的頭像 發(fā)表于 05-29 15:03 ?1355次閱讀
    <b class='flag-5'>C++20</b> modules入門

    Kotlin協(xié)實戰(zhàn)進階之筑基篇1

    協(xié)的概念在1958年就開始出現(xiàn)(比線程還早), 目前很多語言開始原生支, Java 沒有原生協(xié)但是大型公司都自己或者使用第三方來支持
    的頭像 發(fā)表于 05-30 16:24 ?1103次閱讀
    Kotlin<b class='flag-5'>協(xié)</b><b class='flag-5'>程</b><b class='flag-5'>實戰(zhàn)</b>進階之筑基篇1

    Kotlin協(xié)實戰(zhàn)進階之筑基篇2

    協(xié)的概念在1958年就開始出現(xiàn)(比線程還早), 目前很多語言開始原生支, Java 沒有原生協(xié)但是大型公司都自己或者使用第三方來支持
    的頭像 發(fā)表于 05-30 16:25 ?1066次閱讀
    Kotlin<b class='flag-5'>協(xié)</b><b class='flag-5'>程</b><b class='flag-5'>實戰(zhàn)</b>進階之筑基篇2

    Kotlin協(xié)實戰(zhàn)進階之筑基篇3

    協(xié)的概念在1958年就開始出現(xiàn)(比線程還早), 目前很多語言開始原生支, Java 沒有原生協(xié)但是大型公司都自己或者使用第三方來支持
    的頭像 發(fā)表于 05-30 16:26 ?987次閱讀

    C++20 modules基礎(chǔ)知識入門

    以前一直有了解C++20的新特性,但是因為編譯器對此支持的比較少,所以很少實踐。
    的頭像 發(fā)表于 06-15 11:37 ?1355次閱讀
    <b class='flag-5'>C++20</b> modules基礎(chǔ)知識入門

    C/C++協(xié)編程的相關(guān)概念和技巧

    自己的寄存器上下文和,可以在多個入口點間自由切換,而不是像傳統(tǒng)的函數(shù)調(diào)用那樣在一個入口點開始、另一個入口點結(jié)束。協(xié)的概念最早可以追溯到1963年,由Melvin Conway提出。經(jīng)過多年的發(fā)展,
    的頭像 發(fā)表于 11-09 11:34 ?1220次閱讀

    Linux線程、線程與異步編程、協(xié)異步介紹

    協(xié)不是系統(tǒng)級線程,很多時候協(xié)被稱為“輕量級線程”、“微線程”、“纖(fiber)”等。簡單來說可以認為
    的頭像 發(fā)表于 11-11 11:35 ?1638次閱讀
    Linux線程、線程與<b class='flag-5'>異步</b>編程、<b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>與<b class='flag-5'>異步</b>介紹

    何選擇一個合適的協(xié)來獲得CPU執(zhí)行權(quán)

    如今雖不敢說協(xié)已經(jīng)是紅的發(fā)紫,但確實是越來越受到了大家的重視。Golang中的已經(jīng)是只有g(shù)oroutine,以至于很多go程序員是只知有協(xié),不知有線程了。就連
    的頭像 發(fā)表于 11-13 14:10 ?609次閱讀
    何選擇一個合適的<b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>來獲得CPU執(zhí)行權(quán)