01、簡介
Xline 是一款開源的分布式 KV 存儲(chǔ)引擎,用于管理少量的關(guān)鍵性數(shù)據(jù),其核心目標(biāo)是實(shí)現(xiàn)高性能的數(shù)據(jù)訪問,以及保證跨數(shù)據(jù)中心場景下的強(qiáng)一致性。 Xline 對(duì)外提供了一系列兼容 etcd 的訪問接口,比如 KV、Watch、Lease 等等。本文將會(huì)著重介紹一下其中的 Lease 接口 。
Lease 是一種客戶端和服務(wù)端之間的租約機(jī)制。類似于我們現(xiàn)實(shí)生活中的租車服務(wù),當(dāng)我們需要使用一輛車時(shí),我們可以向租車公司申請一個(gè) lease,租車公司會(huì)給我們分配一輛車,并且保證在我們和租車公司約定的有效期內(nèi)不會(huì)把這輛車分配給其他人,如果我們想要延長使用時(shí)間,我們可以向租車公司續(xù)租,如果我們不再需要使用這輛車,我們可以主動(dòng)歸還并取消,或者等待 lease 過期后自動(dòng)歸還。
在 Xline 中對(duì) lease 的使用和現(xiàn)實(shí)生活中的租車服務(wù)很相似,客戶端可以向服務(wù)點(diǎn)申請一個(gè) lease,服務(wù)端會(huì)保證在 lease 的有效期內(nèi)不會(huì)刪除這個(gè) lease,客戶端也可以通過相應(yīng)的接口提前結(jié)束或者延長 lease 的時(shí)間,與現(xiàn)實(shí)中租車不同的是,我們可以在這個(gè) lease 上綁定一些 key-value,這些 key-value 會(huì)隨著 lease 的過期被刪除。
根據(jù)以上介紹的 lease 的能力,我們可以在很多場景下使用 lease 來實(shí)現(xiàn)我們的目的,以下是幾個(gè)常見的 lease 應(yīng)用場景:
- 分布式鎖: 分布式鎖是通過多個(gè)機(jī)制一同實(shí)現(xiàn)的,lease 在分布式鎖中起到避免死鎖的作用。客戶端在請求分布式鎖的時(shí)候,會(huì)創(chuàng)建一個(gè) lease 并不斷續(xù)租,并且寫入 key-value 并附加該 lease,這個(gè) key-value 代表分布式鎖的占用狀態(tài),如果占用該鎖的客戶端因故障無法主動(dòng)釋放鎖,lease 機(jī)制也會(huì)保證在 lease 過期后自動(dòng)刪除對(duì)應(yīng)的 key-value 來釋放當(dāng)前鎖。
- 服務(wù)注冊中心: 注冊新服務(wù)時(shí)創(chuàng)建 lease,并寫入服務(wù)相關(guān)信息的 key-value 附加該 lease,在服務(wù)存活期間,對(duì)應(yīng)服務(wù)會(huì)一直對(duì)其 lease 續(xù)租,服務(wù)故障后無法自動(dòng)續(xù)租,對(duì)應(yīng) key-value 自動(dòng)刪除,相應(yīng)的服務(wù)就會(huì)在注冊中心中注銷。
- 分布式系統(tǒng)中的授權(quán)管理: 客戶端通過申請 lease 來獲取資源的訪問權(quán)限,如果客戶端失去與服務(wù)端的連接,或者由于故障沒有及時(shí)續(xù)租,導(dǎo)致 lease 過期,該客戶端就會(huì)失去相應(yīng)的權(quán)限
02、架構(gòu)
上圖是一個(gè) lease 實(shí)現(xiàn)的簡單架構(gòu)圖,外部 Client 可以通過兩種方式向Xline集群發(fā)送請求,一種是直接通過 Curp
協(xié)議向集群內(nèi)所有節(jié)點(diǎn)廣播請求,Curp
模塊達(dá)成共識(shí)后,會(huì)把這個(gè)請求應(yīng)用到狀態(tài)機(jī),也就是將其寫入存儲(chǔ)層;另一種發(fā)送請求的方式就是 Client 直接將請求發(fā)送到集群中一個(gè)節(jié)點(diǎn)的 LeaseServer
,這也是與 etcd 兼容的請求方式,請求到達(dá) LeaseServer
后,會(huì)有兩條不同的處理路徑,多數(shù)請求會(huì)通過 Server 端綁定的 Curp client 廣播給集群中所有節(jié)點(diǎn),剩下的少部分請求可能只有部分節(jié)點(diǎn)能夠處理,這些請求就會(huì)被轉(zhuǎn)發(fā)到這些節(jié)點(diǎn)的 LeaseServer
,然后應(yīng)用到狀態(tài)機(jī)。
03、源碼分析
源碼組織
Lease 相關(guān)的源碼主要保存在以下文件中,大致分為三個(gè)部分:
- RPC 定義:
xlineapi/proto/rpc.proto
:Xline 內(nèi)各 Server 的 rpc 接口定義,包括 LeaseServer接口定義。xlineapi/proto/lease.proto
:lease 的 rpc message 定義。
- LeaseServer實(shí)現(xiàn):
xline/src/server/lease_server.rs
:負(fù)責(zé)提供 Lease RPC service 的具體實(shí)現(xiàn),主要目的是提供 etcd 兼容接口,如果使用外部的 curp client 直接發(fā)送 propose 可以不經(jīng)過此接口,但也有部分不經(jīng)過共識(shí)協(xié)議的請求必須通過 LeaseServer 處理。
LeaseStore實(shí)現(xiàn):xline/src/storage/lease_store/lease.rs
:定義了Lease
數(shù)據(jù)結(jié)構(gòu),用于保存 Lease相關(guān)的信息,比如 Lease 上綁定的所有 Key, Lease 的過期時(shí)間,Lease 的剩余 TTL 長度等。并為其實(shí)現(xiàn)了一些實(shí)用的方法。xline/src/storage/lease_store/lease_queue.rs
:定義了LeaseQueue
和相關(guān)的方法,LeaseQueue
是一個(gè)由 lease id 以及 lease 過期時(shí)間組成的優(yōu)先隊(duì)列,一個(gè)后臺(tái)常駐 task 會(huì)定時(shí)通過此結(jié)構(gòu)獲取所有過期 lease 的 id。xline/src/storage/lease_store/lease_collection.rs
:定義了LeaseCollection
和相關(guān)的方法,LeasCollection
是 lease 核心數(shù)據(jù)結(jié)構(gòu)的集合,提供 lease 機(jī)制的核心能力。結(jié)構(gòu)內(nèi)部主要包含三個(gè)部分,lease_map
保存所有 lease 結(jié)構(gòu);item_map
緩存 key 到 lease id 映射;expired_queue
管理 lease 過期時(shí)間,expired_queue
只在 leader 節(jié)點(diǎn)上有意義,其它節(jié)點(diǎn)上為空。xline/src/storage/lease_store/mod.rs
:LeaseStore
的定義及方法實(shí)現(xiàn)。負(fù)責(zé)提供 lease 的存儲(chǔ)層抽象,對(duì)外提供所有 lease 相關(guān)操作的存儲(chǔ)層接口。其內(nèi)部包含LeaseCollection
以及和KvStore
共享的一些數(shù)據(jù)結(jié)構(gòu)。
Lease 的創(chuàng)建
想要使用 lease,首先就要?jiǎng)?chuàng)建一個(gè) lease,創(chuàng)建 lease 時(shí)需要使用 LeaseServer
提供的 LeaseGrant
接口。LeaseServer
中對(duì) LeaseGrant
的處理很簡單,就是分配一個(gè) lease id,然后通過 propose 把請求交給共識(shí)協(xié)議處理,達(dá)成共識(shí)后,請求會(huì)在 LeaseStore
中被執(zhí)行。
LeaseStore
會(huì)在 LeaseCollection
中創(chuàng)建并插入一個(gè)新的 Lease
,其核心代碼邏輯如下:
...if is_leader { let expiry = lease.refresh(Duration::ZERO); let _ignore = inner.expired_queue.insert(lease_id, expiry);} else { lease.forever();}let _ignore = inner.lease_map.insert(lease_id, lease.clone());...
需要注意的是,如果當(dāng)前節(jié)點(diǎn)是 leader 節(jié)點(diǎn)的話,還需要承擔(dān)管理 lease 過期時(shí)間的任務(wù),所以需要通過refresh
方法計(jì)算 Lease
的過期時(shí)間,并將其插入到 expired_queue
中。其他節(jié)點(diǎn)則不需要這一步處理,只需要將新的 Lease
插入到 lease_map
中。計(jì)算過期時(shí)間使用的 refresh
定義如下:
Lease 創(chuàng)建完成后,服務(wù)端會(huì)給客戶端返回一個(gè)包含 lease id 的響應(yīng)。
Lease的使用
獲取到 lease id 后,客戶端就可以通過 lease id 來使用這個(gè) lease,在 Put 一對(duì) key value 時(shí)可以附加 lease id,這個(gè) Put 請求被應(yīng)用到狀態(tài)機(jī)時(shí),除了直接在 KvStore
的 Index
和 DB
中寫入 key-value 以外,還會(huì)通過LeaseCollection
提供的 detach
方法分離當(dāng)前 key 和舊的 lease ,并通過 attach
將需要 put 的 key 附加到新的 lease id 上。
pub(crate) fn attach(&self, lease_id: i64, key: Vec< u8 >) - > Result< (), ExecuteError > { let mut inner = self.inner.write(); let Some(lease) = inner.lease_map.get_mut(&lease_id) else { return Err(ExecuteError::lease_not_found(lease_id)); }; lease.insert_key(key.clone()); let _ignore = inner.item_map.insert(key, lease_id); Ok(())}
attach
的具體實(shí)現(xiàn)就是通過 lease id
找到對(duì)應(yīng)的 Lease
,并將 key 附加到 Lease
上,以及在 item_map
中添加 key 到 lease id 的映射關(guān)系。detach
的實(shí)現(xiàn)與 attach
的相反,它會(huì)移除 attach
時(shí)插入的內(nèi)容。
經(jīng)過以上的過程,我們已經(jīng)成功將 key 和 lease id 關(guān)聯(lián)在一起,此時(shí)如果這個(gè) Lease
被主動(dòng) revoke 或者超時(shí),那么這個(gè) Lease
以及它關(guān)聯(lián)的所有 key,都會(huì)被刪除。
Lease 的主動(dòng)刪除
刪除一個(gè) lease 需要調(diào)用 LeaseRevoke
接口,這個(gè)接口在 LeaseServer
中的處理與 LeaseGrant
基本相同,都是將請求交給共識(shí)協(xié)議處理,唯一的不同是 LeaseRevoke
不需要分配 lease id。
let del_keys = match self.lease_collection.look_up(req.id) { Some(l) = > l.keys(), None = > return Err(ExecuteError::lease_not_found(req.id)),};if del_keys.is_empty() { let _ignore = self.lease_collection.revoke(req.id); return Ok(Vec::new());}// delete keys ...let _ignore = self.lease_collection.revoke(req.id);
LeaseRevoke
被執(zhí)行時(shí),首先會(huì)嘗試查找 Lease
是否有關(guān)聯(lián)的 key,如果沒有,那么就可以直接通過 LeaseCollection
上的 revoke
方法將 Lease
刪除,如果有關(guān)聯(lián)的 key 的話那么就需要將關(guān)聯(lián)的所有 key 從 KvStore
中刪除,并清理 LeaseCollection
中這些 key 和 lease id 的關(guān)系,然后才能從 LeaseCollection
中 reovke
這個(gè) Lease
。
Lease 的過期
Lease 過期時(shí)的處理流程如上圖所示,此處省略了共識(shí)的部分,在初始化 LeaseServer
時(shí),會(huì)創(chuàng)建一個(gè)后臺(tái)常駐的 revoke_expired_leases_task
,這個(gè) task 的主體代碼如下:
loop { // only leader will check expired lease if lease_server.lease_storage.is_primary() { for id in lease_server.lease_storage.find_expired_leases() { let _handle = tokio::spawn({ let s = Arc::clone(&lease_server); async move { let request = tonic::Request::new(LeaseRevokeRequest { id }); if let Err(e) = s.lease_revoke(request).await { warn!("Failed to revoke expired leases: {}", e); } } }); } } time::sleep(DEFAULT_LEASE_REQUEST_TIME).await;}
在負(fù)責(zé)管理 Lease
過期時(shí)間節(jié)點(diǎn)上,這個(gè) task 會(huì)定時(shí)通過 find_expired_leases
獲取已經(jīng)過期的所有 lease id, 然后調(diào)用 lease server 上的 lease_revoke
接口來刪除過期的 Lease
,這個(gè)接口和客戶度主動(dòng)刪除 Lease
時(shí)使用的是同一個(gè)接口。
find_expired_leases
是 LeaseCollection
上一個(gè)核心方法,具體實(shí)現(xiàn)如下:
pub(crate) fn find_expired_leases(&self) - > Vec< i64 > { let mut expired_leases = vec![]; let mut inner = self.inner.write(); while let Some(expiry) = inner.expired_queue.peek() { if *expiry <= Instant::now() { #[allow(clippy::unwrap_used)] // queue.peek() returns Some let id = inner.expired_queue.pop().unwrap(); if inner.lease_map.contains_key(&id) { expired_leases.push(id); } } else { break; } } expired_leases}
在創(chuàng)建 Lease
時(shí),我們已經(jīng)計(jì)算過了Lease
過期的時(shí)間并將其插入了 expired_queue
,調(diào)用 find_expired_queue
時(shí)會(huì)一直嘗試從優(yōu)先隊(duì)列隊(duì)頭拿出已經(jīng)過期的 Lease
,直到遇到第一個(gè)不過期的 Lease
后停止嘗試,然后將拿到的所有 lease id 返回。
Lease 的續(xù)租
如果想要讓創(chuàng)建的 Lease
能夠持續(xù)更長時(shí)間,那就需要在客戶端和服務(wù)端之間維護(hù)一條 stream,客戶端定時(shí)向服務(wù)端發(fā)送 LeaseKeepAlive
請求。和前面提到的請求不同,LeaseKeepAlive
請求不需要經(jīng)過共識(shí)協(xié)議,因?yàn)檫@個(gè)請求依賴只存在于 leader 節(jié)點(diǎn)上的 Lease
過期時(shí)間,因此只有 leader 節(jié)點(diǎn)能夠處理 LeaseKeepAlive 請求,follower 節(jié)點(diǎn)會(huì)把請求轉(zhuǎn)發(fā)至 leader 節(jié)點(diǎn)上處理。具體的轉(zhuǎn)發(fā)邏輯可以參考 lease_server.rs
內(nèi)的源碼。
在 leader 和 client 建立起 stream 后,每當(dāng) leader 從 stream 中收到 lease id,都會(huì)為這個(gè) lease 續(xù)租,最終續(xù)租的邏輯是通過 LeaseCollection
提供的 renew
方法實(shí)現(xiàn)的。該方法定義如下:
pub(crate) fn renew(&self, lease_id: i64) - > Result< i64, ExecuteError > { let mut inner = self.inner.write(); let (expiry, ttl) = { let Some(lease) = inner.lease_map.get_mut(&lease_id) else { return Err(ExecuteError::lease_not_found(lease_id)); }; if lease.expired() { return Err(ExecuteError::lease_expired(lease_id)); } let expiry = lease.refresh(Duration::default()); let ttl = lease.ttl().as_secs().cast(); (expiry, ttl) }; let _ignore = inner.expired_queue.update(lease_id, expiry); Ok(ttl)}
Renew 會(huì)先檢查對(duì)應(yīng) Lease
是否已經(jīng)過期,沒有過期的話就會(huì)重新計(jì)算過期時(shí)間,然后更新它在 expired_queue
中的順序。
只要 client 和 server 之間的連接不中斷,client 就會(huì)一直通過 stream 向服務(wù)端發(fā)送 LeaseKeepAlive
請求,這個(gè) lease 也就不會(huì)超時(shí),前文提到的 lease 主要的應(yīng)用場景中,幾乎都用到了這個(gè)特性來判斷客戶端是否在正常運(yùn)行。
Lease 信息的讀取
Lease 有兩個(gè)讀取接口,一個(gè)是 LeaseTimeToLive
,這個(gè)接口會(huì)讀取一個(gè) lease 的詳細(xì)信息,包括它的過期時(shí)間,和 LeaseKeepAlive
一樣,因?yàn)檫^期時(shí)間只存在于 leader 節(jié)點(diǎn),因此該請求需要轉(zhuǎn)發(fā)只 leader 處理;另一個(gè)讀取接口是 LeaseLeases
,這個(gè)接口會(huì)列出系統(tǒng)中所有的 lease id,這個(gè)接口不需要 lease 過期時(shí)間的信息,因此可以直接交給共識(shí)協(xié)議處理,所以在 LeaseServer
中的處理和 LeaseGrant
、LeaseRevoke
相似。此處不再贅述。
LeaseTimeToLive
和 LeaseLeases
讀取信息的能力最終由 LeaseCollection
實(shí)現(xiàn),源碼如下:
pub(crate) fn look_up(&self, lease_id: i64) - > Option< Lease > { self.inner.read().lease_map.get(&lease_id).cloned()} pub(crate) fn leases(&self) - > Vec< Lease > { let mut leases = self .inner .read() .lease_map .values() .cloned() .collect::< Vec< _ >>(); leases.sort_by_key(Lease::remaining); leases}
04、總結(jié)
本文介紹了 Xline 下的一個(gè)重要接口 Lease,用戶可以通過 Lease 實(shí)現(xiàn)一組 key 的定時(shí)過期,并且能夠通過 KeepAlive 接口為 Lease 續(xù)租,服務(wù)端也能夠根據(jù)此特性探測客戶端是否在正常運(yùn)作。依賴于 Lease 機(jī)制的這些特點(diǎn),也誕生出了很多典型的應(yīng)用場景,比如本文介紹過的分布式鎖、服務(wù)注冊中心,授權(quán)管理等等。
-
RPC
+關(guān)注
關(guān)注
0文章
111瀏覽量
11887
發(fā)布評(píng)論請先 登錄
Faster Transformer v2.1版本源碼解讀

OneFlow Softmax算子源碼解讀之WarpSoftmax

OneFlow Softmax算子源碼解讀之BlockSoftmax

聊聊Dubbo - Dubbo可擴(kuò)展機(jī)制源碼解析
直播系統(tǒng)源碼選擇二開的好處是什么
闡述FreeRTOS系統(tǒng)中機(jī)制的實(shí)現(xiàn)原理
AP側(cè)中網(wǎng)相關(guān)的PLMN業(yè)務(wù)源碼流程解讀
風(fēng)河:用“商業(yè)機(jī)制”保護(hù)開放源碼的價(jià)值
OC的消息轉(zhuǎn)發(fā)機(jī)制的深度解讀
基于EAIDK的人臉?biāo)惴☉?yīng)用-源碼解讀(2)
openharmony源碼解讀
Xline源碼解讀(一)—初識(shí)CURP協(xié)議

Xline源碼解讀(三)—CURP Server的實(shí)現(xiàn)

分布式系統(tǒng)中Membership Change 源碼解讀

評(píng)論