一.鎖
鎖出現(xiàn)的原因
臨界資源是什么: 多線程執(zhí)行流所共享的資源
鎖的作用是什么, 可以做原子操作, 在多線程中針對(duì)臨界資源的互斥訪問... 保證一個(gè)時(shí)刻只有一個(gè)線程可以持有鎖對(duì)于臨界資源做修改操作...
任何一個(gè)線程如果需要修改,向臨界資源做寫入操作都必須持有鎖,沒有持有鎖就不能對(duì)于臨界資源做寫入操作.
鎖 : 保證同一時(shí)刻只能有一個(gè)線程對(duì)于臨界資源做寫入操作 (鎖地功能)
再一個(gè)直觀地代碼引出問題,再?gòu)闹噶罴慕嵌热タ磫栴}
#include < stdio.h >
#include < stdlib.h >
#include < unistd.h >
#include < sys/types.h >
#include < pthread.h >
void* Routine(void* arg) {
int *pcount = (int*)arg;
for (int i = 0; i < 20000000; ++i) {
(*pcount)++;
}
return (void*)0;
}
int main() {
int count = 0;
pthread_t tid1, tid2, tid3;
pthread_create(&tid1, NULL, Routine, (void*)&count);
pthread_create(&tid2, NULL, Routine, (void*)&count);
pthread_create(&tid3, NULL, Routine, (void*)&count);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
pthread_join(tid3, NULL);
//看一看結(jié)果
printf("count: %dn", count);
return 0;
}
上述一個(gè)及其奇怪的結(jié)果,這個(gè)結(jié)果每一次運(yùn)行都可能是不一樣的,Why ? 按照我們本來的想法是每一個(gè)線程 + 20000000 結(jié)果肯定應(yīng)該是60000000呀,可以就是達(dá)不到這個(gè)值
為何? (深入匯編指令來看) 一定將過程放置到匯編指令上去看就可以理解這個(gè)過程了.
a++; 或者 a += 1; 這些操作的匯編操作是幾個(gè)步驟?
其實(shí)是三個(gè)步驟:
- 將數(shù)據(jù)從內(nèi)存讀取到寄存器中
- 在寄存器中進(jìn)行對(duì)應(yīng)的運(yùn)算
- 將數(shù)據(jù)運(yùn)算結(jié)果從寄存器寫回內(nèi)存
正常情況下,數(shù)據(jù)少,操作的線程少,問題倒是不大,想一想要是這樣的情況下,操作次數(shù)大,對(duì)齊操作的線程多,有些線程從中間切入進(jìn)來了,在運(yùn)算之后還沒寫回內(nèi)存就另外一個(gè)線程切入進(jìn)來同時(shí)對(duì)于之前的數(shù)據(jù)進(jìn)行++ 再寫回內(nèi)存, 啥效果,多次++ 操作之后結(jié)果確實(shí)一次加加操作后的結(jié)果。 這樣的操作 (術(shù)語(yǔ)叫做函數(shù)的重入) 我覺得其實(shí)就是重入到了匯編指令中間了,還沒將上一次運(yùn)算的結(jié)果寫回內(nèi)存就重新對(duì)這個(gè)內(nèi)存讀取再運(yùn)算寫入,結(jié)果肯定和正常的邏輯后的結(jié)果不一樣呀
來一幅圖片解釋一下
咋辦? 其實(shí)問題很清楚,我們只需要處理的是多條匯編指令不能讓它中間被插入其他的線程運(yùn)算. (要想自己在執(zhí)行匯編指令的時(shí)候別人不插入進(jìn)來) 將多條匯編指令綁定成為一條指令不就OK了嘛。
也就是原子操作?。。?/p>
不會(huì)原子操作?操作系統(tǒng)給咱提供了線程的 綁定方式工具呀:mutex 互斥鎖(互斥量), 自旋鎖(spinlock), 讀寫鎖(readers-writer lock) 他們也稱作悲觀鎖. 作用都是一個(gè)樣,將多個(gè)匯編指令鎖成為一條原子操作 (此處的匯編指令也相當(dāng)于如下的臨界資源)
悲觀鎖:鎖如其名,每次都悲觀地認(rèn)為其他線程也會(huì)來修改數(shù)據(jù),進(jìn)行寫入操作,所以會(huì)在取數(shù)據(jù)前先加鎖保護(hù),當(dāng)其他線程想要訪問數(shù)據(jù)時(shí),被阻塞掛起
樂觀鎖:每次取數(shù)據(jù)的時(shí)候,總是樂觀地認(rèn)為數(shù)據(jù)不會(huì)被其他線程修改,因此不上鎖。但是在更新數(shù)據(jù)前, 會(huì)判斷其他數(shù)據(jù)在更新前有沒有對(duì)數(shù)據(jù)進(jìn)行修改。
互斥鎖
最為常見使用地鎖就是互斥鎖, 也稱互斥量. mutex
特征,當(dāng)其他線程持有互斥鎖對(duì)臨界資源做寫入操作地時(shí)候,當(dāng)前線程只能掛起等待,讓出CPU,存在線程間切換工作
解釋一下存在線程間切換工作 : 當(dāng)線程試圖去獲取鎖對(duì)臨界資源做寫入操作時(shí)候,如果鎖被別的線程正在持有,該線程會(huì)保存上下文直接掛起,讓出CPU,等到鎖被釋放出來再進(jìn)行線程間切換,從新持有CPU執(zhí)行寫入操作
互斥鎖需要進(jìn)行線程間切換,相比自旋鎖而言性能會(huì)差上許多,因?yàn)樽孕i不會(huì)讓出CPU, 也就不需要進(jìn)行線程間切換的步驟,具體原理下一點(diǎn)詳述
#include < stdio.h >
#include < stdlib.h >
#include < unistd.h >
#include < sys/types.h >
#include < pthread.h >
pthread_mutex_t mtx;
void* Routine(void* arg) {
int *pcount = (int*)arg;
for (int i = 0; i < 20000000; ++i) {
pthread_mutex_lock(&mtx);
(*pcount)++;
pthread_mutex_unlock(&mtx);
}
return (void*)0;
}
int main() {
pthread_mutex_init(&mtx, NULL);
int count = 0;
pthread_t tid1, tid2, tid3;
pthread_create(&tid1, NULL, Routine, (void*)&count);
pthread_create(&tid2, NULL, Routine, (void*)&count);
pthread_create(&tid3, NULL, Routine, (void*)&count);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
pthread_join(tid3, NULL);
//看一看結(jié)果
printf("count: %dn", count);
pthread_mutex_destroy(&mtx);//銷毀鎖
return 0;
}
加互斥量(互斥鎖)確實(shí)可以達(dá)到要求,但是會(huì)發(fā)現(xiàn)運(yùn)行時(shí)間非常的長(zhǎng),因?yàn)榫€程間不斷地切換也需要時(shí)間, 線程間切換的代價(jià)比較大.
自旋鎖
spinlock.自旋鎖.
對(duì)比互斥量(互斥鎖)而言,獲取自旋鎖不需要進(jìn)行線程間切換,如果自旋鎖正在被別的線程占用,該線程也不會(huì)放棄CPU進(jìn)行掛起休眠,而是恰如其名的在哪里不斷地循環(huán)地查看自旋鎖保持者(持有者)是否將自旋鎖資源釋放出來... (自旋地原來就是如此)
口語(yǔ)解釋自旋:持有自旋鎖的線程不釋放自旋鎖,那也沒有關(guān)系呀,我就在這里不斷地一遍又一遍地查詢自旋鎖是否釋放出來,一旦釋放出來我立馬就可以直接使用 (因?yàn)槲也]有掛起等待,不需要像互斥鎖還需要進(jìn)行線程間切換,重新獲取CPU,保存恢復(fù)上下文等等操作)
哪正是因?yàn)樯鲜鲞@些特點(diǎn),線程嘗試獲取自旋鎖,獲取不到不會(huì)采取休眠掛起地方式,而是原地自旋(一遍又一遍查詢自旋鎖是否可以獲?。┬适沁h(yuǎn)高于互斥鎖了. 那我們是不是所有情況都使用自旋鎖就行了呢,互斥鎖就可以放棄使用了嗎????
解釋自旋鎖地弊端:如果每一個(gè)線程都僅僅只是需要短時(shí)間獲取這個(gè)鎖,那我自旋占據(jù)CPU等待是沒啥問題地。要是線程需要長(zhǎng)時(shí)間地使用占據(jù)(鎖)。。。 會(huì)造成過多地?zé)o端占據(jù)CPU資源,俗稱站著茅坑不拉屎... 但是要是僅僅是短時(shí)間地自旋,平衡CPU利用率 + 程序運(yùn)行效率 (自旋鎖確實(shí)是在有些時(shí)候更加合適)
自旋鎖需要場(chǎng)景:內(nèi)核可搶占或者SMP(多處理器)情況下才真正需求 (避免死鎖陷入死循環(huán),瘋狂地自旋,比如遞歸獲取自旋鎖. 你獲取了還要獲取,但是又沒法釋放)
自旋鎖的使用函數(shù)其實(shí)和互斥鎖幾乎是一摸一樣地,僅僅只是需要將所有的mutex換成spin即可
僅僅只是在init存在些許不同
#include < stdio.h >
#include < stdlib.h >
#include < unistd.h >
#include < sys/types.h >
#include < pthread.h >
pthread_spinlock_t mtx;
void* Routine(void* arg) {
int *pcount = (int*)arg;
for (int i = 0; i < 20000000; ++i) {
pthread_spin_lock(&mtx);
(*pcount)++;
pthread_spin_unlock(&mtx);
}
return (void*)0;
}
int main() {
pthread_spin_init(&mtx, PTHREAD_PROCESS_SHARED);
int count = 0;
pthread_t tid1, tid2, tid3;
pthread_create(&tid1, NULL, Routine, (void*)&count);
pthread_create(&tid2, NULL, Routine, (void*)&count);
pthread_create(&tid3, NULL, Routine, (void*)&count);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
pthread_join(tid3, NULL);
//看一看結(jié)果
printf("count: %dn", count);
pthread_spin_destroy(&mtx);//銷毀鎖
return 0;
}
- 解決上述地問題地方式二, 不是使用std=c99 而是直接將 int i 放置到for循環(huán)外面
- 讀寫鎖 + 讀者寫者模式:
- 主要是處理讀多寫少地情況,和本文后序關(guān)聯(lián)不大,需要的可自行查閱了解
二.epoll驚群?jiǎn)栴}地理解
何為驚群,池塘一堆, 我瞄準(zhǔn)一條插過去,但是好似所有的都像是覺著自己正在被插一樣的四處逃竄。 這個(gè)就是驚群的生活一點(diǎn)的理解
驚群現(xiàn)象其實(shí)一點(diǎn)也不少,比如說 accept pthread_cond_broadcast 還有多個(gè)線程共享epoll監(jiān)視一個(gè)listenfd 然后此刻 listenfd 說來 SYN了,放在了SYN隊(duì)列中,然后完成了三次握手放在了 accept隊(duì)列中了, 現(xiàn)在問題是這個(gè)connect我應(yīng)該交付給哪一個(gè)線程處理呢.
多個(gè)epoll監(jiān)視準(zhǔn)備工作的線程 就是這群 (),然后connet就是魚叉,這一叉下去肯定是所有的 epoll線程都會(huì)被驚醒 (多線程共享listenfd引發(fā)的epoll驚群)
同樣如果將上述的多個(gè)線程換成多個(gè)進(jìn)程共享監(jiān)視 同一個(gè) listenfd 就是(多進(jìn)程的epoll驚群現(xiàn)象)
咱再畫一個(gè)草圖再來理解一下這個(gè)驚群:
如果是多進(jìn)程道理是一樣滴,僅僅只是將所有的線程換成進(jìn)程就OK了
三. epoll驚群?jiǎn)栴}地解決
終是來到了今天的正題了: epoll驚群?jiǎn)栴}地解決上面了...
首先 先說說accept的驚群?jiǎn)栴},沒想到吧accept 平時(shí)大家寫它的多線程地時(shí)候,多個(gè)線程同時(shí)accept同一個(gè)listensock地時(shí)候也是會(huì)存在驚群?jiǎn)栴}地,但是accept地驚群?jiǎn)栴}已經(jīng)被Linux內(nèi)核處理了: 當(dāng)有新的連接進(jìn)入到accept隊(duì)列的時(shí)候,內(nèi)核喚醒且僅喚醒一個(gè)進(jìn)程來處理
但是對(duì)于epoll的驚群?jiǎn)栴},內(nèi)核卻沒有直接進(jìn)行處理。哪既然內(nèi)核沒有直接幫我們處理,我們應(yīng)該如何針對(duì)這種現(xiàn)象做出一定的措施呢?
驚群效應(yīng)帶來的弊端: 驚群現(xiàn)象會(huì)造成epoll的偽喚醒,本來epoll是阻塞掛起等待著地,這個(gè)時(shí)候因?yàn)閽炱鸬却遣粫?huì)占用CPU地。。。 但是一旦喚醒就會(huì)占用CPU去處理發(fā)生地IO事件, 但是其實(shí)是一個(gè)偽喚醒,這個(gè)就是對(duì)于線程或者進(jìn)程的無(wú)效調(diào)度。然而進(jìn)程或者線程地調(diào)取是需要花費(fèi)代價(jià)地,需要上下文切換。需要進(jìn)行進(jìn)程(線程)間的不斷切換... 本來多核CPU是用來支持高并發(fā)地,但是現(xiàn)在卻被用來無(wú)效地喚醒,對(duì)于多核CPU簡(jiǎn)直就是一種浪費(fèi) (浪費(fèi)系統(tǒng)資源) 還會(huì)影響系統(tǒng)的性能.
解決方式(一般是兩種)
Nginx的解決方式:
加鎖:驚群?jiǎn)栴}發(fā)生的前提是多個(gè)進(jìn)程(線程)監(jiān)聽同一個(gè)套接字(listensock)上的事件,所以我們只讓一個(gè)進(jìn)程(線程)去處理監(jiān)聽套接字就可以了。
// 是否開啟 accept 鎖,
// 開啟則需要搶鎖,以防驚群,默認(rèn)是關(guān)閉的。
if (ngx_use_accept_mutex) {
if (ngx_accept_disabled > 0) {
// ngx_accept_disabled 的值是經(jīng)過算法計(jì)算出來的,
// 當(dāng)值大于 0 時(shí),說明此進(jìn)程負(fù)載過高,不再接收新連接。
ngx_accept_disabled--;
} else {
// 嘗試搶 accept 鎖,發(fā)生錯(cuò)誤直接返回
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
if (ngx_accept_mutex_held) {
// 搶到鎖,設(shè)置事件處理標(biāo)識(shí),后續(xù)事件先暫存隊(duì)列中。
flags |= NGX_POST_EVENTS;
} else {
// 未搶到鎖,修改阻塞等待時(shí)間,使得下一次搶鎖不會(huì)等待太久
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
timer = ngx_accept_mutex_delay;
}
}
}
}
方式2:使用 設(shè)置SO_REUSEPORT:使得端口號(hào)可以復(fù)用, 如此多個(gè)進(jìn)程或者線程便可以綁定同一個(gè)端口號(hào)了 這樣相當(dāng)于是每一個(gè)進(jìn)程或線程都監(jiān)視一個(gè)listensock
畫兩張圖來理解一下:
四、代碼演示:
#include < stdio.h >
#include < sys/epoll.h >
#include < stdlib.h >
#include < unistd.h >
#include < sys/socket.h >
#include < string.h >
#include < arpa/inet.h >
#include < pthread.h >
#include < sys/types.h >
#include < fcntl.h >
typedef struct sockaddr SA;
#define CLIENTSIZE 1000
#define BUFFSIZE 256
#define SERVE_PORT 8080
#define ERR_EXIT(m)
do { perror(m); close(EXIT_FAILURE); } while(0)
int CreateSocket() {
int listensock = socket(AF_INET, SOCK_STREAM, 0);
int reuseport = 1;
if (-1 == setsockopt(listensock, SOL_SOCKET, SO_REUSEPORT, &reuseport, sizeof(reuseport))) {
ERR_EXIT("setsocketopt");
}
struct sockaddr_in serveAdd;
//確定服務(wù)端協(xié)議地址簇
memset(&serveAdd, 0, sizeof(serveAdd));//清空
serveAdd.sin_family = AF_INET;
serveAdd.sin_addr.s_addr = htonl(INADDR_ANY);//其實(shí)就是0.0.0.0 通配地址
serveAdd.sin_port = htons(SERVE_PORT);
if (-1 == bind(listensock, (SA*)&serveAdd, sizeof(serveAdd))) {
ERR_EXIT("bind");
}
if (-1 == listen(listensock, 5)) {
ERR_EXIT("listen");
}
return listensock;
}
void setnoblock(int fd) {
int oldflag;
oldflag = fcntl(fd, F_GETFL); //獲取flag
if (-1 == fcntl(fd, F_SETFL, oldflag | O_NONBLOCK)) {
ERR_EXIT("fcnl");
}
}
//像epfd中增加監(jiān)視事件,將監(jiān)視事件掛在到紅黑樹上
void addfd(int epfd, int fd) {
struct epoll_event ev;
ev.data.fd = fd;
ev.events = EPOLLIN | EPOLLERR | EPOLLET;
if (-1 == epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev)) {
ERR_EXIT("epoll_ctl");
}
setnoblock(fd);
//設(shè)置非阻塞IO,因?yàn)槭荅T
}
void delfd(int epfd, int fd) {
struct epoll_event ev;
if (-1 == epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev)) {
ERR_EXIT("epoll_ctl");
}
}
//使用多線程去演示
void* Routine(void* arg) {
struct epoll_event* evs = (struct epoll_event*)calloc(CLIENTSIZE, sizeof(struct epoll_event));
char buff[BUFFSIZE];
//每一個(gè)線程都創(chuàng)建一個(gè)新地監(jiān)視窗口,但是其實(shí)監(jiān)視在一個(gè)port上
//將問題拋給內(nèi)核處理,
int listensock = (int)arg;
int epfd = epoll_create(CLIENTSIZE);
int i;
addfd(epfd, listensock);
int count = 1; //記錄監(jiān)視IO事件地?cái)?shù)目
while (1) {//循環(huán)監(jiān)視
int nready = epoll_wait(epfd, evs, count,-1);
printf("tid: %d 線程被喚醒處理IO事件n", pthread_self());
sleep(2000);
for (i = 0; i < nready; ++i) {
if (evs[i].events & EPOLLERR) {
//處理錯(cuò)誤斷開連接等等操作
} else if ((evs[i].events & EPOLLIN) && evs[i].data.fd == listensock) {
socklen_t clientLen;
struct sockaddr_in clientAdd;
//處理accept操作
int connectsock = accept(listensock, (SA*)&clientAdd, &clientLen);
if (connectsock == -1) {
ERR_EXIT("accept");
}
printf("accept sucess and fd is %dn", connectsock);
//增加監(jiān)視事件
addfd(epfd, connectsock);
} else if (evs[i].events & EPOLLIN) {
//read
//decode
//compute
//encode
//修改成監(jiān)視寫事件
} else if (evs[i].events & EPOLLOUT) {
//write
//改成讀事件
}
}
}
free(evs); //釋放資源
}
int main() {
pthread_t tid;
int i;
//此處顯示多個(gè)線程共享一個(gè)listensock 看看效果
int listensock = CreateSocket();
for (i = 0; i < 10; ++i) { //簡(jiǎn)單地開十個(gè)線程
//int listensock = CreateSocket();
pthread_create(&tid, NULL, Routine, (void*)listensock);
pthread_detach(tid);//分離線程
}
while (1); //主線程等待子線程結(jié)束
return 0;
}
上述還沒有進(jìn)行一個(gè)每一個(gè)進(jìn)程都對(duì)應(yīng)一個(gè)listensock 而是多線程共享一個(gè)listensock 運(yùn)行結(jié)果如下
所有的線程同時(shí)被喚醒了,但是實(shí)際上會(huì)處理連接的僅僅只是一個(gè)線程,
int main() {
pthread_t tid;
int i;
//int listensock = CreateSocket();
for (i = 0; i < 10; ++i) { //簡(jiǎn)單地開十個(gè)線程
int listensock = CreateSocket();
pthread_create(&tid, NULL, Routine, (void*)listensock);
pthread_detach(tid);//分離線程
}
while (1); //主線程等待子線程結(jié)束
return 0;
}
咱僅僅只是將主線程做如上這樣一個(gè)簡(jiǎn)單的修改,每一個(gè)線程對(duì)應(yīng)一個(gè)listensock;每一個(gè)線程一個(gè)獨(dú)有的監(jiān)視窗口,將問題拋給內(nèi)核去處理,讓內(nèi)核去負(fù)載均衡 : 結(jié)果如下
僅僅喚醒一個(gè)線程來進(jìn)行處理連接,解決了驚群?jiǎn)栴}
五. 總結(jié)本章:
本文通過介紹兩種鎖入手,以及為什么需要鎖,鎖本質(zhì)就是為了保護(hù),持有鎖你就有權(quán)力有能力操作寫入一定的臨界保護(hù)資源,沒有鎖你就不行需要等待,本質(zhì)其實(shí)是將多條匯編指令綁定成原子操作
然后介紹了驚群現(xiàn)象,通過一個(gè)巧妙地例子,扔一顆石子,只是瞄準(zhǔn)一條魚扔過去了,但是整池魚都被驚醒了,
對(duì)應(yīng)我們地實(shí)際問題就是, 多個(gè)線程或者進(jìn)程共同監(jiān)視同一個(gè)listensock。。。。然后IO連接事件到來地時(shí)候本來僅僅只是需要一個(gè)線程醒過來處理即可,但是卻會(huì)使得所有地線程(進(jìn)程)全部醒過來,造成不必要地進(jìn)程線程間切換,多核CPU被浪費(fèi)喔,系統(tǒng)資源被浪費(fèi)
處理方式 一。 Nginx 源碼加互斥鎖處理。。 二。設(shè)置SO_REUSEPORT, 使得多個(gè)進(jìn)程線程可以同時(shí)連接同一個(gè)port , 為每一個(gè)進(jìn)程線程搞一個(gè)listensock... 將問題拋給內(nèi)核去處理,讓他去負(fù)載均衡地僅僅將IO連接事件分配給一個(gè)進(jìn)程或線程
-
寄存器
+關(guān)注
關(guān)注
31文章
5434瀏覽量
124543 -
Linux
+關(guān)注
關(guān)注
87文章
11511瀏覽量
213843 -
多線程
+關(guān)注
關(guān)注
0文章
279瀏覽量
20447 -
代碼
+關(guān)注
關(guān)注
30文章
4900瀏覽量
70758
發(fā)布評(píng)論請(qǐng)先 登錄
Linux讀寫鎖邏輯解析—Linux為何會(huì)引入讀寫鎖?

linux下各種格式的壓縮包的解壓方法總結(jié)
在linux下實(shí)現(xiàn)事件,主要采用條件鎖的方式實(shí)現(xiàn)
Linux下多線程編程總結(jié)
LINUX系統(tǒng)教程之如何在Linux系統(tǒng)下進(jìn)行編程
ADC的各種指標(biāo)如何理解如何提高ADC轉(zhuǎn)換精度

如何理解Linux的工作原理

Go語(yǔ)言sync包中的鎖都在什么場(chǎng)景下用
嵌入式linux報(bào)警,嵌入式Linux下LED報(bào)警燈驅(qū)動(dòng)設(shè)計(jì)及編程.doc

Linux中的傷害/等待互斥鎖介紹
介紹一下Linux內(nèi)核中的各種鎖
Linux實(shí)例:多線程和互斥鎖到底該如何使用

Linux內(nèi)核中的各種鎖介紹

評(píng)論