Rust是一種系統(tǒng)級編程語言,它帶有嚴格的內存管理、并發(fā)和安全性規(guī)則,因此很受廣大程序員的青睞。RwLock(讀寫鎖)是 Rust 中常用的線程同步機制之一,本文將詳細介紹 Rust 語言中的 RwLock 的內部實現原理、常用接口的使用技巧和最佳實踐。
RwLock 的內部實現原理
基本概念
RwLock 是一種讀寫分離的鎖,允許多個線程同時讀取共享數據,但只允許一個線程寫入數據。通過這種方式,可以避免讀寫操作之間的競爭,從而提高并發(fā)性能。
在 Rust 中,RwLock 的實現基于 std::sync::RwLock 結構體。其中,T 表示被保護的數據類型,需要滿足 Send 特質以便可以在線程之間傳遞,并且需要滿足 Sync 特質以便可以在線程之間共享。
RwLock 是在 std::sync::RwLock 結構體上實現的,為了方便說明,下文中假設 T 為 u32 類型。
RwLock 的基本結構
RwLock 的基本結構如下:
use std::sync::RwLock;
let lock = RwLock::new(0u32);
該代碼將創(chuàng)建一個 RwLock 對象,其中 T 類型為 u32,初始化值為 0,即該鎖保護的是一個名為 data 的 u32 類型變量。
RwLock 的鎖定機制
我們可以通過鎖定 RwLock 來對數據進行保護。RwLock 提供了四個方法來完成鎖定操作:
read()
方法:獲取讀鎖,并返回一個 RAII(資源獲取即初始化)的讀取守衛(wèi)。多個線程可以同時獲取讀鎖,但是不能同時持有寫鎖。
try_read()
方法:非阻塞地獲取讀鎖。如果讀鎖已經被占用,則返回 None。
write()
方法:獲取寫鎖,并返回一個 RAII 的寫入守衛(wèi)。如果有任何線程正在持有讀鎖或寫鎖,則阻塞等待直到它們釋放鎖。
try_write()
方法:非阻塞地獲取寫鎖。如果寫鎖已經被占用,則返回None
。
對于讀寫鎖,我們需要保證寫操作在讀操作之前,因此,在調用 write 方法時,會等待所有的讀取守衛(wèi)被釋放,并阻止新的讀取守衛(wèi)的創(chuàng)建。為了避免死鎖和優(yōu)先級反轉,寫入守衛(wèi)還可以降低優(yōu)先級。
讀寫鎖的實現主要是通過兩個 Mutex 來實現的。一個 Mutex 用于保護讀取計數器,另一個 Mutex 用于保護寫入狀態(tài)。讀取計數器統(tǒng)計當前存在多少個讀取鎖,每當一個新的讀取鎖被請求時,讀取計數器就會自增。當讀取計數器為 0 時,寫入鎖可以被請求。
RwLock 的 Poisoning
類似于 Mutex,RwLock 也支持 poisoning 機制。如果 RwLock 發(fā)生 panic,那么鎖就成了 poison 狀態(tài),也就是無法再被使用。任何試圖獲取這個鎖的線程都會 panic,而不是被阻塞。
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let lock = Arc::new(RwLock::new(0u32));
let readers = (0..6)
.map(|_| {
let lock = lock.clone();
thread::spawn(move || {
let guard = lock.read().unwrap();
println!("read: {}", *guard);
})
})
.collect::< Vec< _ >>();
let writers = (0..2)
.map(|_| {
let lock = lock.clone();
thread::spawn(move || {
let mut guard = lock.write().unwrap();
*guard += 1;
println!("write: {}", *guard);
})
})
.collect::< Vec< _ >>();
for reader in readers {
reader.join().unwrap();
}
for writer in writers {
writer.join().unwrap();
}
}
運行后,可能會出現以下異常信息:
thread 'main' panicked at 'PoisonError { inner: ...
這里的 inner
表示調用 RwLock 的線程 panic 時產生的錯誤信息。
常用接口的使用技巧
read()
方法
read()
方法用于獲取讀鎖,并返回一個 RAII 的讀取守衛(wèi):
let lock = RwLock::new(0u32);
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
在上面的例子中,r1 和 r2 都是 RwLockWriteGuard 類型的對象,它們引用的數據類型是 u32。這意味著它們只允許讀取 u32 類型的數據,并且無法改變它們的值。
讀取守衛(wèi)被析構時,RwLock 的讀取計數器會減少,如果讀取計數器變?yōu)?0,則寫入鎖可以被請求。
write()
方法
write()
方法用于獲取寫鎖,并返回一個 RAII 的寫入守衛(wèi):
let lock = RwLock::new(0u32);
let mut w1 = lock.write().unwrap();
let mut w2 = lock.write().unwrap();
在上面的例子中,w1 和 w2 都是 RwLockWriteGuard 類型的對象,它們引用的數據類型是 u32。這意味著它們允許讀寫 u32 類型的數據,并且可以改變它們的值。
寫入守衛(wèi)被析構時,寫入鎖立即被釋放,并且所有等待讀取鎖和寫入鎖的線程都可以開始運行。
try_read()
方法
try_read()
方法用于非阻塞地獲取讀鎖。如果讀鎖已經被占用,則返回 None。
let lock = RwLock::new(0u32);
if let Some(r) = lock.try_read() {
println!("read: {}", *r);
} else {
println!("read lock is already taken");
}
try_write()
方法
try_write()
方法用于非阻塞地獲取寫鎖。如果寫鎖已經被占用,則返回 None。
let lock = RwLock::new(0u32);
if let Some(mut w) = lock.try_write() {
*w += 1;
println!("write: {}", *w);
} else {
println!("write lock is already taken");
}
共享所有權
如果你想在多個線程之間共享一個 RwLock 對象,就需要使用 Arc(atomic reference counting,原子引用計數)來包裝它:
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let lock = Arc::new(RwLock::new(0u32));
let readers = (0..6)
.map(|_| {
let lock = lock.clone();
thread::spawn(move || {
let guard = lock.read().unwrap();
println!("read: {}", *guard);
})
})
.collect::< Vec< _ >>();
let writers = (0..2)
.map(|_| {
let lock = lock.clone();
thread::spawn(move || {
let mut guard = lock.write().unwrap();
*guard += 1;
println!("write: {}", *guard);
})
})
.collect::< Vec< _ >>();
for reader in readers {
reader.join().unwrap();
}
for writer in writers {
writer.join().unwrap();
}
}
// 輸出結果:
// read: 0
// read: 0
// read: 0
// read: 0
// read: 0
// read: 0
// write: 1
// write: 2
實現鎖超時功能
Rust標準庫中的RwLock目前是不支持讀/寫超時功能的。我們可以利用RwLock中非阻塞方法try_read和try_write實現超時的特征。
下面進一步講解使用std::sync::RwLock和std::time::Duration來實現讀超時,具體步驟如下:
- 創(chuàng)建一個名為TimeoutRwLock的trait,其中包含read_timeout方法。
- 在TimeoutRwLock中添加默認實現(default impl)。
- 在read_timeout方法中,通過RwLock的try_read_with_timeout方法來嘗試獲取讀取器(Reader),并且指定一個等待時間。
- 如果在等待時間內成功獲取到讀取器,那么將讀取器返回;否則,返回一個錯誤。 下面是代碼實現:
use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::Duration;
use std::thread;
use std::thread::sleep;
trait TimeoutRwLock< T > {
fn read_timeout(&self, timeout: Duration) - > Result< RwLockReadGuard< '_, T >, String > {
match self.try_read_with_timeout(timeout) {
Ok(guard) = > Ok(guard),
Err(_) = > Err(String::from("timeout")),
}
}
fn try_read_with_timeout(&self, timeout: Duration) - > Result< RwLockReadGuard< '_, T >, () >;
}
impl< T > TimeoutRwLock< T > for RwLock< T > {
fn try_read_with_timeout(&self, timeout: Duration) - > Result< RwLockReadGuard< '_, T >, () > {
let now = std::time::Instant::now();
loop {
match self.try_read() {
Ok(guard) = > return Ok(guard),
Err(_) = > {
if now.elapsed() >= timeout {
return Err(());
}
std::thread::sleep(Duration::from_millis(10));
}
}
}
}
}
fn main() {
let lock = Arc::new(RwLock::new(0u32));
let reader = {
let lock = lock.clone();
thread::spawn(
move || match lock.read_timeout(Duration::from_millis(100)) {
Ok(guard) = > {
println!("read: {}", *guard);
}
Err(e) = > {
println!("error: {:?}", e);
}
},
)
};
let writer = {
let lock = lock.clone();
thread::spawn(move || {
sleep(Duration::from_secs(1));
let mut guard = lock.write().unwrap();
*guard += 1;
println!("write: {}", *guard);
})
};
reader.join().unwrap();
writer.join().unwrap();
}
// 輸出結果:
// read: 0
// write: 1
在這個實現中,trait TimeoutRwLock中定義了一個read_timeout方法,它與try_read方法具有相同的輸入參數類型和輸出類型。default impl方法是一個嘗試在給定的等待時間內獲取讀取器(Reader)的循環(huán),并在等待過程中使用線程(thread)的park_timeout方法來避免 CPU 占用過高。如果在等待時間內成功獲取到讀取器(Reader),則返回讀取器;否則返回一個錯誤。
當然,除了自己實現Trait外,還可以使用成熟的第三方庫,例如:parking_lot
RwLock最佳實踐
- ? 避免使用鎖
鎖是一種解決并發(fā)問題的基本機制,但由于鎖會引入競爭條件、死鎖和其他問題,因此應盡量避免使用鎖。如果可能,應使用更高級別的機制,例如 Rust 的通道(channel)。
- ? 避免過度使用讀寫鎖
在某些情況下,讀寫鎖可能會比互斥鎖更慢。例如,如果有太多的讀取器,并且它們在擁有讀取鎖時花費了大量時間,那么寫入器的等待時間可能會很長。因此,使用讀寫鎖時,應仔細考慮讀寫比例,以避免過度使用讀寫鎖。
- ? 鎖的可重入性
RwLock 是可重入的;一個線程占有寫鎖時可以再次占有讀鎖,并且同樣可以占有寫鎖。但這種情況要非常小心,因為可能會導致死鎖。
- ? 盡量縮小鎖的范圍
鎖的范圍越小,競爭就越少,性能就越好。因此,應盡量在需要的地方使用鎖,而在不需要的地方釋放鎖。例如,在讀寫數據之前,可以先將數據復制到本地變量中,然后釋放鎖,以便其它線程可以訪問該數據,而不必爭奪鎖。在本地變量上執(zhí)行讀寫操作時,不需要鎖定。
- ? 鎖的超時設置
在使用鎖時,應該避免出現無限等待的情況??梢允褂脦С瑫r的鎖,當等待時間超過指定的時間時,會返回一個錯誤。這將防止出現死鎖或其他問題。
// 引入第三方庫處理超時
// parking_lot = "0.12.1"
use parking_lot::RwLock;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
fn main() {
let rwlock = Arc::new(RwLock::new(0));
let start = Instant::now();
// 嘗試在 1 秒內獲取讀鎖
let reader = loop {
if let Some(r) = rwlock.try_read_for(Duration::from_secs(1)) {
break r;
}
if start.elapsed() >= Duration::from_secs(5) {
panic!("Failed to acquire read lock within 5 seconds.");
}
};
// 嘗試在 1 秒內獲取寫鎖
let mut writer = loop {
if let Some(w) = rwlock.try_write_for(Duration::from_secs(1)) {
break w;
}
if start.elapsed() >= Duration::from_secs(5) {
panic!("Failed to acquire write lock within 5 seconds.");
}
};
// 進行讀寫操作
println!("Reader: {}", *reader);
*writer += 1;
println!("Writer: {}", *writer);
}
在上面的例子中,讀取器等待 100 毫秒后超時,寫入器等待 1 秒鐘才能成功完成寫入。
總結
RwLock 是 Rust 中一種常用的線程同步機制,可以提高程序的并發(fā)性能。它只允許一個線程寫入數據,但可以讓多個線程同時讀取同一個數據。具體來說,RwLock 在實現上使用了兩個 Mutex,一個用于保護讀取計數器,另一個用于保護寫入狀態(tài)。在使用 RwLock 時,應該注意縮小鎖的范圍、避免使用過多讀寫鎖以及防止死鎖等問題。
-
編程語言
+關注
關注
10文章
1956瀏覽量
36694 -
程序
+關注
關注
117文章
3826瀏覽量
83018 -
內存管理
+關注
關注
0文章
168瀏覽量
14573 -
rust語言
+關注
關注
0文章
57瀏覽量
3150
發(fā)布評論請先 登錄
評論