一区二区三区三上|欧美在线视频五区|国产午夜无码在线观看视频|亚洲国产裸体网站|无码成年人影视|亚洲AV亚洲AV|成人开心激情五月|欧美性爱内射视频|超碰人人干人人上|一区二区无码三区亚洲人区久久精品

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

redis數(shù)據(jù)傾斜的原因以及應(yīng)對方案 JD開源hotkey的源碼解析

我快閉嘴 ? 來源:京東云開發(fā)者 ? 作者:h1654155202.6723 ? 2022-09-29 10:35 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

1 前言

之前旁邊的小伙伴問我熱點(diǎn)數(shù)據(jù)相關(guān)問題,在給他粗略的講解一波redis數(shù)據(jù)傾斜的案例之后,自己也順道回顧了一些關(guān)于熱點(diǎn)數(shù)據(jù)處理的方法論,同時(shí)也想起去年所學(xué)習(xí)JD開源項(xiàng)目hotkey——專門用來解決熱點(diǎn)數(shù)據(jù)問題的框架。在這里結(jié)合兩者所關(guān)聯(lián)到的知識(shí)點(diǎn),通過幾個(gè)小圖和部分粗略的講解,來讓大家了解相關(guān)方法論以及hotkey的源碼解析。

2 Redis數(shù)據(jù)傾斜

2.1 定義與危害

先說說數(shù)據(jù)傾斜的定義,借用百度詞條的解釋:
對于集群系統(tǒng),一般緩存是分布式的,即不同節(jié)點(diǎn)負(fù)責(zé)一定范圍的緩存數(shù)據(jù)。我們把緩存數(shù)據(jù)分散度不夠,導(dǎo)致大量的緩存數(shù)據(jù)集中到了一臺(tái)或者幾臺(tái)服務(wù)節(jié)點(diǎn)上,稱為數(shù)據(jù)傾斜。一般來說數(shù)據(jù)傾斜是由于負(fù)載均衡實(shí)施的效果不好引起的。
從上面的定義中可以得知,數(shù)據(jù)傾斜的原因一般是因?yàn)長B的效果不好,導(dǎo)致部分節(jié)點(diǎn)數(shù)據(jù)量非常集中。

那這又會(huì)有什么危害呢?
如果發(fā)生了數(shù)據(jù)傾斜,那么保存了大量數(shù)據(jù),或者是保存了熱點(diǎn)數(shù)據(jù)的實(shí)例的處理壓力就會(huì)增大,速度變慢,甚至還可能會(huì)引起這個(gè)實(shí)例的內(nèi)存資源耗盡,從而崩潰。這是我們在應(yīng)用切片集群時(shí)要避免的。

2.2 數(shù)據(jù)傾斜的分類

2.2.1 數(shù)據(jù)量傾斜(寫入傾斜)

1.圖示

52a07fce-3f2c-11ed-9e49-dac502259ad0.png

如圖,在某些情況下,實(shí)例上的數(shù)據(jù)分布不均衡,某個(gè)實(shí)例上的數(shù)據(jù)特別多。

2.bigkey導(dǎo)致傾斜

某個(gè)實(shí)例上正好保存了 bigkey。bigkey 的 value 值很大(String 類型),或者是 bigkey 保存了大量集合元素(集合類型),會(huì)導(dǎo)致這個(gè)實(shí)例的數(shù)據(jù)量增加,內(nèi)存資源消耗也相應(yīng)增加。

應(yīng)對方法

在業(yè)務(wù)層生成數(shù)據(jù)時(shí),要盡量避免把過多的數(shù)據(jù)保存在同一個(gè)鍵值對中。

如果 bigkey 正好是集合類型,還有一個(gè)方法,就是把 bigkey 拆分成很多個(gè)小的集合類型數(shù)據(jù),分散保存在不同的實(shí)例上。

3.Slot分配不均導(dǎo)致傾斜

先簡單的介紹一下slot的概念,slot其實(shí)全名是Hash Slot(哈希槽),在Redis Cluster切片集群中一共有16384 個(gè) Slot,這些哈希槽類似于數(shù)據(jù)分區(qū),每個(gè)鍵值對都會(huì)根據(jù)它的 key,被映射到一個(gè)哈希槽中。Redis Cluster 方案采用哈希槽來處理數(shù)據(jù)和實(shí)例之間的映射關(guān)系。

一張圖來解釋,數(shù)據(jù)、哈希槽、實(shí)例這三者的映射分布情況。

52c6cbde-3f2c-11ed-9e49-dac502259ad0.png

這里的CRC16(city)%16384可以簡單的理解為將key1根據(jù)CRC16算法取hash值然后對slot個(gè)數(shù)取模,得到的就是slot位置為14484,他所對應(yīng)的實(shí)例節(jié)點(diǎn)是第三個(gè)。
運(yùn)維在構(gòu)建切片集群時(shí)候,需要手動(dòng)分配哈希槽,并且把16384 個(gè)槽都分配完,否則 Redis 集群無法正常工作。由于是手動(dòng)分配,則可能會(huì)導(dǎo)致部分實(shí)例所分配的slot過多,導(dǎo)致數(shù)據(jù)傾斜。

應(yīng)對方法
使用CLUSTER SLOTS 命令來查看slot分配情況,使用CLUSTER SETSLOT,CLUSTER GETKEYSINSLOT,MIGRATE這三個(gè)命令來進(jìn)行slot數(shù)據(jù)的遷移,具體內(nèi)容不再這里細(xì)說,感興趣的同學(xué)可以自行學(xué)習(xí)一下。

4.Hash Tag導(dǎo)致傾斜

Hash Tag 定義 :指當(dāng)一個(gè)key包含 {} 的時(shí)候,就不對整個(gè)key做hash,而僅對 {} 包括的字符串做hash。

假設(shè)hash算法為sha1。對user:{user1}:ids和user:{user1}:tweets,其hash值都等同于sha1(user1)。

Hash Tag 優(yōu)勢 :如果不同 key 的 Hash Tag 內(nèi)容都是一樣的,那么,這些 key 對應(yīng)的數(shù)據(jù)會(huì)被映射到同一個(gè) Slot 中,同時(shí)會(huì)被分配到同一個(gè)實(shí)例上。

Hash Tag 劣勢 :如果不合理使用,會(huì)導(dǎo)致大量的數(shù)據(jù)可能被集中到一個(gè)實(shí)例上發(fā)生數(shù)據(jù)傾斜,集群中的負(fù)載不均衡。

2.2.2 數(shù)據(jù)訪問傾斜(讀取傾斜-熱key問題)

一般來說數(shù)據(jù)訪問傾斜就是熱key問題導(dǎo)致的,如何處理redis熱key問題也是面試中常會(huì)問到的。所以了解相關(guān)概念及方法論也是不可或缺的一環(huán)。

1.圖示

52f2d51c-3f2c-11ed-9e49-dac502259ad0.png

如圖,雖然每個(gè)集群實(shí)例上的數(shù)據(jù)量相差不大,但是某個(gè)實(shí)例上的數(shù)據(jù)是熱點(diǎn)數(shù)據(jù),被訪問得非常頻繁。
但是為啥會(huì)有熱點(diǎn)數(shù)據(jù)的產(chǎn)生呢?

2.產(chǎn)生熱key的原因及危害

1)用戶消費(fèi)的數(shù)據(jù)遠(yuǎn)大于生產(chǎn)的數(shù)據(jù)(熱賣商品、熱點(diǎn)新聞、熱點(diǎn)評論、明星直播)。
在日常工作生活中一些突發(fā)的的事件,例如:雙十一期間某些熱門商品的降價(jià)促銷,當(dāng)這其中的某一件商品被數(shù)萬次點(diǎn)擊瀏覽或者購買時(shí),會(huì)形成一個(gè)較大的需求量,這種情況下就會(huì)造成熱點(diǎn)問題。
同理,被大量刊發(fā)、瀏覽的熱點(diǎn)新聞、熱點(diǎn)評論、明星直播等,這些典型的讀多寫少的場景也會(huì)產(chǎn)生熱點(diǎn)問題。
2)請求分片集中,超過單 Server 的性能極限。
在服務(wù)端讀數(shù)據(jù)進(jìn)行訪問時(shí),往往會(huì)對數(shù)據(jù)進(jìn)行分片切分,此過程中會(huì)在某一主機(jī) Server 上對相應(yīng)的 Key 進(jìn)行訪問,當(dāng)訪問超過 Server 極限時(shí),就會(huì)導(dǎo)致熱點(diǎn) Key 問題的產(chǎn)生。

如果熱點(diǎn)過于集中,熱點(diǎn) Key 的緩存過多,超過目前的緩存容量時(shí),就會(huì)導(dǎo)致緩存分片服務(wù)被打垮現(xiàn)象的產(chǎn)生。當(dāng)緩存服務(wù)崩潰后,此時(shí)再有請求產(chǎn)生,會(huì)緩存到后臺(tái) DB 上,由于DB 本身性能較弱,在面臨大請求時(shí)很容易發(fā)生請求穿透現(xiàn)象,會(huì)進(jìn)一步導(dǎo)致雪崩現(xiàn)象,嚴(yán)重影響設(shè)備的性能。

3.常用的熱key問題解決辦法:

解決方案一: 備份熱key
可以把熱點(diǎn)數(shù)據(jù)復(fù)制多份,在每一個(gè)數(shù)據(jù)副本的 key 中增加一個(gè)隨機(jī)后綴,讓它和其它副本數(shù)據(jù)不會(huì)被映射到同一個(gè) Slot 中。
這里相當(dāng)于把一份數(shù)據(jù)復(fù)制到其他實(shí)例上,這樣在訪問的時(shí)候也增加隨機(jī)前綴,將對一個(gè)實(shí)例的訪問壓力,均攤到其他實(shí)例上
例如:
我們在放入緩存時(shí)就將對應(yīng)業(yè)務(wù)的緩存key拆分成多個(gè)不同的key。如下圖所示,我們首先在更新緩存的一側(cè),將key拆成N份,比如一個(gè)key名字叫做”good_100”,那我們就可以把它拆成四份,“good_100_copy1”、“good_100_copy2”、“good_100_copy3”、“good_100_copy4”,每次更新和新增時(shí)都需要去改動(dòng)這N個(gè)key,這一步就是拆key。

530cb9be-3f2c-11ed-9e49-dac502259ad0.png

對于service端來講,我們就需要想辦法盡量將自己訪問的流量足夠的均勻。
如何給自己即將訪問的熱key上加入后綴?幾種辦法,根據(jù)本機(jī)的ip或mac地址做hash,之后的值與拆key的數(shù)量做取余,最終決定拼接成什么樣的key后綴,從而打到哪臺(tái)機(jī)器上;服務(wù)啟動(dòng)時(shí)的一個(gè)隨機(jī)數(shù)對拆key的數(shù)量做取余。
偽代碼如下:


const M = N * 2

//生成隨機(jī)數(shù)

random = GenRandom(0, M)

//構(gòu)造備份新key

bakHotKey = hotKey + “_” + random

data = redis.GET(bakHotKey)

if data == NULL {

data = GetFromDB()

redis.SET(bakHotKey, expireTime + GenRandom(0,5))

}

解決方案二: 本地緩存+動(dòng)態(tài)計(jì)算自動(dòng)發(fā)現(xiàn)熱點(diǎn)緩存 基本流程圖

534720e0-3f2c-11ed-9e49-dac502259ad0.png

該方案通過主動(dòng)發(fā)現(xiàn)熱點(diǎn)并對其進(jìn)行存儲(chǔ)來解決熱點(diǎn) Key 的問題。首先 Client 也會(huì)訪問 SLB,并且通過 SLB 將各種請求分發(fā)至 Proxy 中,Proxy 會(huì)按照基于路由的方式將請求轉(zhuǎn)發(fā)至后端的 Redis 中。 在熱點(diǎn) key 的解決上是采用在服務(wù)端增加緩存的方式進(jìn)行。具體來說就是在 Proxy 上增加本地緩存,本地緩存采用 LRU 算法來緩存熱點(diǎn)數(shù)據(jù),后端節(jié)點(diǎn)增加熱點(diǎn)數(shù)據(jù)計(jì)算模塊來返回?zé)狳c(diǎn)數(shù)據(jù)。

Proxy 架構(gòu)的主要有以下優(yōu)點(diǎn):

Proxy 本地緩存熱點(diǎn),讀能力可水平擴(kuò)展

DB 節(jié)點(diǎn)定時(shí)計(jì)算熱點(diǎn)數(shù)據(jù)集合

DB 反饋 Proxy 熱點(diǎn)數(shù)據(jù)

對客戶端完全透明,不需做任何兼容

熱點(diǎn)數(shù)據(jù)的發(fā)現(xiàn)與存儲(chǔ)

53771bec-3f2c-11ed-9e49-dac502259ad0.png

對于熱點(diǎn)數(shù)據(jù)的發(fā)現(xiàn),首先會(huì)在一個(gè)周期內(nèi)對 Key 進(jìn)行請求統(tǒng)計(jì),在達(dá)到請求量級(jí)后會(huì)對熱點(diǎn) Key 進(jìn)行熱點(diǎn)定位,并將所有的熱點(diǎn) Key 放入一個(gè)小的 LRU 鏈表內(nèi),在通過 Proxy 請求進(jìn)行訪問時(shí),若 Redis 發(fā)現(xiàn)待訪點(diǎn)是一個(gè)熱點(diǎn),就會(huì)進(jìn)入一個(gè)反饋階段,同時(shí)對該數(shù)據(jù)進(jìn)行標(biāo)記。 可以使用一個(gè)etcd或者zk集群來存儲(chǔ)反饋的熱點(diǎn)數(shù)據(jù),然后本地所有節(jié)點(diǎn)監(jiān)聽該熱點(diǎn)數(shù)據(jù),進(jìn)而加載到本地JVM緩存中。

熱點(diǎn)數(shù)據(jù)的獲取

54763c26-3f2c-11ed-9e49-dac502259ad0.png

在熱點(diǎn) Key 的處理上主要分為寫入跟讀取兩種形式,在數(shù)據(jù)寫入過程當(dāng) SLB 收到數(shù)據(jù) K1 并將其通過某一個(gè) Proxy 寫入一個(gè) Redis,完成數(shù)據(jù)的寫入。 假若經(jīng)過后端熱點(diǎn)模塊計(jì)算發(fā)現(xiàn) K1 成為熱點(diǎn) key 后, Proxy 會(huì)將該熱點(diǎn)進(jìn)行緩存,當(dāng)下次客戶端再進(jìn)行訪問 K1 時(shí),可以不經(jīng) Redis。 最后由于 proxy 是可以水平擴(kuò)充的,因此可以任意增強(qiáng)熱點(diǎn)數(shù)據(jù)的訪問能力。

最佳成熟方案: JD開源hotKey 這是目前較為成熟的自動(dòng)探測熱key、分布式一致性緩存解決方案。原理就是在client端做洞察,然后上報(bào)對應(yīng)hotkey,server端檢測到后,將對應(yīng)hotkey下發(fā)到對應(yīng)服務(wù)端做本地緩存,并且能保證本地緩存和遠(yuǎn)程緩存的一致性。

在這里咱們就不細(xì)談了,這篇文章的第三部分:JD開源hotkey源碼解析里面會(huì)帶領(lǐng)大家了解其整體原理。

3 JD開源hotkey—自動(dòng)探測熱key、分布式一致性緩存解決方案

3.1 解決痛點(diǎn)

從上面可知,熱點(diǎn)key問題在并發(fā)量比較高的系統(tǒng)中(特別是做秒殺活動(dòng))出現(xiàn)的頻率會(huì)比較高,對系統(tǒng)帶來的危害也很大。 那么針對此,hotkey誕生的目的是什么?需要解決的痛點(diǎn)是什么?以及它的實(shí)現(xiàn)原理。

在這里引用項(xiàng)目上的一段話來概述: 對任意突發(fā)性的無法預(yù)先感知的熱點(diǎn)數(shù)據(jù),包括并不限于熱點(diǎn)數(shù)據(jù)(如突發(fā)大量請求同一個(gè)商品)、熱用戶(如惡意爬蟲刷子)、熱接口(突發(fā)海量請求同一個(gè)接口)等,進(jìn)行毫秒級(jí)精準(zhǔn)探測到。然后對這些熱數(shù)據(jù)、熱用戶等,推送到所有服務(wù)端JVM內(nèi)存中,以大幅減輕對后端數(shù)據(jù)存儲(chǔ)層的沖擊,并可以由使用者決定如何分配、使用這些熱key(譬如對熱商品做本地緩存、對熱用戶進(jìn)行拒絕訪問、對熱接口進(jìn)行熔斷或返回默認(rèn)值)。這些熱數(shù)據(jù)在整個(gè)服務(wù)端集群內(nèi)保持一致性,并且業(yè)務(wù)隔離。

核心功能:熱數(shù)據(jù)探測并推送至集群各個(gè)服務(wù)器

3.2 集成方式

集成方式在這里就不詳述了,感興趣的同學(xué)可以自行搜索。

3.3 源碼解析

3.3.1 架構(gòu)簡介

1.全景圖一覽

549957d8-3f2c-11ed-9e49-dac502259ad0.png

流程介紹:

客戶端通過引用hotkey的client包,在啟動(dòng)的時(shí)候上報(bào)自己的信息給worker,同時(shí)和worker之間建立長連接。定時(shí)拉取配置中心上面的規(guī)則信息和worker集群信息。

客戶端調(diào)用hotkey的ishot()的方法來首先匹配規(guī)則,然后統(tǒng)計(jì)是不是熱key。

通過定時(shí)任務(wù)把熱key數(shù)據(jù)上傳到worker節(jié)點(diǎn)。

worker集群在收取到所有關(guān)于這個(gè)key的數(shù)據(jù)以后(因?yàn)橥ㄟ^hash來決定key 上傳到哪個(gè)worker的,所以同一個(gè)key只會(huì)在同一個(gè)worker節(jié)點(diǎn)上),在和定義的規(guī)則進(jìn)行匹配后判斷是不是熱key,如果是則推送給客戶端,完成本地緩存。

2.角色構(gòu)成

這里直接借用作者的描述: 1)etcd集群 etcd作為一個(gè)高性能的配置中心,可以以極小的資源占用,提供高效的監(jiān)聽訂閱服務(wù)。主要用于存放規(guī)則配置,各worker的ip地址,以及探測出的熱key、手工添加的熱key等。

2)client端jar包 就是在服務(wù)中添加的引用jar,引入后,就可以以便捷的方式去判斷某key是否熱key。同時(shí),該jar完成了key上報(bào)、監(jiān)聽etcd里的rule變化、worker信息變化、熱key變化,對熱key進(jìn)行本地caffeine緩存等。

3) worker端集群 worker端是一個(gè)獨(dú)立部署的Java程序,啟動(dòng)后會(huì)連接etcd,并定期上報(bào)自己的ip信息,供client端獲取地址并進(jìn)行長連接。之后,主要就是對各個(gè)client發(fā)來的待測key進(jìn)行累加計(jì)算,當(dāng)達(dá)到etcd里設(shè)定的rule閾值后,將熱key推送到各個(gè)client。

4) dashboard控制臺(tái) 控制臺(tái)是一個(gè)帶可視化界面的Java程序,也是連接到etcd,之后在控制臺(tái)設(shè)置各個(gè)APP的key規(guī)則,譬如2秒20次算熱。然后當(dāng)worker探測出來熱key后,會(huì)將key發(fā)往etcd,dashboard也會(huì)監(jiān)聽熱key信息,進(jìn)行入庫保存記錄。同時(shí),dashboard也可以手工添加、刪除熱key,供各個(gè)client端監(jiān)聽。

3.hotkey工程結(jié)構(gòu)

54ed3b50-3f2c-11ed-9e49-dac502259ad0.png

3.3.2 client端

主要從下面三個(gè)方面來解析源碼:

5522bc4e-3f2c-11ed-9e49-dac502259ad0.png

4.客戶端啟動(dòng)器

1)啟動(dòng)方式


@PostConstruct

public void init() {

ClientStarter.Builder builder = new ClientStarter.Builder();

ClientStarter starter = builder.setAppName(appName).setEtcdServer(etcd).build();

starter.startPipeline();

}

appName:是這個(gè)應(yīng)用的名稱,一般為${spring.application.name}的值,后續(xù)所有的配置都以此為開頭 etcd:是etcd集群的地址,用逗號(hào)分隔,配置中心。 還可以看到ClientStarter實(shí)現(xiàn)了建造者模式,使代碼更為簡介。

2)核心入口 com.jd.platform.hotkey.client.ClientStarter#startPipeline


/**

* 啟動(dòng)監(jiān)聽etcd

*/

public void startPipeline() {

JdLogger.info(getClass(), "etcdServer:" + etcdServer);

//設(shè)置caffeine的最大容量

Context.CAFFEINE_SIZE = caffeineSize;

//設(shè)置etcd地址

EtcdConfigFactory.buildConfigCenter(etcdServer);

//開始定時(shí)推送

PushSchedulerStarter.startPusher(pushPeriod);

PushSchedulerStarter.startCountPusher(10);

//開啟worker重連器

WorkerRetryConnector.retryConnectWorkers();

registEventBus();

EtcdStarter starter = new EtcdStarter();

//與etcd相關(guān)的監(jiān)聽都開啟

starter.start();

}

該方法主要有五個(gè)功能:

55590a92-3f2c-11ed-9e49-dac502259ad0.png

① 設(shè)置本地緩存(caffeine)的最大值,并創(chuàng)建etcd實(shí)例


//設(shè)置caffeine的最大容量

Context.CAFFEINE_SIZE = caffeineSize;

//設(shè)置etcd地址

EtcdConfigFactory.buildConfigCenter(etcdServer);

caffeineSize是本地緩存的最大值,在啟動(dòng)的時(shí)候可以設(shè)置,不設(shè)置默認(rèn)為200000。 etcdServer是上面說的etcd集群地址。

Context可以理解為一個(gè)配置類,里面就包含兩個(gè)字段:


public class Context {

public static String APP_NAME;

public static int CAFFEINE_SIZE;

}

EtcdConfigFactory是ectd配置中心的工廠類


public class EtcdConfigFactory {

private static IConfigCenter configCenter;

private EtcdConfigFactory() {}

public static IConfigCenter configCenter() {

return configCenter;

}

public static void buildConfigCenter(String etcdServer) {

//連接多個(gè)時(shí),逗號(hào)分隔

configCenter = JdEtcdBuilder.build(etcdServer);

}

}

通過其configCenter()方法獲取創(chuàng)建etcd實(shí)例對象,IConfigCenter接口封裝了etcd實(shí)例對象的行為(包括基本的crud、監(jiān)控、續(xù)約等)

55ab6f08-3f2c-11ed-9e49-dac502259ad0.png

② 創(chuàng)建并啟動(dòng)定時(shí)任務(wù):PushSchedulerStarter


//開始定時(shí)推送

PushSchedulerStarter.startPusher(pushPeriod);//每0.5秒推送一次待測key

PushSchedulerStarter.startCountPusher(10);//每10秒推送一次數(shù)量統(tǒng)計(jì),不可配置

pushPeriod是推送的間隔時(shí)間,可以再啟動(dòng)的時(shí)候設(shè)置,最小為0.05s,推送越快,探測的越密集,會(huì)越快探測出來,但對client資源消耗相應(yīng)增大

PushSchedulerStarter類


/**

* 每0.5秒推送一次待測key

*/

public static void startPusher(Long period) {

if (period == null || period <= 0) {

period = 500L;

}

@SuppressWarnings("PMD.ThreadPoolCreationRule")

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-pusher-service-executor", true));

scheduledExecutorService.scheduleAtFixedRate(() -> {

//熱key的收集器

IKeyCollector collectHK = KeyHandlerFactory.getCollector();

//這里相當(dāng)于每0.5秒,通過netty來給worker來推送收集到的熱key的信息,主要是一些熱key的元數(shù)據(jù)信息(熱key來源的app和key的類型和是否是刪除事件,還有該熱key的上報(bào)次數(shù))

//這里面還有就是該熱key在每次上報(bào)的時(shí)候都會(huì)生成一個(gè)全局的唯一id,還有該熱key每次上報(bào)的創(chuàng)建時(shí)間是在netty發(fā)送的時(shí)候來生成,同一批次的熱key時(shí)間是相同的

List hotKeyModels = collectHK.lockAndGetResult();

if(CollectionUtil.isNotEmpty(hotKeyModels)){

//積攢了半秒的key集合,按照hash分發(fā)到不同的worker

KeyHandlerFactory.getPusher().send(Context.APP_NAME, hotKeyModels);

collectHK.finishOnce();

}

},0, period, TimeUnit.MILLISECONDS);

}

/**

* 每10秒推送一次數(shù)量統(tǒng)計(jì)

*/

public static void startCountPusher(Integer period) {

if (period == null || period <= 0) {

period = 10;

}

@SuppressWarnings("PMD.ThreadPoolCreationRule")

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-count-pusher-service-executor", true));

scheduledExecutorService.scheduleAtFixedRate(() -> {

IKeyCollector collectHK = KeyHandlerFactory.getCounter();

List keyCountModels = collectHK.lockAndGetResult();

if(CollectionUtil.isNotEmpty(keyCountModels)){

//積攢了10秒的數(shù)量,按照hash分發(fā)到不同的worker

KeyHandlerFactory.getPusher().sendCount(Context.APP_NAME, keyCountModels);

collectHK.finishOnce();

}

},0, period, TimeUnit.SECONDS);

}

從上面兩個(gè)方法可知,都是通過定時(shí)線程池來實(shí)現(xiàn)定時(shí)任務(wù)的,都是守護(hù)線程。

咱們重點(diǎn)關(guān)注一下KeyHandlerFactory類,它是client端設(shè)計(jì)的一個(gè)比較巧妙的地方,從類名上直譯為key處理工廠。具體的實(shí)例對象是DefaultKeyHandler:


public class DefaultKeyHandler {

//推送HotKeyMsg消息到Netty的推送者

private IKeyPusher iKeyPusher = new NettyKeyPusher();

//待測key的收集器,這里面包含兩個(gè)map,key主要是熱key的名字,value主要是熱key的元數(shù)據(jù)信息(比如:熱key來源的app和key的類型和是否是刪除事件)

private IKeyCollector iKeyCollector = new TurnKeyCollector();

//數(shù)量收集器,這里面包含兩個(gè)map,這里面key是相應(yīng)的規(guī)則,HitCount里面是這個(gè)規(guī)則的總訪問次數(shù)和熱后訪問次數(shù)

private IKeyCollector iKeyCounter = new TurnCountCollector();

public IKeyPusher keyPusher() {

return iKeyPusher;

}

public IKeyCollector keyCollector() {

return iKeyCollector;

}

public IKeyCollector keyCounter() {

return iKeyCounter;

}

}

這里面有三個(gè)成員對象,分別是封裝推送消息到netty的NettyKeyPusher、待測key收集器TurnKeyCollector、數(shù)量收集器TurnCountCollector,其中后兩者都實(shí)現(xiàn)了接口IKeyCollector,能對hotkey的處理起到有效的聚合,充分體現(xiàn)了代碼的高內(nèi)聚。 先來看看封裝推送消息到netty的NettyKeyPusher:


/**

* 將msg推送到netty的pusher

* @author wuweifeng wrote on 2020-01-06

* @version 1.0

*/

public class NettyKeyPusher implements IKeyPusher {

@Override

public void send(String appName, List list) {

//積攢了半秒的key集合,按照hash分發(fā)到不同的worker

long now = System.currentTimeMillis();

Map> map = new HashMap<>();

for(HotKeyModel model : list) {

model.setCreateTime(now);

Channel channel = WorkerInfoHolder.chooseChannel(model.getKey());

if (channel == null) {

continue;

}

List newList = map.computeIfAbsent(channel, k -> new ArrayList<>());

newList.add(model);

}

for (Channel channel : map.keySet()) {

try {

List batch = map.get(channel);

HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_NEW_KEY, Context.APP_NAME);

hotKeyMsg.setHotKeyModels(batch);

channel.writeAndFlush(hotKeyMsg).sync();

} catch (Exception e) {

try {

InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();

JdLogger.error(getClass(),"flush error " + insocket.getAddress().getHostAddress());

} catch (Exception ex) {

JdLogger.error(getClass(),"flush error");

}

}

}

}

@Override

public void sendCount(String appName, List list) {

//積攢了10秒的數(shù)量,按照hash分發(fā)到不同的worker

long now = System.currentTimeMillis();

Map> map = new HashMap<>();

for(KeyCountModel model : list) {

model.setCreateTime(now);

Channel channel = WorkerInfoHolder.chooseChannel(model.getRuleKey());

if (channel == null) {

continue;

}

List newList = map.computeIfAbsent(channel, k -> new ArrayList<>());

newList.add(model);

}

for (Channel channel : map.keySet()) {

try {

List batch = map.get(channel);

HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_HIT_COUNT, Context.APP_NAME);

hotKeyMsg.setKeyCountModels(batch);

channel.writeAndFlush(hotKeyMsg).sync();

} catch (Exception e) {

try {

InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();

JdLogger.error(getClass(),"flush error " + insocket.getAddress().getHostAddress());

} catch (Exception ex) {

JdLogger.error(getClass(),"flush error");

}

}

}

}

}

send(String appName, Listlist) 主要是將TurnKeyCollector收集的待測key通過netty推送給worker,HotKeyModel對象主要是一些熱key的元數(shù)據(jù)信息(熱key來源的app和key的類型和是否是刪除事件,還有該熱key的上報(bào)次數(shù)) sendCount(String appName, Listlist) 主要是將TurnCountCollector收集的規(guī)則所對應(yīng)的key通過netty推送給worker,KeyCountModel對象主要是一些key所對應(yīng)的規(guī)則信息以及訪問次數(shù)等 WorkerInfoHolder.chooseChannel(model.getRuleKey()) 根據(jù)hash算法獲取key對應(yīng)的服務(wù)器,分發(fā)到對應(yīng)服務(wù)器相應(yīng)的Channel 連接,所以服務(wù)端可以水平無限擴(kuò)容,毫無壓力問題。

再來分析一下key收集器:TurnKeyCollector與TurnCountCollector: 實(shí)現(xiàn)IKeyCollector接口:


/**

* 對hotkey進(jìn)行聚合

* @author wuweifeng wrote on 2020-01-06

* @version 1.0

*/

public interface IKeyCollector {

/**

* 鎖定后的返回值

*/

List lockAndGetResult();

/**

* 輸入的參數(shù)

*/

void collect(T t);

void finishOnce();

}

lockAndGetResult() 主要是獲取返回collect方法收集的信息,并將本地暫存的信息清空,方便下個(gè)統(tǒng)計(jì)周期積攢數(shù)據(jù)。 collect(T t) 顧名思義他是收集api調(diào)用的時(shí)候,將收集的到key信息放到本地存儲(chǔ)。 finishOnce() 該方法目前實(shí)現(xiàn)都是空,無需關(guān)注。

待測key收集器:TurnKeyCollector


public class TurnKeyCollector implements IKeyCollector {

//這map里面的key主要是熱key的名字,value主要是熱key的元數(shù)據(jù)信息(比如:熱key來源的app和key的類型和是否是刪除事件)

private ConcurrentHashMap map0 = new ConcurrentHashMap<>();

private ConcurrentHashMap map1 = new ConcurrentHashMap<>();

private AtomicLong atomicLong = new AtomicLong(0);

@Override

public List lockAndGetResult() {

//自增后,對應(yīng)的map就會(huì)停止被寫入,等待被讀取

atomicLong.addAndGet(1);

List list;

//可以觀察這里與collect方法里面的相同位置,會(huì)發(fā)現(xiàn)一個(gè)是操作map0一個(gè)是操作map1,這樣保證在讀map的時(shí)候,不會(huì)阻塞寫map,

//兩個(gè)map同時(shí)提供輪流提供讀寫能力,設(shè)計(jì)的很巧妙,值得學(xué)習(xí)

if (atomicLong.get() % 2 == 0) {

list = get(map1);

map1.clear();

} else {

list = get(map0);

map0.clear();

}

return list;

}

private List get(ConcurrentHashMap map) {

return CollectionUtil.list(false, map.values());

}

@Override

public void collect(HotKeyModel hotKeyModel) {

String key = hotKeyModel.getKey();

if (StrUtil.isEmpty(key)) {

return;

}

if (atomicLong.get() % 2 == 0) {

//不存在時(shí)返回null并將key-value放入,已有相同key時(shí),返回該key對應(yīng)的value,并且不覆蓋

HotKeyModel model = map0.putIfAbsent(key, hotKeyModel);

if (model != null) {

//增加該hotMey上報(bào)的次數(shù)

model.add(hotKeyModel.getCount());

}

} else {

HotKeyModel model = map1.putIfAbsent(key, hotKeyModel);

if (model != null) {

model.add(hotKeyModel.getCount());

}

}

}

@Override

public void finishOnce() {}

}

可以看到該類中有兩個(gè)ConcurrentHashMap和一個(gè)AtomicLong,通過對AtomicLong來自增,然后對2取模,來分別控制兩個(gè)map的讀寫能力,保證每個(gè)map都能做讀寫,并且同一個(gè)map不能同時(shí)讀寫,這樣可以避免并發(fā)集合讀寫不阻塞,這一塊無鎖化的設(shè)計(jì)還是非常巧妙的,極大的提高了收集的吞吐量。 key數(shù)量收集器:TurnCountCollector 這里的設(shè)計(jì)與TurnKeyCollector大同小異,咱們就不細(xì)談了。值得一提的是它里面有個(gè)并行處理的機(jī)制,當(dāng)收集的數(shù)量超過DATA_CONVERT_SWITCH_THRESHOLD=5000的閾值時(shí),lockAndGetResult處理是使用java Stream并行流處理,提升處理的效率。

③ 開啟worker重連器


//開啟worker重連器

WorkerRetryConnector.retryConnectWorkers();

public class WorkerRetryConnector {

/**

* 定時(shí)去重連沒連上的workers

*/

public static void retryConnectWorkers() {

@SuppressWarnings("PMD.ThreadPoolCreationRule")

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("worker-retry-connector-service-executor", true));

//開啟拉取etcd的worker信息,如果拉取失敗,則定時(shí)繼續(xù)拉取

scheduledExecutorService.scheduleAtFixedRate(WorkerRetryConnector::reConnectWorkers, 30, 30, TimeUnit.SECONDS);

}

private static void reConnectWorkers() {

List nonList = WorkerInfoHolder.getNonConnectedWorkers();

if (nonList.size() == 0) {

return;

}

JdLogger.info(WorkerRetryConnector.class, "trying to reConnect to these workers :" + nonList);

NettyClient.getInstance().connect(nonList);//這里會(huì)觸發(fā)netty連接方法channelActive

}

}

也是通過定時(shí)線程來執(zhí)行,默認(rèn)時(shí)間間隔是30s,不可設(shè)置。 通過WorkerInfoHolder來控制client的worker連接信息,連接信息是個(gè)List,用的CopyOnWriteArrayList,畢竟是一個(gè)讀多寫少的場景,類似與元數(shù)據(jù)信息。


/**

* 保存worker的ip地址和Channel的映射關(guān)系,這是有序的。每次client發(fā)送消息時(shí),都會(huì)根據(jù)該map的size進(jìn)行hash

* 如key-1就發(fā)送到workerHolder的第1個(gè)Channel去,key-2就發(fā)到第2個(gè)Channel去

*/

private static final List WORKER_HOLDER = new CopyOnWriteArrayList<>();

④ 注冊EventBus事件訂閱者


private void registEventBus() {

//netty連接器會(huì)關(guān)注WorkerInfoChangeEvent事件

EventBusCenter.register(new WorkerChangeSubscriber());

//熱key探測回調(diào)關(guān)注熱key事件

EventBusCenter.register(new ReceiveNewKeySubscribe());

//Rule的變化的事件

EventBusCenter.register(new KeyRuleHolder());

}

使用guava的EventBus事件消息總線,利用發(fā)布/訂閱者模式來對項(xiàng)目進(jìn)行解耦。它可以利用很少的代碼,來實(shí)現(xiàn)多組件間通信。 基本原理圖如下: 560606c0-3f2c-11ed-9e49-dac502259ad0.png

監(jiān)聽worker信息變動(dòng):WorkerChangeSubscriber


/**

* 監(jiān)聽worker信息變動(dòng)

*/

@Subscribe

public void connectAll(WorkerInfoChangeEvent event) {

List addresses = event.getAddresses();

if (addresses == null) {

addresses = new ArrayList<>();

}

WorkerInfoHolder.mergeAndConnectNew(addresses);

}

/**

* 當(dāng)client與worker的連接斷開后,刪除

*/

@Subscribe

public void channelInactive(ChannelInactiveEvent inactiveEvent) {

//獲取斷線的channel

Channel channel = inactiveEvent.getChannel();

InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();

String address = socketAddress.getHostName() + ":" + socketAddress.getPort();

JdLogger.warn(getClass(), "this channel is inactive : " + socketAddress + " trying to remove this connection");

WorkerInfoHolder.dealChannelInactive(address);

}

562af9b2-3f2c-11ed-9e49-dac502259ad0.png

監(jiān)聽熱key回調(diào)事件:ReceiveNewKeySubscribe


private ReceiveNewKeyListener receiveNewKeyListener = new DefaultNewKeyListener();

@Subscribe

public void newKeyComing(ReceiveNewKeyEvent event) {

HotKeyModel hotKeyModel = event.getModel();

if (hotKeyModel == null) {

return;

}

//收到新key推送

if (receiveNewKeyListener != null) {

receiveNewKeyListener.newKey(hotKeyModel);

}

}

該方法會(huì)收到新的熱key訂閱事件之后,會(huì)將其加入到KeyHandlerFactory的收集器里面處理。

核心處理邏輯


@Override

public void newKey(HotKeyModel hotKeyModel) {

long now = System.currentTimeMillis();

//如果key到達(dá)時(shí)已經(jīng)過去1秒了,記錄一下。手工刪除key時(shí),沒有CreateTime

if (hotKeyModel.getCreateTime() != 0 && Math.abs(now - hotKeyModel.getCreateTime()) > 1000) {

JdLogger.warn(getClass(), "the key comes too late : " + hotKeyModel.getKey() + " now " +

+now + " keyCreateAt " + hotKeyModel.getCreateTime());

}

if (hotKeyModel.isRemove()) {

//如果是刪除事件,就直接刪除

deleteKey(hotKeyModel.getKey());

return;

}

//已經(jīng)是熱key了,又推過來同樣的熱key,做個(gè)日志記錄,并刷新一下

if (JdHotKeyStore.isHot(hotKeyModel.getKey())) {

JdLogger.warn(getClass(), "receive repeat hot key :" + hotKeyModel.getKey() + " at " + now);

}

addKey(hotKeyModel.getKey());

}

private void deleteKey(String key) {

CacheFactory.getNonNullCache(key).delete(key);

}

private void addKey(String key) {

ValueModel valueModel = ValueModel.defaultValue(key);

if (valueModel == null) {

//不符合任何規(guī)則

deleteKey(key);

return;

}

//如果原來該key已經(jīng)存在了,那么value就被重置,過期時(shí)間也會(huì)被重置。如果原來不存在,就新增的熱key

JdHotKeyStore.setValueDirectly(key, valueModel);

}

如果該HotKeyModel里面是刪除事件,則獲取RULE_CACHE_MAP里面該key超時(shí)時(shí)間對應(yīng)的caffeine,然后從中刪除該key緩存,然后返回(這里相當(dāng)于刪除了本地緩存)。

如果不是刪除事件,則在RULE_CACHE_MAP對應(yīng)的caffeine緩存中添加該key的緩存。

這里有個(gè)注意點(diǎn),如果不為刪除事件,調(diào)用addKey()方法在caffeine增加緩存的時(shí)候,value是一個(gè)魔術(shù)值0x12fcf76,這個(gè)值只代表加了這個(gè)緩存,但是這個(gè)緩存在查詢的時(shí)候相當(dāng)于為null。

監(jiān)聽Rule的變化事件:KeyRuleHolder

56584688-3f2c-11ed-9e49-dac502259ad0.png

可以看到里面有兩個(gè)成員屬性:RULE_CACHE_MAP,KEY_RULES


/**

* 保存超時(shí)時(shí)間和caffeine的映射,key是超時(shí)時(shí)間,value是caffeine[(String,Object)]

*/

private static final ConcurrentHashMap RULE_CACHE_MAP = new ConcurrentHashMap<>();

/**

* 這里KEY_RULES是保存etcd里面該appName所對應(yīng)的所有rule

*/

private static final List KEY_RULES = new ArrayList<>();

ConcurrentHashMapRULE_CACHE_MAP:

保存超時(shí)時(shí)間和caffeine的映射,key是超時(shí)時(shí)間,value是caffeine[(String,Object)]。

巧妙的設(shè)計(jì):這里將key的過期時(shí)間作為分桶策略,這樣同一個(gè)過期時(shí)間的key就會(huì)在一個(gè)桶(caffeine)里面,這里面每一個(gè)caffeine都是client的本地緩存,也就是說hotKey的本地緩存的KV實(shí)際上是存儲(chǔ)在這里面的。

ListKEY_RULES:

這里KEY_RULES是保存etcd里面該appName所對應(yīng)的所有rule。

具體監(jiān)聽KeyRuleInfoChangeEvent事件方法:


@Subscribe

public void ruleChange(KeyRuleInfoChangeEvent event) {

JdLogger.info(getClass(), "new rules info is :" + event.getKeyRules());

List ruleList = event.getKeyRules();

if (ruleList == null) {

return;

}

putRules(ruleList);

}

核心處理邏輯


/**

* 所有的規(guī)則,如果規(guī)則的超時(shí)時(shí)間變化了,會(huì)重建caffeine

*/

public static void putRules(List keyRules) {

synchronized (KEY_RULES) {

//如果規(guī)則為空,清空規(guī)則表

if (CollectionUtil.isEmpty(keyRules)) {

KEY_RULES.clear();

RULE_CACHE_MAP.clear();

return;

}

KEY_RULES.clear();

KEY_RULES.addAll(keyRules);

Set durationSet = keyRules.stream().map(KeyRule::getDuration).collect(Collectors.toSet());

for (Integer duration : RULE_CACHE_MAP.keySet()) {

//先清除掉那些在RULE_CACHE_MAP里存的,但是rule里已沒有的

if (!durationSet.contains(duration)) {

RULE_CACHE_MAP.remove(duration);

}

}

//遍歷所有的規(guī)則

for (KeyRule keyRule : keyRules) {

int duration = keyRule.getDuration();

//這里如果RULE_CACHE_MAP里面沒有超時(shí)時(shí)間為duration的value,則新建一個(gè)放入到RULE_CACHE_MAP里面

//比如RULE_CACHE_MAP本來就是空的,則在這里來構(gòu)建RULE_CACHE_MAP的映射關(guān)系

//TODO 如果keyRules里面包含相同duration的keyRule,則也只會(huì)建一個(gè)key為duration,value為caffeine,其中caffeine是(string,object)

if (RULE_CACHE_MAP.get(duration) == null) {

LocalCache cache = CacheFactory.build(duration);

RULE_CACHE_MAP.put(duration, cache);

}

}

}

}

使用synchronized關(guān)鍵字來保證線程安全;

如果規(guī)則為空,清空規(guī)則表(RULE_CACHE_MAP、KEY_RULES);

使用傳遞進(jìn)來的keyRules來覆蓋KEY_RULES;

清除掉RULE_CACHE_MAP里面在keyRules沒有的映射關(guān)系;

遍歷所有的keyRules,如果RULE_CACHE_MAP里面沒有相關(guān)的超時(shí)時(shí)間key,則在里面賦值;

⑤ 啟動(dòng)EtcdStarter(etcd連接管理器)


EtcdStarter starter = new EtcdStarter();

//與etcd相關(guān)的監(jiān)聽都開啟

starter.start();

public void start() {

fetchWorkerInfo();

fetchRule();

startWatchRule();

//監(jiān)聽熱key事件,只監(jiān)聽手工添加、刪除的key

startWatchHotKey();

}

fetchWorkerInfo() 從etcd里面拉取worker集群地址信息allAddress,并更新WorkerInfoHolder里面的WORKER_HOLDER


/**

* 每隔30秒拉取worker信息

*/

private void fetchWorkerInfo() {

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

//開啟拉取etcd的worker信息,如果拉取失敗,則定時(shí)繼續(xù)拉取

scheduledExecutorService.scheduleAtFixedRate(() -> {

JdLogger.info(getClass(), "trying to connect to etcd and fetch worker info");

fetch();

}, 0, 30, TimeUnit.SECONDS);

}

使用定時(shí)線程池來執(zhí)行,單線程。

定時(shí)從etcd里面獲取,地址/jd/workers/+$appName或default,時(shí)間間隔不可設(shè)置,默認(rèn)30秒,這里面存儲(chǔ)的是worker地址的ip+port。

發(fā)布WorkerInfoChangeEvent事件。

備注:地址有$appName或default,在worker里面配置,如果把worker放到某個(gè)appName下,則該worker只會(huì)參與該app的計(jì)算。

fetchRule() 定時(shí)線程來執(zhí)行,單線程,時(shí)間間隔不可設(shè)置,默認(rèn)是5秒,當(dāng)拉取規(guī)則配置和手動(dòng)配置的hotKey成功后,該線程被終止(也就是說只會(huì)成功執(zhí)行一次),執(zhí)行失敗繼續(xù)執(zhí)行


private void fetchRule() {

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

//開啟拉取etcd的worker信息,如果拉取失敗,則定時(shí)繼續(xù)拉取

scheduledExecutorService.scheduleAtFixedRate(() -> {

JdLogger.info(getClass(), "trying to connect to etcd and fetch rule info");

boolean success = fetchRuleFromEtcd();

if (success) {

//拉取已存在的熱key

fetchExistHotKey();

//這里如果拉取規(guī)則和拉取手動(dòng)配置的hotKey成功之后,則該定時(shí)執(zhí)行線程停止

scheduledExecutorService.shutdown();

}

}, 0, 5, TimeUnit.SECONDS);

}

fetchRuleFromEtcd()

從etcd里面獲取該appName配置的rule規(guī)則,地址/jd/rules/+$appName。

如果查出來規(guī)則rules為空,會(huì)通過發(fā)布KeyRuleInfoChangeEvent事件來清空本地的rule配置緩存和所有的規(guī)則key緩存。

發(fā)布KeyRuleInfoChangeEvent事件。

fetchExistHotKey()

從etcd里面獲取該appName手動(dòng)配置的熱key,地址/jd/hotkeys/+$appName。

發(fā)布ReceiveNewKeyEvent事件,并且內(nèi)容HotKeyModel不是刪除事件。

startWatchRule()


/**

* 異步監(jiān)聽rule規(guī)則變化

*/

private void startWatchRule() {

ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.submit(() -> {

JdLogger.info(getClass(), "--- begin watch rule change ----");

try {

IConfigCenter configCenter = EtcdConfigFactory.configCenter();

KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.rulePath + Context.APP_NAME);

//如果有新事件,即rule的變更,就重新拉取所有的信息

while (watchIterator.hasNext()) {

//這句必須寫,next會(huì)讓他卡住,除非真的有新rule變更

WatchUpdate watchUpdate = watchIterator.next();

List eventList = watchUpdate.getEvents();

JdLogger.info(getClass(), "rules info changed. begin to fetch new infos. rule change is " + eventList);

//全量拉取rule信息

fetchRuleFromEtcd();

}

} catch (Exception e) {

JdLogger.error(getClass(), "watch err");

}

});

}

異步監(jiān)聽rule規(guī)則變化,使用etcd監(jiān)聽地址為/jd/rules/+$appName的節(jié)點(diǎn)變化。

使用線程池,單線程,異步監(jiān)聽rule規(guī)則變化,如果有事件變化,則調(diào)用fetchRuleFromEtcd()方法。

startWatchHotKey() 異步開始監(jiān)聽熱key變化信息,使用etcd監(jiān)聽地址前綴為/jd/hotkeys/+$appName


/**

* 異步開始監(jiān)聽熱key變化信息,該目錄里只有手工添加的key信息

*/

private void startWatchHotKey() {

ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.submit(() -> {

JdLogger.info(getClass(), "--- begin watch hotKey change ----");

IConfigCenter configCenter = EtcdConfigFactory.configCenter();

try {

KvClient.WatchIterator watchIterator = configCenter.watchPrefix(ConfigConstant.hotKeyPath + Context.APP_NAME);

//如果有新事件,即新key產(chǎn)生或刪除

while (watchIterator.hasNext()) {

WatchUpdate watchUpdate = watchIterator.next();

List eventList = watchUpdate.getEvents();

KeyValue keyValue = eventList.get(0).getKv();

Event.EventType eventType = eventList.get(0).getType();

try {

//從這個(gè)地方可以看出,etcd給的返回是節(jié)點(diǎn)的全路徑,而我們需要的key要去掉前綴

String key = keyValue.getKey().toStringUtf8().replace(ConfigConstant.hotKeyPath + Context.APP_NAME + "/", "");

//如果是刪除key,就立刻刪除

if (Event.EventType.DELETE == eventType) {

HotKeyModel model = new HotKeyModel();

model.setRemove(true);

model.setKey(key);

EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));

} else {

HotKeyModel model = new HotKeyModel();

model.setRemove(false);

String value = keyValue.getValue().toStringUtf8();

//新增熱key

JdLogger.info(getClass(), "etcd receive new key : " + key + " --value:" + value);

//如果這是一個(gè)刪除指令,就什么也不干

//TODO 這里有個(gè)疑問,監(jiān)聽到worker自動(dòng)探測發(fā)出的惰性刪除指令,這里之間跳過了,但是本地緩存沒有更新吧?

//TODO 所以我猜測在客戶端使用判斷緩存是否存在的api里面,應(yīng)該會(huì)判斷相關(guān)緩存的value值是否為"#[DELETE]#"刪除標(biāo)記

//解疑:這里確實(shí)只監(jiān)聽手工配置的hotKey,etcd的/jd/hotkeys/+$appName該地址只是手動(dòng)配置hotKey,worker自動(dòng)探測的hotKey是直接通過netty通道來告知client的

if (Constant.DEFAULT_DELETE_VALUE.equals(value)) {

continue;

}

//手工創(chuàng)建的value是時(shí)間戳

model.setCreateTime(Long.valueOf(keyValue.getValue().toStringUtf8()));

model.setKey(key);

EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));

}

} catch (Exception e) {

JdLogger.error(getClass(), "new key err :" + keyValue);

}

}

} catch (Exception e) {

JdLogger.error(getClass(), "watch err");

}

});

}

使用線程池,單線程,異步監(jiān)聽熱key變化

使用etcd監(jiān)聽前綴地址的當(dāng)前節(jié)點(diǎn)以及子節(jié)點(diǎn)的所有變化值

刪除節(jié)點(diǎn)動(dòng)作

發(fā)布ReceiveNewKeyEvent事件,并且內(nèi)容HotKeyModel是刪除事件

新增or更新節(jié)點(diǎn)動(dòng)作

事件變化的value值為刪除標(biāo)記#[DELETE]#

如果是刪除標(biāo)記的話,代表是worker自動(dòng)探測或者client需要?jiǎng)h除的指令。

如果是刪除標(biāo)記則什么也不做,直接跳過(這里從HotKeyPusher#push方法可以看到,做刪除事件的操作時(shí)候,他會(huì)給/jd/hotkeys/+$appName的節(jié)點(diǎn)里面增加一個(gè)值為刪除標(biāo)記的節(jié)點(diǎn),然后再刪除相同路徑的節(jié)點(diǎn),這樣就可以觸發(fā)上面的刪除節(jié)點(diǎn)事件,所以這里判斷如果是刪除標(biāo)記直接跳過)。

不為刪除標(biāo)記

發(fā)布ReceiveNewKeyEvent事件,事件內(nèi)容HotKeyModel里面的createTime是kv對應(yīng)的時(shí)間戳

疑問: 這里代碼注釋里面說只監(jiān)聽手工添加或者刪除的hotKey,難道說/jd/hotkeys/+$appName地址只是手工配置的地址嗎? 解疑: 這里確實(shí)只監(jiān)聽手工配置的hotKey,etcd的/jd/hotkeys/+$appName該地址只是手動(dòng)配置hotKey,worker自動(dòng)探測的hotKey是直接通過netty通道來告知client的

5.API解析

1)流程圖示 ① 查詢流程

5676cdb0-3f2c-11ed-9e49-dac502259ad0.png

② 刪除流程:

56a3ae0c-3f2c-11ed-9e49-dac502259ad0.png

從上面的流程圖中,大家應(yīng)該知道該熱點(diǎn)key在代碼中是如何扭轉(zhuǎn)的,這里再給大家講解一下核心API的源碼解析,限于篇幅的原因,咱們不一個(gè)個(gè)貼相關(guān)源碼了,只是單純的告訴你它的內(nèi)部邏輯是怎么樣的。 2)核心類:JdHotKeyStore

56c91976-3f2c-11ed-9e49-dac502259ad0.png

JdHotKeyStore是封裝client調(diào)用的api核心類,包含上面10個(gè)公共方法,咱們重點(diǎn)解析其中6個(gè)重要方法: ① isHotKey(String key) 判斷是否在規(guī)則內(nèi),如果不在返回false 判斷是否是熱key,如果不是或者是且過期時(shí)間在2s內(nèi),則給TurnKeyCollector#collect收集 最后給TurnCountCollector#collect做統(tǒng)計(jì)收集 ② get(String key) 從本地caffeine取值 如果取到的value是個(gè)魔術(shù)值,只代表加入到caffeine緩存里面了,查詢的話為null ③ smartSet(String key, Object value) 判斷是否是熱key,這里不管它在不在規(guī)則內(nèi),如果是熱key,則給value賦值,如果不為熱key什么也不做 ④ forceSet(String key, Object value) 強(qiáng)制給value賦值 如果該key不在規(guī)則配置內(nèi),則傳遞的value不生效,本地緩存的賦值value會(huì)被變?yōu)閚ull ⑤ getValue(String key, KeyType keyType) 獲取value,如果value不存在則調(diào)用HotKeyPusher#push方法發(fā)往netty 如果沒有為該key配置規(guī)則,就不用上報(bào)key,直接返回null 如果取到的value是個(gè)魔術(shù)值,只代表加入到caffeine緩存里面了,查詢的話為null ⑥ remove(String key) 刪除某key(本地的caffeine緩存),會(huì)通知整個(gè)集群刪除(通過etcd來通知集群刪除) 3)client上傳熱key入口調(diào)用類:HotKeyPusher 核心方法:


public static void push(String key, KeyType keyType, int count, boolean remove) {

if (count <= 0) {

count = 1;

}

if (keyType == null) {

keyType = KeyType.REDIS_KEY;

}

if (key == null) {

return;

}

//這里之所以用LongAdder是為了保證多線程計(jì)數(shù)的線程安全性,雖然這里是在方法內(nèi)調(diào)用的,但是在TurnKeyCollector的兩個(gè)map里面,

//存儲(chǔ)了HotKeyModel的實(shí)例對象,這樣在多個(gè)線程同時(shí)修改count的計(jì)數(shù)屬性時(shí),會(huì)存在線程安全計(jì)數(shù)不準(zhǔn)確問題

LongAdder adderCnt = new LongAdder();

adderCnt.add(count);

HotKeyModel hotKeyModel = new HotKeyModel();

hotKeyModel.setAppName(Context.APP_NAME);

hotKeyModel.setKeyType(keyType);

hotKeyModel.setCount(adderCnt);

hotKeyModel.setRemove(remove);

hotKeyModel.setKey(key);

if (remove) {

//如果是刪除key,就直接發(fā)到etcd去,不用做聚合。但是有點(diǎn)問題現(xiàn)在,這個(gè)刪除只能刪手工添加的key,不能刪worker探測出來的

//因?yàn)楦鱾€(gè)client都在監(jiān)聽手工添加的那個(gè)path,沒監(jiān)聽自動(dòng)探測的path。所以如果手工的那個(gè)path下,沒有該key,那么是刪除不了的。

//刪不了,就達(dá)不到集群監(jiān)聽刪除事件的效果,怎么辦呢?可以通過新增的方式,新增一個(gè)熱key,然后刪除它

//TODO 這里為啥不直接刪除該節(jié)點(diǎn),難道worker自動(dòng)探測處理的hotKey不會(huì)往該節(jié)點(diǎn)增加新增事件嗎?

//釋疑:worker根據(jù)探測配置的規(guī)則,當(dāng)判斷出某個(gè)key為hotKey后,確實(shí)不會(huì)往keyPath里面加入節(jié)點(diǎn),他只是單純的往本地緩存里面加入一個(gè)空值,代表是熱點(diǎn)key

EtcdConfigFactory.configCenter().putAndGrant(HotKeyPathTool.keyPath(hotKeyModel), Constant.DEFAULT_DELETE_VALUE, 1);

EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyPath(hotKeyModel));//TODO 這里很巧妙待補(bǔ)充描述

//也刪worker探測的目錄

EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyRecordPath(hotKeyModel));

} else {

//如果key是規(guī)則內(nèi)的要被探測的key,就積累等待傳送

if (KeyRuleHolder.isKeyInRule(key)) {

//積攢起來,等待每半秒發(fā)送一次

KeyHandlerFactory.getCollector().collect(hotKeyModel);

}

}

}

從上面的源碼中可知:

這里之所以用LongAdder是為了保證多線程計(jì)數(shù)的線程安全性,雖然這里是在方法內(nèi)調(diào)用的,但是在TurnKeyCollector的兩個(gè)map里面,存儲(chǔ)了HotKeyModel的實(shí)例對象,這樣在多個(gè)線程同時(shí)修改count的計(jì)數(shù)屬性時(shí),會(huì)存在線程安全計(jì)數(shù)不準(zhǔn)確問題。

如果是remove刪除類型,在刪除手動(dòng)配置的熱key配置路徑的同時(shí),還會(huì)刪除dashboard展示熱key的配置路徑。

只有在規(guī)則配置的key,才會(huì)被積攢探測發(fā)送到worker內(nèi)進(jìn)行計(jì)算。

6.通訊機(jī)制(與worker交互)

1)NettyClient:netty連接器


public class NettyClient {

private static final NettyClient nettyClient = new NettyClient();

private Bootstrap bootstrap;

public static NettyClient getInstance() {

return nettyClient;

}

private NettyClient() {

if (bootstrap == null) {

bootstrap = initBootstrap();

}

}

private Bootstrap initBootstrap() {

//少線程

EventLoopGroup group = new NioEventLoopGroup(2);

Bootstrap bootstrap = new Bootstrap();

NettyClientHandler nettyClientHandler = new NettyClientHandler();

bootstrap.group(group).channel(NioSocketChannel.class)

.option(ChannelOption.SO_KEEPALIVE, true)

.option(ChannelOption.TCP_NODELAY, true)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) {

ByteBuf delimiter = Unpooled.copiedBuffer(Constant.DELIMITER.getBytes());

ch.pipeline()

.addLast(new DelimiterBasedFrameDecoder(Constant.MAX_LENGTH, delimiter))//這里就是定義TCP多個(gè)包之間的分隔符,為了更好的做拆包

.addLast(new MsgDecoder())

.addLast(new MsgEncoder())

//30秒沒消息時(shí),就發(fā)心跳包過去

.addLast(new IdleStateHandler(0, 0, 30))

.addLast(nettyClientHandler);

}

});

return bootstrap;

}

}

使用Reactor線程模型,只有2個(gè)工作線程,沒有單獨(dú)設(shè)置主線程

長連接,開啟TCP_NODELAY

netty的分隔符”$()$”,類似TCP報(bào)文分段的標(biāo)準(zhǔn),方便拆包

Protobuf序列化與反序列化

30s沒有消息發(fā)給對端的時(shí)候,發(fā)送一個(gè)心跳包判活

工作線程處理器NettyClientHandler

JDhotkey的tcp協(xié)議設(shè)計(jì)就是收發(fā)字符串,每個(gè)tcp消息包使用特殊字符$()$來分割 優(yōu)點(diǎn):這樣實(shí)現(xiàn)非常簡單。 獲得消息包后進(jìn)行json或者protobuf反序列化。 缺點(diǎn):是需要,從字節(jié)流-》反序列化成字符串-》反序列化成消息對象,兩層序列化損耗了一部分性能。 protobuf還好序列化很快,但是json序列化的速度只有幾十萬每秒,會(huì)損耗一部分性能。 2)NettyClientHandler:工作線程處理器


@ChannelHandler.Sharable

public class NettyClientHandler extends SimpleChannelInboundHandler {

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (evt instanceof IdleStateEvent) {

IdleStateEvent idleStateEvent = (IdleStateEvent) evt;

//這里表示如果讀寫都掛了

if (idleStateEvent.state() == IdleState.ALL_IDLE) {

//向服務(wù)端發(fā)送消息

ctx.writeAndFlush(new HotKeyMsg(MessageType.PING, Context.APP_NAME));

}

}

super.userEventTriggered(ctx, evt);

}

//在Channel注冊EventLoop、綁定SocketAddress和連接ChannelFuture的時(shí)候都有可能會(huì)觸發(fā)ChannelInboundHandler的channelActive方法的調(diào)用

//類似TCP三次握手成功之后觸發(fā)

@Override

public void channelActive(ChannelHandlerContext ctx) {

JdLogger.info(getClass(), "channelActive:" + ctx.name());

ctx.writeAndFlush(new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME));

}

//類似TCP四次揮手之后,等待2MSL時(shí)間之后觸發(fā)(大概180s),比如channel通道關(guān)閉會(huì)觸發(fā)(channel.close())

//客戶端channel主動(dòng)關(guān)閉連接時(shí),會(huì)向服務(wù)端發(fā)送一個(gè)寫請求,然后服務(wù)端channel所在的selector會(huì)監(jiān)聽到一個(gè)OP_READ事件,然后

//執(zhí)行數(shù)據(jù)讀取操作,而讀取時(shí)發(fā)現(xiàn)客戶端channel已經(jīng)關(guān)閉了,則讀取數(shù)據(jù)字節(jié)個(gè)數(shù)返回-1,然后執(zhí)行close操作,關(guān)閉該channel對應(yīng)的底層socket,

//并在pipeline中,從head開始,往下將InboundHandler,并觸發(fā)handler的channelInactive和channelUnregistered方法的執(zhí)行,以及移除pipeline中的handlers一系列操作。

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

super.channelInactive(ctx);

//斷線了,可能只是client和server斷了,但都和etcd沒斷。也可能是client自己斷網(wǎng)了,也可能是server斷了

//發(fā)布斷線事件。后續(xù)10秒后進(jìn)行重連,根據(jù)etcd里的worker信息來決定是否重連,如果etcd里沒了,就不重連。如果etcd里有,就重連

notifyWorkerChange(ctx.channel());

}

private void notifyWorkerChange(Channel channel) {

EventBusCenter.getInstance().post(new ChannelInactiveEvent(channel));

}

@Override

protected void channelRead0(ChannelHandlerContext channelHandlerContext, HotKeyMsg msg) {

if (MessageType.PONG == msg.getMessageType()) {

JdLogger.info(getClass(), "heart beat");

return;

}

if (MessageType.RESPONSE_NEW_KEY == msg.getMessageType()) {

JdLogger.info(getClass(), "receive new key : " + msg);

if (CollectionUtil.isEmpty(msg.getHotKeyModels())) {

return;

}

for (HotKeyModel model : msg.getHotKeyModels()) {

EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));

}

}

}

}

userEventTriggered

收到對端發(fā)來的心跳包,返回new HotKeyMsg(MessageType.PING, Context.APP_NAME)

channelActive

在Channel注冊EventLoop、綁定SocketAddress和連接ChannelFuture的時(shí)候都有可能會(huì)觸發(fā)ChannelInboundHandler的channelActive方法的調(diào)用

類似TCP三次握手成功之后觸發(fā),給對端發(fā)送new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME)

channelInactive

類似TCP四次揮手之后,等待2MSL時(shí)間之后觸發(fā)(大概180s),比如channel通道關(guān)閉會(huì)觸發(fā)(channel.close())該方法,發(fā)布ChannelInactiveEvent事件,來10s后重連

channelRead0

接收PONG消息類型時(shí),打個(gè)日志返回

接收RESPONSE_NEW_KEY消息類型時(shí),發(fā)布ReceiveNewKeyEvent事件

3.3.3 worker端

1.入口啟動(dòng)加載:7個(gè)@PostConstruct

1)worker端對etcd相關(guān)的處理:EtcdStarter ① 第一個(gè)@PostConstruct:watchLog()


@PostConstruct

public void watchLog() {

AsyncPool.asyncDo(() -> {

try {

//取etcd的是否開啟日志配置,地址/jd/logOn

String loggerOn = configCenter.get(ConfigConstant.logToggle);

LOGGER_ON = "true".equals(loggerOn) || "1".equals(loggerOn);

} catch (StatusRuntimeException ex) {

logger.error(ETCD_DOWN);

}

//監(jiān)聽etcd地址/jd/logOn是否開啟日志配置,并實(shí)時(shí)更改開關(guān)

KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.logToggle);

while (watchIterator.hasNext()) {

WatchUpdate watchUpdate = watchIterator.next();

List eventList = watchUpdate.getEvents();

KeyValue keyValue = eventList.get(0).getKv();

logger.info("log toggle changed : " + keyValue);

String value = keyValue.getValue().toStringUtf8();

LOGGER_ON = "true".equals(value) || "1".equals(value);

}

});

}

放到線程池里面異步執(zhí)行

取etcd的是否開啟日志配置,地址/jd/logOn,默認(rèn)true

監(jiān)聽etcd地址/jd/logOn是否開啟日志配置,并實(shí)時(shí)更改開關(guān)

由于有etcd的監(jiān)聽,所以會(huì)一直執(zhí)行,而不是執(zhí)行一次結(jié)束

② 第二個(gè)@PostConstruct:watch()


/**

* 啟動(dòng)回調(diào)監(jiān)聽器,監(jiān)聽rule變化

*/

@PostConstruct

public void watch() {

AsyncPool.asyncDo(() -> {

KvClient.WatchIterator watchIterator;

if (isForSingle()) {

watchIterator = configCenter.watch(ConfigConstant.rulePath + workerPath);

} else {

watchIterator = configCenter.watchPrefix(ConfigConstant.rulePath);

}

while (watchIterator.hasNext()) {

WatchUpdate watchUpdate = watchIterator.next();

List eventList = watchUpdate.getEvents();

KeyValue keyValue = eventList.get(0).getKv();

logger.info("rule changed : " + keyValue);

try {

ruleChange(keyValue);

} catch (Exception e) {

e.printStackTrace();

}

}

});

}

/**

* rule發(fā)生變化時(shí),更新緩存的rule

*/

private synchronized void ruleChange(KeyValue keyValue) {

String appName = keyValue.getKey().toStringUtf8().replace(ConfigConstant.rulePath, "");

if (StrUtil.isEmpty(appName)) {

return;

}

String ruleJson = keyValue.getValue().toStringUtf8();

List keyRules = FastJsonUtils.toList(ruleJson, KeyRule.class);

KeyRuleHolder.put(appName, keyRules);

}

通過etcd.workerPath配置,來判斷該worker是否為某個(gè)app單獨(dú)服務(wù)的,默認(rèn)為”default”,如果是默認(rèn)值,代表該worker參與在etcd上所有app client的計(jì)算,否則只為某個(gè)app來服務(wù)計(jì)算 使用etcd來監(jiān)聽rule規(guī)則變化,如果是共享的worker,監(jiān)聽地址前綴為”/jd/rules/“,如果為某個(gè)app獨(dú)享,監(jiān)聽地址為”/jd/rules/“+$etcd.workerPath 如果規(guī)則變化,則修改對應(yīng)app在本地存儲(chǔ)的rule緩存,同時(shí)清理該app在本地存儲(chǔ)的KV緩存

KeyRuleHolder:rule緩存本地存儲(chǔ)

Map,>

相對于client的KeyRuleHolder的區(qū)別:worker是存儲(chǔ)所有app規(guī)則,每個(gè)app對應(yīng)一個(gè)規(guī)則桶,所以用map

CaffeineCacheHolder:key緩存本地存儲(chǔ)

Map,>

相對于client的caffeine,第一是worker沒有做緩存接口比如LocalCache,第二是client的map的kv分別是超時(shí)時(shí)間、以及相同超時(shí)時(shí)間所對應(yīng)key的緩存桶

放到線程池里面異步執(zhí)行,由于有etcd的監(jiān)聽,所以會(huì)一直執(zhí)行,而不是執(zhí)行一次結(jié)束

③ 第三個(gè)@PostConstruct:watchWhiteList()


/**

* 啟動(dòng)回調(diào)監(jiān)聽器,監(jiān)聽白名單變化,只監(jiān)聽自己所在的app,白名單key不參與熱key計(jì)算,直接忽略

*/

@PostConstruct

public void watchWhiteList() {

AsyncPool.asyncDo(() -> {

//從etcd配置中獲取所有白名單

fetchWhite();

KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.whiteListPath + workerPath);

while (watchIterator.hasNext()) {

WatchUpdate watchUpdate = watchIterator.next();

logger.info("whiteList changed ");

try {

fetchWhite();

} catch (Exception e) {

e.printStackTrace();

}

}

});

}

拉取并監(jiān)聽etcd白名單key配置,地址為/jd/whiteList/+$etcd.workerPath

在白名單的key,不參與熱key計(jì)算,直接忽略

放到線程池里面異步執(zhí)行,由于有etcd的監(jiān)聽,所以會(huì)一直執(zhí)行,而不是執(zhí)行一次結(jié)束 ④ 第四個(gè)@PostConstruct:makeSureSelfOn()


/**

* 每隔一會(huì)去check一下,自己還在不在etcd里

*/

@PostConstruct

public void makeSureSelfOn() {

//開啟上傳worker信息

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

scheduledExecutorService.scheduleAtFixedRate(() -> {

try {

if (canUpload) {

uploadSelfInfo();

}

} catch (Exception e) {

//do nothing

}

}, 0, 5, TimeUnit.SECONDS);

}

在線程池里面異步執(zhí)行,定時(shí)執(zhí)行,時(shí)間間隔為5s

將本機(jī)woker的hostName,ip+port以kv的形式定時(shí)上報(bào)給etcd,地址為/jd/workers/+$etcd.workPath+”/“+$hostName,續(xù)期時(shí)間為8s

有一個(gè)canUpload的開關(guān)來控制worker是否向etcd來定時(shí)續(xù)期,如果這個(gè)開關(guān)關(guān)閉了,代表worker不向etcd來續(xù)期,這樣當(dāng)上面地址的kv到期之后,etcd會(huì)刪除該節(jié)點(diǎn),這樣client循環(huán)判斷worker信息變化了

2)將熱key推送到dashboard供入庫:DashboardPusher ① 第五個(gè)@PostConstruct:uploadToDashboard()


@Component

public class DashboardPusher implements IPusher {

/**

* 熱key集中營

*/

private static LinkedBlockingQueue hotKeyStoreQueue = new LinkedBlockingQueue<>();

@PostConstruct

public void uploadToDashboard() {

AsyncPool.asyncDo(() -> {

while (true) {

try {

//要么key達(dá)到1千個(gè),要么達(dá)到1秒,就匯總上報(bào)給etcd一次

List tempModels = new ArrayList<>();

Queues.drain(hotKeyStoreQueue, tempModels, 1000, 1, TimeUnit.SECONDS);

if (CollectionUtil.isEmpty(tempModels)) {

continue;

}

//將熱key推到dashboard

DashboardHolder.flushToDashboard(FastJsonUtils.convertObjectToJSON(tempModels));

} catch (Exception e) {

e.printStackTrace();

}

}

});

}

}

當(dāng)熱key的數(shù)量達(dá)到1000或者每隔1s,把熱key的數(shù)據(jù)通過與dashboard的netty通道來發(fā)送給dashboard,數(shù)據(jù)類型為REQUEST_HOT_KEY

LinkedBlockingQueue

hotKeyStoreQueue:worker計(jì)算的給dashboard熱key的集中營,所有給dashboard推送熱key存儲(chǔ)在里面 3)推送到各客戶端服務(wù)器:AppServerPusher ① 第六個(gè)@PostConstruct:batchPushToClient()


public class AppServerPusher implements IPusher {

/**

* 熱key集中營

*/

private static LinkedBlockingQueue hotKeyStoreQueue = new LinkedBlockingQueue<>();

/**

* 和dashboard那邊的推送主要區(qū)別在于,給app推送每10ms一次,dashboard那邊1s一次

*/

@PostConstruct

public void batchPushToClient() {

AsyncPool.asyncDo(() -> {

while (true) {

try {

List tempModels = new ArrayList<>();

//每10ms推送一次

Queues.drain(hotKeyStoreQueue, tempModels, 10, 10, TimeUnit.MILLISECONDS);

if (CollectionUtil.isEmpty(tempModels)) {

continue;

}

Map> allAppHotKeyModels = new HashMap<>();

//拆分出每個(gè)app的熱key集合,按app分堆

for (HotKeyModel hotKeyModel : tempModels) {

List oneAppModels = allAppHotKeyModels.computeIfAbsent(hotKeyModel.getAppName(), (key) -> new ArrayList<>());

oneAppModels.add(hotKeyModel);

}

//遍歷所有app,進(jìn)行推送

for (AppInfo appInfo : ClientInfoHolder.apps) {

List list = allAppHotKeyModels.get(appInfo.getAppName());

if (CollectionUtil.isEmpty(list)) {

continue;

}

HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.RESPONSE_NEW_KEY);

hotKeyMsg.setHotKeyModels(list);

//整個(gè)app全部發(fā)送

appInfo.groupPush(hotKeyMsg);

}

//推送完,及時(shí)清理不使用內(nèi)存

allAppHotKeyModels = null;

} catch (Exception e) {

e.printStackTrace();

}

}

});

}

}

會(huì)按照key的appName來進(jìn)行分組,然后通過對應(yīng)app的channelGroup來推送

當(dāng)熱key的數(shù)量達(dá)到10或者每隔10ms,把熱key的數(shù)據(jù)通過與app的netty通道來發(fā)送給app,數(shù)據(jù)類型為RESPONSE_NEW_KEY

LinkedBlockingQueue

hotKeyStoreQueue:worker計(jì)算的給client熱key的集中營,所有給client推送熱key存儲(chǔ)在里面 4)client實(shí)例節(jié)點(diǎn)處理:NodesServerStarter ① 第七個(gè)@PostConstruct:start()


public class NodesServerStarter {

@Value("${netty.port}")

private int port;

private Logger logger = LoggerFactory.getLogger(getClass());

@Resource

private IClientChangeListener iClientChangeListener;

@Resource

private List messageFilters;

@PostConstruct

public void start() {

AsyncPool.asyncDo(() -> {

logger.info("netty server is starting");

NodesServer nodesServer = new NodesServer();

nodesServer.setClientChangeListener(iClientChangeListener);

nodesServer.setMessageFilters(messageFilters);

try {

nodesServer.startNettyServer(port);

} catch (Exception e) {

e.printStackTrace();

}

});

}

}

線程池里面異步執(zhí)行,啟動(dòng)client端的nettyServer

iClientChangeListener和messageFilters這兩個(gè)依賴最終會(huì)被傳遞到netty消息處理器里面,iClientChangeListener會(huì)作為channel下線處理來刪除ClientInfoHolder下線或者超時(shí)的通道,messageFilters會(huì)作為netty收到事件消息的處理過濾器(責(zé)任鏈模式) ② 依賴的bean:IClientChangeListener iClientChangeListener


public interface IClientChangeListener {

/**

* 發(fā)現(xiàn)新連接

*/

void newClient(String appName, String channelId, ChannelHandlerContext ctx);

/**

* 客戶端掉線

*/

void loseClient(ChannelHandlerContext ctx);

}

對客戶端的管理,新來(newClient)(會(huì)觸發(fā)netty的連接方法channelActive)、斷線(loseClient)(會(huì)觸發(fā)netty的斷連方法channelInactive())的管理 client的連接信息主要是在ClientInfoHolder里面

List

apps,這里面的AppInfo主要是appName和對應(yīng)的channelGroup

對apps的add和remove主要是通過新來(newClient)、斷線(loseClient) ③ 依賴的bean:List

messageFilters


/**

* 對netty來的消息,進(jìn)行過濾處理

* @author wuweifeng wrote on 2019-12-11

* @version 1.0

*/

public interface INettyMsgFilter {

boolean chain(HotKeyMsg message, ChannelHandlerContext ctx);

}

對client發(fā)給worker的netty消息,進(jìn)行過濾處理,共有四個(gè)實(shí)現(xiàn)類,也就是說底下四個(gè)過濾器都是收到client發(fā)送的netty消息來做處理 ④ 各個(gè)消息處理的類型:MessageType


APP_NAME((byte) 1),

REQUEST_NEW_KEY((byte) 2),

RESPONSE_NEW_KEY((byte) 3),

REQUEST_HIT_COUNT((byte) 7), //命中率

REQUEST_HOT_KEY((byte) 8), //熱key,worker->dashboard

PING((byte) 4), PONG((byte) 5),

EMPTY((byte) 6);

順序1:HeartBeatFilter

當(dāng)消息類型為PING,則給對應(yīng)的client示例返回PONG

順序2:AppNameFilter

當(dāng)消息類型為APP_NAME,代表client與worker建立連接成功,然后調(diào)用iClientChangeListener的newClient方法增加apps元數(shù)據(jù)信息

順序3:HotKeyFilter

處理接收消息類型為REQUEST_NEW_KEY

先給HotKeyFilter.totalReceiveKeyCount原子類增1,該原子類代表worker實(shí)例接收到的key的總數(shù)

publishMsg方法,將消息通過自建的生產(chǎn)者消費(fèi)者模型(KeyProducer,KeyConsumer),來把消息給發(fā)到生產(chǎn)者中分發(fā)消費(fèi)

接收到的消息HotKeyMsg里面List

首先判斷HotKeyModel里面的key是否在白名單內(nèi),如果在則跳過,否則將HotKeyModel通過KeyProducer發(fā)送

順序4:KeyCounterFilter

處理接收類型為REQUEST_HIT_COUNT

這個(gè)過濾器是專門給dashboard來匯算key的,所以這個(gè)appName直接設(shè)置為該worker配置的appName

該過濾器的數(shù)據(jù)來源都是client的NettyKeyPusher#sendCount(String appName, List

list),這里面的數(shù)據(jù)都是默認(rèn)積攢10s的,這個(gè)10s是可以配置的,這一點(diǎn)在client里面有講

將構(gòu)造的new KeyCountItem(appName, models.get(0).getCreateTime(), models)放到阻塞隊(duì)列LinkedBlockingQueue

COUNTER_QUEUE中,然后讓CounterConsumer來消費(fèi)處理,消費(fèi)邏輯是單線程的

CounterConsumer:熱key統(tǒng)計(jì)消費(fèi)者

放在公共線程池中,來單線程執(zhí)行

從阻塞隊(duì)列COUNTER_QUEUE里面取數(shù)據(jù),然后將里面的key的統(tǒng)計(jì)數(shù)據(jù)發(fā)布到etcd的/jd/keyHitCount/+ appName + “/“ + IpUtils.getIp() + “-“ + System.currentTimeMillis()里面,該路徑是worker服務(wù)的client集群或者default,用來存放客戶端hotKey訪問次數(shù)和總訪問次數(shù)的path,然后讓dashboard來訂閱統(tǒng)計(jì)展示

2.三個(gè)定時(shí)任務(wù):3個(gè)@Scheduled

1)定時(shí)任務(wù)1:EtcdStarter#pullRules()


/**

* 每隔1分鐘拉取一次,所有的app的rule

*/

@Scheduled(fixedRate = 60000)

public void pullRules() {

try {

if (isForSingle()) {

String value = configCenter.get(ConfigConstant.rulePath + workerPath);

if (!StrUtil.isEmpty(value)) {

List keyRules = FastJsonUtils.toList(value, KeyRule.class);

KeyRuleHolder.put(workerPath, keyRules);

}

} else {

List keyValues = configCenter.getPrefix(ConfigConstant.rulePath);

for (KeyValue keyValue : keyValues) {

ruleChange(keyValue);

}

}

} catch (StatusRuntimeException ex) {

logger.error(ETCD_DOWN);

}

}

每隔1分鐘拉取一次etcd地址為/jd/rules/的規(guī)則變化,如果worker所服務(wù)的app或者default的rule有變化,則更新規(guī)則的緩存,并清空該appName所對應(yīng)的本地key緩存 2)定時(shí)任務(wù)2:EtcdStarter#uploadClientCount()


/**

* 每隔10秒上傳一下client的數(shù)量到etcd中

*/

@Scheduled(fixedRate = 10000)

public void uploadClientCount() {

try {

String ip = IpUtils.getIp();

for (AppInfo appInfo : ClientInfoHolder.apps) {

String appName = appInfo.getAppName();

int count = appInfo.size();

//即便是full gc也不能超過3秒,因?yàn)檫@里給的過期時(shí)間是13s,由于該定時(shí)任務(wù)每隔10s執(zhí)行一次,如果full gc或者說上報(bào)給etcd的時(shí)間超過3s,

//則在dashboard查詢不到client的數(shù)量

configCenter.putAndGrant(ConfigConstant.clientCountPath + appName + "/" + ip, count + "", 13);

}

configCenter.putAndGrant(ConfigConstant.caffeineSizePath + ip, FastJsonUtils.convertObjectToJSON(CaffeineCacheHolder.getSize()), 13);

//上報(bào)每秒QPS(接收key數(shù)量、處理key數(shù)量)

String totalCount = FastJsonUtils.convertObjectToJSON(new TotalCount(HotKeyFilter.totalReceiveKeyCount.get(), totalDealCount.longValue()));

configCenter.putAndGrant(ConfigConstant.totalReceiveKeyCount + ip, totalCount, 13);

logger.info(totalCount + " expireCount:" + expireTotalCount + " offerCount:" + totalOfferCount);

//如果是穩(wěn)定一直有key發(fā)送的應(yīng)用,建議開啟該監(jiān)控,以避免可能發(fā)生的網(wǎng)絡(luò)故障

if (openMonitor) {

checkReceiveKeyCount();

}

// configCenter.putAndGrant(ConfigConstant.bufferPoolPath + ip, MemoryTool.getBufferPool() + "", 10);

} catch (Exception ex) {

logger.error(ETCD_DOWN);

}

}

每個(gè)10s將worker計(jì)算存儲(chǔ)的client信息上報(bào)給etcd,來方便dashboard來查詢展示,比如/jd/count/對應(yīng)client數(shù)量,/jd/caffeineSize/對應(yīng)caffeine緩存的大小,/jd/totalKeyCount/對應(yīng)該worker接收的key總量和處理的key總量

可以從代碼中看到,上面所有etcd的節(jié)點(diǎn)租期時(shí)間都是13s,而該定時(shí)任務(wù)是每10s執(zhí)行一次,意味著如果full gc或者說上報(bào)給etcd的時(shí)間超過3s,則在dashboard查詢不到client的相關(guān)匯算信息

長時(shí)間不收到key,判斷網(wǎng)絡(luò)狀態(tài)不好,斷開worker給etcd地址為/jd/workers/+$workerPath節(jié)點(diǎn)的續(xù)租,因?yàn)閏lient會(huì)循環(huán)判斷該地址的節(jié)點(diǎn)是否變化,使得client重新連接worker或者斷開失聯(lián)的worker 3)定時(shí)任務(wù)3:EtcdStarter#fetchDashboardIp()


/**

* 每隔30秒去獲取一下dashboard的地址

*/

@Scheduled(fixedRate = 30000)

public void fetchDashboardIp() {

try {

//獲取DashboardIp

List keyValues = configCenter.getPrefix(ConfigConstant.dashboardPath);

//是空,給個(gè)警告

if (CollectionUtil.isEmpty(keyValues)) {

logger.warn("very important warn !!! Dashboard ip is null!!!");

return;

}

String dashboardIp = keyValues.get(0).getValue().toStringUtf8();

NettyClient.getInstance().connect(dashboardIp);

} catch (Exception e) {

e.printStackTrace();

}

}

每隔30s拉取一次etcd前綴為/jd/dashboard/的dashboard連接ip的值,并且判斷DashboardHolder.hasConnected里面是否為未連接狀態(tài),如果是則重新連接worker與dashboard的netty通道

3.自建的生產(chǎn)者消費(fèi)者模型(KeyProducer,KeyConsumer)

一般生產(chǎn)者消費(fèi)者模型包含三大元素:生產(chǎn)者、消費(fèi)者、消息存儲(chǔ)隊(duì)列 這里消息存儲(chǔ)隊(duì)列是DispatcherConfig里面的QUEUE,使用LinkedBlockingQueue,默認(rèn)大小為200W 1)KeyProducer


@Component

public class KeyProducer {

public void push(HotKeyModel model, long now) {

if (model == null || model.getKey() == null) {

return;

}

//5秒前的過時(shí)消息就不處理了

if (now - model.getCreateTime() > InitConstant.timeOut) {

expireTotalCount.increment();

return;

}

try {

QUEUE.put(model);

totalOfferCount.increment();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

判斷接收到的HotKeyModel是否超出”netty.timeOut”配置的時(shí)間,如果是將expireTotalCount紀(jì)錄過期總數(shù)給自增,然后返回 2)KeyConsumer


public class KeyConsumer {

private IKeyListener iKeyListener;

public void setKeyListener(IKeyListener iKeyListener) {

this.iKeyListener = iKeyListener;

}

public void beginConsume() {

while (true) {

try {

//從這里可以看出,這里的生產(chǎn)者消費(fèi)者模型,本質(zhì)上還是拉模式,之所以不使用EventBus,是因?yàn)樾枰?duì)列來做緩沖

HotKeyModel model = QUEUE.take();

if (model.isRemove()) {

iKeyListener.removeKey(model, KeyEventOriginal.CLIENT);

} else {

iKeyListener.newKey(model, KeyEventOriginal.CLIENT);

}

//處理完畢,將數(shù)量加1

totalDealCount.increment();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

@Override

public void removeKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {

//cache里的key,appName+keyType+key

String key = buildKey(hotKeyModel);

hotCache.invalidate(key);

CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);

//推送所有client刪除

hotKeyModel.setCreateTime(SystemClock.now());

logger.info(DELETE_KEY_EVENT + hotKeyModel.getKey());

for (IPusher pusher : iPushers) {

//這里可以看到,刪除熱key的netty消息只給client端發(fā)了過去,沒有給dashboard發(fā)過去(DashboardPusher里面的remove是個(gè)空方法)

pusher.remove(hotKeyModel);

}

}

@Override

public void newKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {

//cache里的key

String key = buildKey(hotKeyModel);

//判斷是不是剛熱不久

//hotCache對應(yīng)的caffeine有效期為5s,也就是說該key會(huì)保存5s,在5s內(nèi)不重復(fù)處理相同的hotKey。

//畢竟hotKey都是瞬時(shí)流量,可以避免在這5s內(nèi)重復(fù)推送給client和dashboard,避免無效的網(wǎng)絡(luò)開銷

Object o = hotCache.getIfPresent(key);

if (o != null) {

return;

}

//********** watch here ************//

//該方法會(huì)被InitConstant.threadCount個(gè)線程同時(shí)調(diào)用,存在多線程問題

//下面的那句addCount是加了鎖的,代表給Key累加數(shù)量時(shí)是原子性的,不會(huì)發(fā)生多加、少加的情況,到了設(shè)定的閾值一定會(huì)hot

//譬如閾值是2,如果多個(gè)線程累加,在沒hot前,hot的狀態(tài)肯定是對的,譬如thread1 加1,thread2加1,那么thread2會(huì)hot返回true,開啟推送

//但是極端情況下,譬如閾值是10,當(dāng)前是9,thread1走到這里時(shí),加1,返回true,thread2也走到這里,加1,此時(shí)是11,返回true,問題來了

//該key會(huì)走下面的else兩次,也就是2次推送。

//所以出現(xiàn)問題的原因是hotCache.getIfPresent(key)這一句在并發(fā)情況下,沒return掉,放了兩個(gè)key+1到addCount這一步時(shí),會(huì)有問題

//測試代碼在TestBlockQueue類,直接運(yùn)行可以看到會(huì)同時(shí)hot

//那么該問題用解決嗎,NO,不需要解決,1 首先要發(fā)生的條件極其苛刻,很難觸發(fā),以京東這樣高的并發(fā)量,線上我也沒見過觸發(fā)連續(xù)2次推送同一個(gè)key的

//2 即便觸發(fā)了,后果也是可以接受的,2次推送而已,毫無影響,客戶端無感知。但是如果非要解決,就要對slidingWindow實(shí)例加鎖了,必然有一些開銷

//所以只要保證key數(shù)量不多計(jì)算就可以,少計(jì)算了沒事。因?yàn)闊醟ey必然頻率高,漏計(jì)幾次沒事。但非熱key,多計(jì)算了,被干成了熱key就不對了

SlidingWindow slidingWindow = checkWindow(hotKeyModel, key);//從這里可知,每個(gè)app的每個(gè)key都會(huì)對應(yīng)一個(gè)滑動(dòng)窗口

//看看hot沒

boolean hot = slidingWindow.addCount(hotKeyModel.getCount());

if (!hot) {

//如果沒hot,重新put,cache會(huì)自動(dòng)刷新過期時(shí)間

CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).put(key, slidingWindow);

} else {

//這里之所以放入的value為1,是因?yàn)閔otCache是用來專門存儲(chǔ)剛生成的hotKey

//hotCache對應(yīng)的caffeine有效期為5s,也就是說該key會(huì)保存5s,在5s內(nèi)不重復(fù)處理相同的hotKey。

//畢竟hotKey都是瞬時(shí)流量,可以避免在這5s內(nèi)重復(fù)推送給client和dashboard,避免無效的網(wǎng)絡(luò)開銷

hotCache.put(key, 1);

//刪掉該key

//這個(gè)key從實(shí)際上是專門針對slidingWindow的key,他的組合邏輯是appName+keyType+key,而不是給client和dashboard推送的hotKey

CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);

//開啟推送

hotKeyModel.setCreateTime(SystemClock.now());

//當(dāng)開關(guān)打開時(shí),打印日志。大促時(shí)關(guān)閉日志,就不打印了

if (EtcdStarter.LOGGER_ON) {

logger.info(NEW_KEY_EVENT + hotKeyModel.getKey());

}

//分別推送到各client和etcd

for (IPusher pusher : iPushers) {

pusher.push(hotKeyModel);

}

}

}

“thread.count”配置即為消費(fèi)者個(gè)數(shù),多個(gè)消費(fèi)者共同消費(fèi)一個(gè)QUEUE隊(duì)列 生產(chǎn)者消費(fèi)者模型,本質(zhì)上還是拉模式,之所以不使用EventBus,是因?yàn)樾枰?duì)列來做緩沖 根據(jù)HotKeyModel里面是否是刪除消息類型

刪除CaffeineCacheHolder里面對應(yīng)newkey的滑動(dòng)窗口緩存。

向該hotKeyModel對應(yīng)的app的client推送netty消息,表示新產(chǎn)生hotKey,使得client本地緩存,但是推送的netty消息只代表為熱key,client本地緩存不會(huì)存儲(chǔ)key對應(yīng)的value值,需要調(diào)用JdHotKeyStore里面的api來給本地緩存的value賦值

向dashboard推送hotKeyModel,表示新產(chǎn)生hotKey

刪除消息類型

根據(jù)HotKeyModel里面的appName+keyType+key的名字,來構(gòu)建caffeine里面的newkey,該newkey在caffeine里面主要是用來與slidingWindow滑動(dòng)時(shí)間窗對應(yīng)

刪除hotCache里面newkey的緩存,放入的緩存kv分別是newKey和1,hotCache作用是用來存儲(chǔ)該生成的熱key,hotCache對應(yīng)的caffeine有效期為5s,也就是說該key會(huì)保存5s,在5s內(nèi)不重復(fù)處理相同的hotKey。畢竟hotKey都是瞬時(shí)流量,可以避免在這5s內(nèi)重復(fù)推送給client和dashboard,避免無效的網(wǎng)絡(luò)開銷

刪除CaffeineCacheHolder里面對應(yīng)appName的caffeine里面的newKey,這里面存儲(chǔ)的是slidingWindow滑動(dòng)窗口

推送給該HotKeyModel對應(yīng)的所有client實(shí)例,用來讓client刪除該HotKeyModel

非刪除消息類型

根據(jù)HotKeyModel里面的appName+keyType+key的名字,來構(gòu)建caffeine里面的newkey,該newkey在caffeine里面主要是用來與slidingWindow滑動(dòng)時(shí)間窗對應(yīng)

通過hotCache來判斷該newkey是否剛熱不久,如果是則返回

根據(jù)滑動(dòng)時(shí)間窗口來計(jì)算判斷該key是否為hotKey(這里可以學(xué)習(xí)一下滑動(dòng)時(shí)間窗口的設(shè)計(jì)),并返回或者生成該newKey對應(yīng)的滑動(dòng)窗口

如果沒有達(dá)到熱key的標(biāo)準(zhǔn)

通過CaffeineCacheHolder重新put,cache會(huì)自動(dòng)刷新過期時(shí)間

如果達(dá)到了熱key標(biāo)準(zhǔn)

向hotCache里面增加newkey對應(yīng)的緩存,value為1表示剛為熱key。

3)計(jì)算熱key滑動(dòng)窗口的設(shè)計(jì) 限于篇幅的原因,這里就不細(xì)談了,直接貼出項(xiàng)目作者對其寫的說明文章:Java簡單實(shí)現(xiàn)滑動(dòng)窗口

3.3.4 dashboard端

這個(gè)沒啥可說的了,就是連接etcd、mysql,增刪改查,不過京東的前端框架很方便,直接返回list就可以成列表。

4 總結(jié)

文章第二部分為大家講解了redis數(shù)據(jù)傾斜的原因以及應(yīng)對方案,并對熱點(diǎn)問題進(jìn)行了深入,從發(fā)現(xiàn)熱key到解決熱key的兩個(gè)關(guān)鍵問題的總結(jié)。 文章第三部分是熱key問題解決方案——JD開源hotkey的源碼解析,分別從client端、worker端、dashboard端來進(jìn)行全方位講解,包括其設(shè)計(jì)、使用及相關(guān)原理。 希望通過這篇文章,能夠使大家不僅學(xué)習(xí)到相關(guān)方法論,也能明白其方法論具體的落地方案,一起學(xué)習(xí),一起成長。

審核編輯:湯梓紅

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報(bào)投訴
  • 開源
    +關(guān)注

    關(guān)注

    3

    文章

    3689

    瀏覽量

    43834
  • Redis
    +關(guān)注

    關(guān)注

    0

    文章

    387

    瀏覽量

    11446

原文標(biāo)題:Redis數(shù)據(jù)傾斜與JD開源hotkey源碼分析揭秘

文章出處:【微信號(hào):OSC開源社區(qū),微信公眾號(hào):OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評論

    相關(guān)推薦
    熱點(diǎn)推薦

    【幸狐Omni3576邊緣計(jì)算套件試用體驗(yàn)】Redis最新8.0.2版本源碼安裝及性能測試

    engine, and message broker. 數(shù)以百萬計(jì)的開發(fā)人員用作數(shù)據(jù)庫、緩存、流式處理引擎和消息代理的開源內(nèi)存數(shù)據(jù)存儲(chǔ)。 二、源碼編譯
    發(fā)表于 06-03 01:28

    企業(yè)打開Redis的正確方式,來自阿里云云數(shù)據(jù)庫團(tuán)隊(duì)的解讀

    的情況下,每個(gè)業(yè)務(wù)都需要各自特定的數(shù)據(jù)庫架構(gòu)和優(yōu)化方案,需要加入OLAP 、離線分析任務(wù),并且考慮高速擴(kuò)展、高性能、高可靠等問題。Redis開源的基于內(nèi)存且可以持久化的分布式 Key
    發(fā)表于 02-07 14:06

    走近源碼Redis如何執(zhí)行命令的

    走近源碼Redis如何執(zhí)行命令
    發(fā)表于 06-09 16:31

    APT給AVER的困擾點(diǎn)有哪些?對于APT有什么應(yīng)對方案?

    APT與傳統(tǒng)的病毒時(shí)代有什么不同?APT給AVER的困擾點(diǎn)有哪些?對于APT有什么應(yīng)對方案?
    發(fā)表于 07-05 06:32

    如何使得redis中的數(shù)據(jù)不再有

    原因redis的持久化功能導(dǎo)致的,所謂的持久化就是redis在系統(tǒng)關(guān)閉的時(shí)候把數(shù)據(jù)存儲(chǔ)到硬盤中,在下一次啟動(dòng)的時(shí)候,在從硬盤恢復(fù)到redis
    發(fā)表于 11-05 08:50

    【昉·星光 2 高性能RISC-V單板計(jì)算機(jī)體驗(yàn)】Redis源碼編譯和性能測試以及與樹莓派4B對比

    本文首先介紹Redis是什么,然后介紹如何在VisionFive2上編譯Redis源碼,以及源碼安裝R
    發(fā)表于 12-10 21:27

    【愛芯派 Pro 開發(fā)板試用體驗(yàn)】Redis源碼編譯和基準(zhǔn)測試

    庫、緩存、流式處理引擎和消息代理的開源內(nèi)存數(shù)據(jù)存儲(chǔ)。 二、源碼編譯Redis 2.1 安裝git和編譯工具鏈 # 安裝 git 和編譯工具鏈 sudo aptinstall git
    發(fā)表于 12-10 22:18

    舵機(jī)失靈的主要原因_舵機(jī)失靈的應(yīng)對方

    本文首先介紹了舵機(jī)失靈的主要原因,其次介紹了舵機(jī)失靈的應(yīng)對方法,最后介紹了舵機(jī)日常維護(hù)的重點(diǎn)以及提高船舶應(yīng)急應(yīng)變能力的對策,具體的跟隨小編一起來了解一下。
    的頭像 發(fā)表于 05-30 14:28 ?4.5w次閱讀

    氨水罐滲漏的快速應(yīng)對方案

    氨水罐滲漏的快速應(yīng)對方案
    發(fā)表于 02-28 10:04 ?7次下載

    運(yùn)放輸出失調(diào)電壓的影響以及應(yīng)對方法說明

    運(yùn)放輸出失調(diào)電壓的影響以及應(yīng)對方
    的頭像 發(fā)表于 03-17 16:58 ?1.5w次閱讀
    運(yùn)放輸出失調(diào)電壓的影響<b class='flag-5'>以及</b><b class='flag-5'>應(yīng)對方</b>法說明

    ECG子系統(tǒng)設(shè)計(jì)主要挑戰(zhàn)及應(yīng)對方案

    電子發(fā)燒友網(wǎng)站提供《ECG子系統(tǒng)設(shè)計(jì)主要挑戰(zhàn)及應(yīng)對方案.pdf》資料免費(fèi)下載
    發(fā)表于 11-23 10:43 ?1次下載
    ECG子系統(tǒng)設(shè)計(jì)主要挑戰(zhàn)及<b class='flag-5'>應(yīng)對方案</b>

    電源電壓變化對晶振性能的影響以及應(yīng)對方

    電源電壓變化對晶振性能的影響以及應(yīng)對方法? 電源電壓的變化是指電源輸入電壓的波動(dòng)或變化,它可能產(chǎn)生一系列的問題,對晶振的性能和工作穩(wěn)定性產(chǎn)生影響。本文將詳細(xì)討論電源電壓變化對晶振的影響,并提供應(yīng)對方
    的頭像 發(fā)表于 12-18 14:09 ?2125次閱讀

    新版 Redis 不再“開源”,對使用者都有哪些影響?

    2024 年 3 月 20 日,Redis Labs 宣布從 Redis 7.4 開始,將原先比較寬松的 BSD 源碼使用協(xié)議修改為 RSAv2和 SSPLv1協(xié)議。該變化意味著 Redis
    的頭像 發(fā)表于 03-27 22:30 ?728次閱讀
    新版 <b class='flag-5'>Redis</b> 不再“<b class='flag-5'>開源</b>”,對使用者都有哪些影響?

    Redis開源版與Redis企業(yè)版,怎么選用?

    點(diǎn)擊“藍(lán)字”關(guān)注我們數(shù)以千計(jì)的企業(yè)和數(shù)以百萬計(jì)的開發(fā)人員Redis開源版來構(gòu)建應(yīng)用程序。但隨著用戶數(shù)量、數(shù)據(jù)量和地區(qū)性的增加,成本、可擴(kuò)展性、運(yùn)營和可用性等問題也隨之而來。Redis
    的頭像 發(fā)表于 04-04 08:04 ?1580次閱讀
    <b class='flag-5'>Redis</b><b class='flag-5'>開源</b>版與<b class='flag-5'>Redis</b>企業(yè)版,怎么選用?

    【經(jīng)驗(yàn)分享】在Omni3576上編譯Redis-8.0.2源碼,并安裝及性能測試

    本文首先介紹Redis是什么,然后介紹如何在Omni3576上編譯Redis-8.0.2源碼,以及源碼編譯、安裝
    的頭像 發(fā)表于 06-05 08:05 ?294次閱讀
    【經(jīng)驗(yàn)分享】在Omni3576上編譯<b class='flag-5'>Redis</b>-8.0.2<b class='flag-5'>源碼</b>,并安裝及性能測試