一区二区三区三上|欧美在线视频五区|国产午夜无码在线观看视频|亚洲国产裸体网站|无码成年人影视|亚洲AV亚洲AV|成人开心激情五月|欧美性爱内射视频|超碰人人干人人上|一区二区无码三区亚洲人区久久精品

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫(xiě)文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

這么多技術(shù)框架,為什么選debezium?

jf_ro2CN3Fa ? 來(lái)源:稀土掘金 ? 2023-08-30 16:40 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

在一些小型項(xiàng)目當(dāng)中,沒(méi)有引入消息中間件,也不想引入,但有一些業(yè)務(wù)邏輯想要解耦異步,那怎么辦呢?

我們的web項(xiàng)目,單獨(dú)內(nèi)網(wǎng)部署,由于大數(shù)據(jù)背景,公司消息中間件統(tǒng)一使用的kafka,在一些小項(xiàng)目上kafka就顯得很笨重。 引入rocketmq或rabittmq也沒(méi)必要。 事件或多線程也不適合。

具體一點(diǎn)的,之前對(duì)接的一個(gè)系統(tǒng),一張記錄表有10+以上的類(lèi)型狀態(tài),新的需求是,針對(duì)每種狀態(tài)做出對(duì)應(yīng)的不同的操作。 之前寫(xiě)入這張記錄表的時(shí)候,方式也是五花八門(mén),有的是單條記錄寫(xiě)入,有的是批量寫(xiě)入,有的調(diào)用了統(tǒng)一的service,有的呢直接調(diào)用了DAO層mapper直接寫(xiě)入。

所以想找到一個(gè)統(tǒng)一入口進(jìn)行切入處理,就不行了。

這個(gè)時(shí)候就算引入消息隊(duì)列,也需要在不同的業(yè)務(wù)方法里進(jìn)行寫(xiě)入消息的操作。業(yè)務(wù)方也不太愿意配合改。

可以使用觸發(fā)器,但它是屬于上個(gè)時(shí)代的產(chǎn)物,槽點(diǎn)太多。(這里并不是完全不主張使用觸發(fā)器,技術(shù)永遠(yuǎn)是為業(yè)務(wù)服務(wù)的,只要評(píng)估覺(jué)得可行,就可以使用)那么這個(gè)時(shí)候,CDC技術(shù)就可以粉墨登場(chǎng)了。

CDC(change data capture)數(shù)據(jù)更改捕獲。常見(jiàn)的數(shù)據(jù)更改捕獲都是通過(guò)數(shù)據(jù)庫(kù)比如mysql的binlog來(lái)達(dá)到目的。

我們可以監(jiān)控mysql binlog日志,當(dāng)寫(xiě)入一條數(shù)據(jù)的時(shí)候,接收到數(shù)據(jù)變更日志,做出相應(yīng)的操作。

這樣的好處是,只需導(dǎo)入依賴(lài),不額外引入組件,同時(shí)無(wú)需改動(dòng)之前的代碼。 兩邊完全解耦,互不干擾。

常見(jiàn)的CDC框架,比如,canal (非Camel)

canal [k?'n?l],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi) 早期阿里巴巴因?yàn)楹贾莺兔绹?guó)雙機(jī)房部署,存在跨機(jī)房同步的業(yè)務(wù)需求,實(shí)現(xiàn)方式主要是基于業(yè)務(wù) trigger 獲取增量變更。 從 2010 年開(kāi)始,業(yè)務(wù)逐步嘗試數(shù)據(jù)庫(kù)日志解析獲取增量變更進(jìn)行同步,由此衍生出了大量的數(shù)據(jù)庫(kù)增量訂閱和消費(fèi)業(yè)務(wù)。

它是基于日志增量訂閱和消費(fèi)的業(yè)務(wù),包括

數(shù)據(jù)庫(kù)鏡像 數(shù)據(jù)庫(kù)實(shí)時(shí)備份 索引構(gòu)建和實(shí)時(shí)維護(hù)(拆分異構(gòu)索引、倒排索引等) 業(yè)務(wù) cache 刷新 帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理

c88b4a38-3e3c-11ee-ac96-dac502259ad0.jpg

它的原理

canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議

MySQL master 收到 dump 請(qǐng)求,開(kāi)始推送 binary log 給 slave (即 canal );關(guān)注工眾號(hào):碼猿技術(shù)專(zhuān)欄,回復(fù)關(guān)鍵詞:1111 獲取阿里內(nèi)部Java性能調(diào)優(yōu)手冊(cè)!

canal 解析 binary log 對(duì)象(原始為 byte 流)

再比如,debezium(音同 dbzm 滴BZ姆)很多人可能不太了解. 包括databus,maxwell,flink cdc(大數(shù)據(jù)領(lǐng)域)等等,它們同屬CDC捕獲數(shù)據(jù)更改(change data capture)類(lèi)的技術(shù)。

c8b79c1e-3e3c-11ee-ac96-dac502259ad0.jpg

為什么是debezium

這么多技術(shù)框架,為什么選debezium?

看起來(lái)很多。但一一排除下來(lái)就debezium和canal。

sqoop,kettle,datax之類(lèi)的工具,屬于前大數(shù)據(jù)時(shí)代的產(chǎn)物,地位類(lèi)似于web領(lǐng)域的structs2。而且,它們基于查詢(xún)而非binlog日志,其實(shí)不屬于CDC。首先排除。

flink cdc是大數(shù)據(jù)領(lǐng)域的框架,一般web項(xiàng)目的數(shù)據(jù)量屬于大材小用了。

同時(shí)databus,maxwell相對(duì)比較冷門(mén),用得比較少。

最后不用canal的原因有以下幾點(diǎn)。

canal需要安裝,這違背了“如非必要,勿增實(shí)體”的原則。

canal只能對(duì)MYSQL進(jìn)行CDC監(jiān)控。有很大的局限性。

大數(shù)據(jù)領(lǐng)域非常流行的flink cdc(阿里團(tuán)隊(duì)主導(dǎo))底層使用的也是debezium,而非同是阿里出品的canal。

debezium可借助kafka組件,將變動(dòng)的數(shù)據(jù)發(fā)到kafka topic,后續(xù)的讀取操作只需讀取kafka,可有效減少數(shù)據(jù)庫(kù)的讀取壓力??杀WC一次語(yǔ)義,至少一次語(yǔ)義。 同時(shí),也可基于內(nèi)嵌部署模式,無(wú)需我們手動(dòng)部署kafka集群,可滿(mǎn)足”如非必要,勿增實(shí)體“的原則。

c8eacf4e-3e3c-11ee-ac96-dac502259ad0.jpg

Debezium是一個(gè)捕獲數(shù)據(jù)更改(CDC)平臺(tái),并且利用Kafka和Kafka Connect實(shí)現(xiàn)了自己的持久性、可靠性和容錯(cuò)性。 每一個(gè)部署在Kafka Connect分布式的、可擴(kuò)展的、容錯(cuò)性的服務(wù)中的connector監(jiān)控一個(gè)上游數(shù)據(jù)庫(kù)服務(wù)器,捕獲所有的數(shù)據(jù)庫(kù)更改, 然后記錄到一個(gè)或者多個(gè)Kafka topic(通常一個(gè)數(shù)據(jù)庫(kù)表對(duì)應(yīng)一個(gè)kafka topic)。

Kafka確保所有這些數(shù)據(jù)更改事件都能夠多副本并且總體上有序(Kafka只能保證一個(gè)topic的單個(gè)分區(qū)內(nèi)有序),這樣, 更多的客戶(hù)端可以獨(dú)立消費(fèi)同樣的數(shù)據(jù)更改事件而對(duì)上游數(shù)據(jù)庫(kù)系統(tǒng)造成的影響降到很小(如果N個(gè)應(yīng)用都直接去監(jiān)控?cái)?shù)據(jù)庫(kù)更改,對(duì)數(shù)據(jù)庫(kù)的壓力為N, 而用debezium匯報(bào)數(shù)據(jù)庫(kù)更改事件到kafka,所有的應(yīng)用都去消費(fèi)kafka中的消息,可以把對(duì)數(shù)據(jù)庫(kù)的壓力降到1)。

另外,客戶(hù)端可以隨時(shí)停止消費(fèi),然后重啟, 從上次停止消費(fèi)的地方接著消費(fèi)。每個(gè)客戶(hù)端可以自行決定他們是否需要exactly-once或者at-least-once消息交付語(yǔ)義保證, 并且所有的數(shù)據(jù)庫(kù)或者表的更改事件是按照上游數(shù)據(jù)庫(kù)發(fā)生的順序被交付的。

c90670aa-3e3c-11ee-ac96-dac502259ad0.jpg

對(duì)于不需要或者不想要這種容錯(cuò)級(jí)別、性能、可擴(kuò)展性、可靠性的應(yīng)用,他們可以使用內(nèi)嵌的Debezium connector引擎來(lái)直接在應(yīng)用內(nèi)部運(yùn)行connector。 這種應(yīng)用仍需要消費(fèi)數(shù)據(jù)庫(kù)更改事件,但更希望connector直接傳遞給它,而不是持久化到Kafka里。

簡(jiǎn)介

Debezium是一個(gè)開(kāi)源項(xiàng)目,為捕獲數(shù)據(jù)更改(change data capture,CDC)提供了一個(gè)低延遲的流式處理平臺(tái)。你可以安裝并且配置Debezium去監(jiān)控你的數(shù)據(jù)庫(kù),然后你的應(yīng)用就可以消費(fèi)對(duì)數(shù)據(jù)庫(kù)的每一個(gè)行級(jí)別(row-level)的更改。只有已提交的更改才是可見(jiàn)的,所以你的應(yīng)用不用擔(dān)心事務(wù)(transaction)或者更改被回滾(roll back)。Debezium為所有的數(shù)據(jù)庫(kù)更改事件提供了一個(gè)統(tǒng)一的模型,所以你的應(yīng)用不用擔(dān)心每一種數(shù)據(jù)庫(kù)管理系統(tǒng)的錯(cuò)綜復(fù)雜性。另外,由于Debezium用持久化的、有副本備份的日志來(lái)記錄數(shù)據(jù)庫(kù)數(shù)據(jù)變化的歷史,因此,你的應(yīng)用可以隨時(shí)停止再重啟,而不會(huì)錯(cuò)過(guò)它停止運(yùn)行時(shí)發(fā)生的事件,保證了所有的事件都能被正確地、完全地處理掉。

監(jiān)控?cái)?shù)據(jù)庫(kù),并且在數(shù)據(jù)變動(dòng)的時(shí)候獲得通知一直是很復(fù)雜的事情。關(guān)系型數(shù)據(jù)庫(kù)的觸發(fā)器可以做到,但是只對(duì)特定的數(shù)據(jù)庫(kù)有效,而且通常只能更新數(shù)據(jù)庫(kù)內(nèi)的狀態(tài)(無(wú)法和外部的進(jìn)程通信)。一些數(shù)據(jù)庫(kù)提供了監(jiān)控?cái)?shù)據(jù)變動(dòng)的API或者框架,但是沒(méi)有一個(gè)標(biāo)準(zhǔn),每種數(shù)據(jù)庫(kù)的實(shí)現(xiàn)方式都是不同的,并且需要大量特定的知識(shí)和理解特定的代碼才能運(yùn)用。確保以相同的順序查看和處理所有更改,同時(shí)最小化影響數(shù)據(jù)庫(kù)仍然非常具有挑戰(zhàn)性。

Debezium提供了模塊為你做這些復(fù)雜的工作。一些模塊是通用的,并且能夠適用多種數(shù)據(jù)庫(kù)管理系統(tǒng),但在功能和性能方面仍有一些限制。另一些模塊是為特定的數(shù)據(jù)庫(kù)管理系統(tǒng)定制的,所以他們通??梢愿嗟乩脭?shù)據(jù)庫(kù)系統(tǒng)本身的特性來(lái)提供更多功能。

github官網(wǎng)上羅列的一些典型應(yīng)用場(chǎng)景

緩存失效(Cache invalidation) 經(jīng)典問(wèn)題 Redis與MySQL雙寫(xiě)一致性如何保證?Debezium利用kafka單分區(qū)的有序性(忽略mysql binlog本身可能的延遲和亂序),可完全解決此問(wèn)題。 在緩存中緩存的條目(entry)在源頭被更改或者被刪除的時(shí)候立即讓緩存中的條目失效。 如果緩存在一個(gè)獨(dú)立的進(jìn)程中運(yùn)行(例如Redis,Memcache,Infinispan或者其他的),那么簡(jiǎn)單的緩存失效邏輯可以放在獨(dú)立的進(jìn)程或服務(wù)中, 從而簡(jiǎn)化主應(yīng)用的邏輯。在一些場(chǎng)景中,緩存失效邏輯可以更復(fù)雜一點(diǎn),讓它利用更改事件中的更新數(shù)據(jù)去更新緩存中受影響的條目。

簡(jiǎn)化單體應(yīng)用(Simplifying monolithic applications) 許多應(yīng)用更新數(shù)據(jù)庫(kù),然后在數(shù)據(jù)庫(kù)中的更改被提交后,做一些額外的工作:更新搜索索引,更新緩存,發(fā)送通知,運(yùn)行業(yè)務(wù)邏輯,等等。 這種情況通常稱(chēng)為雙寫(xiě)(dual-writes),因?yàn)閼?yīng)用沒(méi)有在一個(gè)事務(wù)內(nèi)寫(xiě)多個(gè)系統(tǒng)。這樣不僅應(yīng)用邏輯復(fù)雜難以維護(hù), 而且雙寫(xiě)容易丟失數(shù)據(jù)或者在一些系統(tǒng)更新成功而另一些系統(tǒng)沒(méi)有更新成功的時(shí)候造成不同系統(tǒng)之間的狀態(tài)不一致。使用捕獲更改數(shù)據(jù)技術(shù)(change data capture,CDC), 在源數(shù)據(jù)庫(kù)的數(shù)據(jù)更改提交后,這些額外的工作可以被放在獨(dú)立的線程或者進(jìn)程(服務(wù))中完成。這種實(shí)現(xiàn)方式的容錯(cuò)性更好,不會(huì)丟失事件,容易擴(kuò)展,并且更容易支持升級(jí)。

共享數(shù)據(jù)庫(kù)(Sharing databases) 當(dāng)多個(gè)應(yīng)用共用同一個(gè)數(shù)據(jù)庫(kù)的時(shí)候,一個(gè)應(yīng)用提交的更改通常要被另一個(gè)應(yīng)用感知到。一種實(shí)現(xiàn)方式是使用消息總線, 盡管非事務(wù)性(non-transactional)的消息總線總會(huì)受上面提到的雙寫(xiě)(dual-writes)影響。但是,另一種實(shí)現(xiàn)方式,即Debezium,變得很直接:每個(gè)應(yīng)用可以直接監(jiān)控?cái)?shù)據(jù)庫(kù)的更改,并且響應(yīng)更改。

數(shù)據(jù)集成(Data integration) 數(shù)據(jù)通常被存儲(chǔ)在多個(gè)地方,尤其是當(dāng)數(shù)據(jù)被用于不同的目的的時(shí)候,會(huì)有不同的形式。保持多系統(tǒng)的同步是很有挑戰(zhàn)性的, 但是可以通過(guò)使用Debezium加上簡(jiǎn)單的事件處理邏輯來(lái)實(shí)現(xiàn)簡(jiǎn)單的ETL類(lèi)型的解決方案。

命令查詢(xún)職責(zé)分離(CQRS) 在命令查詢(xún)職責(zé)分離 Command Query Responsibility Separation (CQRS) 架構(gòu)模式中,更新數(shù)據(jù)使用了一種數(shù)據(jù)模型, 讀數(shù)據(jù)使用了一種或者多種數(shù)據(jù)模型。由于數(shù)據(jù)更改被記錄在更新側(cè)(update-side),這些更改將被處理以更新各種讀展示。 所以CQRS應(yīng)用通常更復(fù)雜,尤其是他們需要保證可靠性和全序(totally-ordered)處理。Debezium和CDC可以使這種方式更可行: 寫(xiě)操作被正常記錄,但是Debezium捕獲數(shù)據(jù)更改,并且持久化到全序流里,然后供那些需要異步更新只讀視圖的服務(wù)消費(fèi)。 寫(xiě)側(cè)(write-side)表可以表示面向領(lǐng)域的實(shí)體(domain-oriented entities),或者當(dāng)CQRS和 Event Sourcing 結(jié)合的時(shí)候,寫(xiě)側(cè)表僅僅用做追加操作命令事件的日志。

springboot 整合 Debezium

依賴(lài)

1.7.0.Final
8.0.26


mysql
mysql-connector-java
${mysql.connector.version}
runtime


io.debezium
debezium-api
${debezium.version}


io.debezium
debezium-embedded
${debezium.version}


io.debezium
debezium-connector-mysql
${debezium.version}


mysql
mysql-connector-java



注意debezium版本為1.7.0.Final,對(duì)應(yīng)mysql驅(qū)動(dòng)為8.0.26,低于這個(gè)版本會(huì)報(bào)兼容錯(cuò)誤。

配置

相應(yīng)的配置

debezium.datasource.hostname=localhost
debezium.datasource.port=3306
debezium.datasource.user=root
debezium.datasource.password=123456
debezium.datasource.tableWhitelist=test.test
debezium.datasource.storageFile=E:/debezium/test/offsets/offset.dat
debezium.datasource.historyFile=E:/debezium/test/history/custom-file-db-history.dat
debezium.datasource.flushInterval=10000
debezium.datasource.serverId=1
debezium.datasource.serverName=name-1

然后進(jìn)行配置初始化。

主要的配置項(xiàng):

connector.class

監(jiān)控的數(shù)據(jù)庫(kù)類(lèi)型,這里選mysql。

offset.storage

選擇FileOffsetBackingStore時(shí),意思把讀取進(jìn)度存到本地文件,因?yàn)槲覀儾挥胟afka,當(dāng)使用kafka時(shí),選KafkaOffsetBackingStore 。

offset.storage.file.filename

存放讀取進(jìn)度的本地文件地址。

offset.flush.interval.ms

讀取進(jìn)度刷新保存頻率,默認(rèn)1分鐘。如果不依賴(lài)kafka的話(huà),應(yīng)該就沒(méi)有exactly once只讀取一次語(yǔ)義,應(yīng)該是至少讀取一次。意味著可能重復(fù)讀取。如果web容器掛了,最新的讀取進(jìn)度沒(méi)有刷新到文件里,下次重啟時(shí),就會(huì)重復(fù)讀取binlog。

table.whitelist

監(jiān)控的表名白名單,建議設(shè)置此值,只監(jiān)控這些表的binlog。

database.whitelist

監(jiān)控的數(shù)據(jù)庫(kù)白名單,如果選此值,會(huì)忽略table.whitelist,然后監(jiān)控此db下所有表的binlog。

/**
*@className:MysqlConfig
*@author:nyp
*@description:TODO
*@date:2023/8/713:53
*@version:1.0
*/
@Configuration
@ConfigurationProperties(prefix="debezium.datasource")
@Data
publicclassMysqlBinlogConfig{

privateStringhostname;
privateStringport;
privateStringuser;
privateStringpassword;
privateStringtableWhitelist;
privateStringstorageFile;
privateStringhistoryFile;
privateLongflushInterval;
privateStringserverId;
privateStringserverName;

@Bean
publicio.debezium.config.ConfigurationMysqlBinlogConfig()throwsException{
checkFile();
io.debezium.config.Configurationconfiguration=io.debezium.config.Configuration.create()
.with("name","mysql_connector")
.with("connector.class",MySqlConnector.class)
//.with("offset.storage",KafkaOffsetBackingStore.class)
.with("offset.storage",FileOffsetBackingStore.class)
.with("offset.storage.file.filename",storageFile)
.with("offset.flush.interval.ms",flushInterval)
.with("database.history",FileDatabaseHistory.class.getName())
.with("database.history.file.filename",historyFile)
.with("snapshot.mode","Schema_only")
.with("database.server.id",serverId)
.with("database.server.name",serverName)
.with("database.hostname",hostname)
//.with("database.dbname",dbname)
.with("database.port",port)
.with("database.user",user)
.with("database.password",password)
//.with("database.whitelist","test")
.with("table.whitelist",tableWhitelist)
.build();
returnconfiguration;

}

privatevoidcheckFile()throwsIOException{
Stringdir=storageFile.substring(0,storageFile.lastIndexOf("/"));
FiledirFile=newFile(dir);
if(!dirFile.exists()){
dirFile.mkdirs();
}
Filefile=newFile(storageFile);
if(!file.exists()){
file.createNewFile();
}
}
}

snapshot.mode 快照模式,指定連接器啟動(dòng)時(shí)運(yùn)行快照的條件??赡艿脑O(shè)置有:

initial 只有在沒(méi)有為邏輯服務(wù)器名記錄偏移量時(shí),連接器才運(yùn)行快照。

When_needed 當(dāng)連接器認(rèn)為有必要時(shí),它會(huì)在啟動(dòng)時(shí)運(yùn)行快照。也就是說(shuō),當(dāng)沒(méi)有可用的偏移量時(shí),或者當(dāng)先前記錄的偏移量指定了服務(wù)器中不可用的binlog位置或GTID時(shí)。

Never 連接器從不使用快照。在第一次使用邏輯服務(wù)器名啟動(dòng)時(shí),連接器從binlog的開(kāi)頭讀取。謹(jǐn)慎配置此行為。只有當(dāng)binlog保證包含數(shù)據(jù)庫(kù)的整個(gè)歷史記錄時(shí),它才有效。

Schema_only 連接器運(yùn)行模式而不是數(shù)據(jù)的快照。當(dāng)您不需要主題包含數(shù)據(jù)的一致快照,而只需要主題包含自連接器啟動(dòng)以來(lái)的更改時(shí),此設(shè)置非常有用。

Schema_only_recovery 這是已經(jīng)捕獲更改的連接器的恢復(fù)設(shè)置。當(dāng)您重新啟動(dòng)連接器時(shí),此設(shè)置允許恢復(fù)損壞或丟失的數(shù)據(jù)庫(kù)歷史主題。您可以定期將其設(shè)置為“清理”意外增長(zhǎng)的數(shù)據(jù)庫(kù)歷史主題。數(shù)據(jù)庫(kù)歷史主題需要無(wú)限保留。

database.server.id

偽裝成slave的Debezium服務(wù)的id,自定義,有多個(gè)Debezium服務(wù)不能重復(fù),如果重復(fù)的話(huà)會(huì)報(bào)以下異常。

io.debezium.DebeziumException:Aslavewiththesameserver_uuid/server_idasthisslavehasconnectedtothemaster;thefirstevent'binlog.000013'at46647257,thelasteventreadfrom'./binlog.000013'at125,thelastbytereadfrom'./binlog.000013'at46647257.Errorcode:1236;SQLSTATE:HY000.
atio.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1167)
atio.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1212)
atcom.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:980)
atcom.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
atcom.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
atjava.lang.Thread.run(Thread.java:750)
Causedby:com.github.shyiko.mysql.binlog.network.ServerException:Aslavewiththesameserver_uuid/server_idasthisslavehasconnectedtothemaster;thefirstevent'binlog.000013'at46647257,thelasteventreadfrom'./binlog.000013'at125,thelastbytereadfrom'./binlog.000013'at46647257.
atcom.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:944)
...3commonframesomitted

監(jiān)聽(tīng)

配置監(jiān)聽(tīng)服務(wù)

/**
*@projectName:test
*@package:com.test.config
*@className:MysqlBinlogListener
*@author:nyp
*@description:TODO
*@date:2023/8/713:56
*@version:1.0
*/
@Component
@Slf4j
publicclassMysqlBinlogListener{

@Resource
privateExecutortaskExecutor;

privatefinalList>>engineList=newArrayList<>();

privateMysqlBinlogListener(@Qualifier("mysqlConnector")Configurationconfiguration){
this.engineList.add(DebeziumEngine.create(Json.class)
.using(configuration.asProperties())
.notifying(record->receiveChangeEvent(record.value()))
.build());
}

privatevoidreceiveChangeEvent(Stringvalue){
if(Objects.nonNull(value)){
Mappayload=getPayload(value);
Stringop=JSON.parseObject(JSON.toJSONString(payload.get("op")),String.class);
if(!(StringUtils.isBlank(op)||Envelope.Operation.READ.equals(op))){
ChangeDatachangeData=getChangeData(payload);
log.info("changeData="+changeData);
}
}
}

@PostConstruct
privatevoidstart(){
for(DebeziumEngine>engine:engineList){
taskExecutor.execute(engine);
}
}

@PreDestroy
privatevoidstop(){
for(DebeziumEngine>engine:engineList){
if(engine!=null){
try{
engine.close();
}catch(IOExceptione){
log.error("",e);
}
}
}
}


publicstaticMapgetPayload(Stringvalue){
Mapmap=JSON.parseObject(value,Map.class);
Mappayload=JSON.parseObject(JSON.toJSONString(map.get("payload")),Map.class);
returnpayload;
}

publicstaticChangeDatagetChangeData(Mappayload){
Mapsource=JSON.parseObject(JSON.toJSONString(payload.get("source")),Map.class);
returnChangeData.builder()
.op(payload.get("op").toString())
.table(source.get("table").toString())
.after(JSON.parseObject(JSON.toJSONString(payload.get("after")),Map.class))
.source(JSON.parseObject(JSON.toJSONString(payload.get("source")),Map.class))
.before(JSON.parseObject(JSON.toJSONString(payload.get("before")),Map.class))
.build();
}

@Data
@Builder
publicstaticclassChangeData{
/**
*更改前數(shù)據(jù)
*/
privateMapafter;
privateMapsource;
/**
*更改后數(shù)據(jù)
*/
privateMapbefore;
/**
*更改的表名
*/
privateStringtable;
/**
*操作類(lèi)型,枚舉Envelope.Operation
*/
privateStringop;
}

}

將監(jiān)聽(tīng)到的binlog日志封裝為ChangeData對(duì)象,包括表名,更改前后的數(shù)據(jù),

以及操作類(lèi)型

READ("r"),
CREATE("c"),
UPDATE("u"),
DELETE("d"),
TRUNCATE("t");

測(cè)試

update操作輸出

MysqlListener.ChangeData(after={
name=SuzukiMio2,
id=1
},source={
file=binlog.000013,
connector=mysql,
pos=42587833,
name=test-1,
row=0,
server_id=1,
version=1.7.0.Final,
ts_ms=1691458956000,
snapshot=false,
db=test
table=test
},before={
name=SuzukiMio,
id=1
},table=test,op=u)
data={
name=SuzukiMio2,
id=1
}

新增操作輸出

MysqlListener.ChangeData(after={
name=王五,
id=0
},source={
file=binlog.000013,
connector=mysql,
pos=42588175,
name=test-1,
row=0,
server_id=1,
version=1.7.0.Final,
ts_ms=1691459066000,
snapshot=false,
db=test,
table=test
},before=null,table=test,op=c)

刪除操作輸出

MysqlListener.ChangeData(after=null,source={
file=binlog.000013,
connector=mysql,
pos=42588959,
name=test-1,
row=0,
server_id=1,
version=1.7.0.Final,
ts_ms=1691459104000,
snapshot=false,
db=test
table=test
},before={
name=王五,
id=0
},table=test,op=d)

我們之前配置的保存讀取進(jìn)度的文件storageFile,類(lèi)似于kafka的偏移量,記錄的內(nèi)容如下:

c92ad81e-3e3c-11ee-ac96-dac502259ad0.jpg

停止服務(wù),對(duì)數(shù)據(jù)庫(kù)進(jìn)行操作,再次重啟,會(huì)根據(jù)進(jìn)度重新讀取。

小結(jié)

本文介紹了debezium,更多的時(shí)候,我們一談到CDC,第一想到的是大量數(shù)據(jù)同步的工具。 但其實(shí)也可以利用其數(shù)據(jù)變更捕獲的特性,來(lái)達(dá)到一部份消息隊(duì)列的作用。 但其畢竟不能完全替代消息隊(duì)列。大家理性看待與選擇。

本文的重點(diǎn)在介紹一種思路,具體的某項(xiàng)技術(shù)反而不那么重要。






審核編輯:劉清

聲明:本文內(nèi)容及配圖由入駐作者撰寫(xiě)或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴

原文標(biāo)題:不想引入MQ?試試debezium

文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評(píng)論

    相關(guān)推薦
    熱點(diǎn)推薦

    使用片DAC61416芯片,如輸出50channel,這么多通道還能同時(shí)輸出嗎?

    如果使用片DAC61416芯片,如輸出50channel,這么多通道還能同時(shí)輸出嗎?會(huì)不會(huì)存在輸出時(shí)間上的偏差?
    發(fā)表于 11-29 10:46

    看了這么多論壇 還是這個(gè)論壇好啊...

    其他的論壇沒(méi)有這么多人,沒(méi)有這么快的更新.... 你們覺(jué)得呢?
    發(fā)表于 04-17 10:52

    這里的那些是程序,要弄成word 文檔的,新手沒(méi)做過(guò)這么多程序

    這里的那些是程序,要弄成word 文檔的,新手沒(méi)做過(guò)這么多程序
    發(fā)表于 04-15 23:29

    為什么roll一上電就飄了這么多?

    放在水平位置上校準(zhǔn)之后pitch還是挺準(zhǔn)的,roll一上電就飄了這么多,為什么??
    發(fā)表于 07-04 04:35

    為什么OLED初始化的時(shí)候要這么多命令?

    void OLED_Init(void)這個(gè)函數(shù)里面要寫(xiě)的命令好多啊,不知道為什么初始化的時(shí)候要這么多命令?。??求解具體在數(shù)據(jù)手冊(cè)哪幾頁(yè)
    發(fā)表于 09-18 23:58

    什么是VBA?為什么這么多軟件支持VBA?

    什么是VBA?什么是VBS?二者有什么不同?為什么這么多軟件支持VBA?
    發(fā)表于 07-02 06:35

    怎么記住這么多代碼格式?

    我記得剛開(kāi)始接觸編程的時(shí)候,覺(jué)得太難了。也很好奇,寫(xiě)代碼的那些人也太厲害了吧?全是英文的,他們的英文水平一定很好吧?他們是怎么記住這么多代碼格式的?而且錯(cuò)了一個(gè)標(biāo)點(diǎn)符號(hào),整個(gè)程序都會(huì)有影響。一個(gè)程序
    發(fā)表于 07-15 08:56

    電流密度和電荷密度兩個(gè)的表達(dá)式怎么差這么多

    電流密度是什么?電荷密度是什么?電流密度和電荷密度兩個(gè)的表達(dá)式怎么差這么多?
    發(fā)表于 09-28 09:36

    為什么要搞這么多架構(gòu)

    問(wèn)題:為什么要搞這么多架構(gòu)?webrtc雖然是一項(xiàng)主要使用p2p的實(shí)時(shí)通訊技術(shù),本應(yīng)該是無(wú)中心化節(jié)點(diǎn)的,但是在一些大型多人通訊場(chǎng)景,如果都使用端對(duì)端直連,端上會(huì)遇到很帶寬和性能的問(wèn)題,所以就有了下圖
    發(fā)表于 10-29 06:05

    STM32系統(tǒng)為什么要有時(shí)鐘?為什么有這么多個(gè)時(shí)鐘源

    STM32系統(tǒng)為什么要有時(shí)鐘?為什么有這么多個(gè)時(shí)鐘源?STM32系統(tǒng)時(shí)鐘的框架是由哪些部分組成的?
    發(fā)表于 11-22 07:00

    為什么有這么多編程語(yǔ)言呢

    關(guān)注+星標(biāo)公眾號(hào),不錯(cuò)過(guò)精彩內(nèi)容編排|strongerHuang微信公眾號(hào) |嵌入式專(zhuān)欄有很多初學(xué)者都會(huì)問(wèn):我到底是該學(xué)C語(yǔ)言,還是學(xué)C++,或者JAVA呢?為什么有這么多編程語(yǔ)言呢...
    發(fā)表于 01-12 06:34

    安卓8.0最新消息:安卓8.0初體驗(yàn),竟然這么流暢還有這么多黑科技功能

    安卓8.0初體驗(yàn),竟然這么流暢還有這么多黑科技功能
    發(fā)表于 04-13 09:00 ?3689次閱讀

    AC-DC電源適配器還有這么多門(mén)道?看完才知道

    AC-DC電源適配器還有這么多門(mén)道?看完才知道
    的頭像 發(fā)表于 07-02 11:40 ?6618次閱讀

    硬件電路設(shè)計(jì)有這么多坑,如何少走彎路?看大牛怎么說(shuō)

    硬件電路設(shè)計(jì)有這么多坑,如何少走彎路?看大牛怎么說(shuō)
    的頭像 發(fā)表于 11-27 17:34 ?877次閱讀

    這么多內(nèi)網(wǎng)穿透工具怎么?一篇讓你不再糾結(jié)的終極指南!

    穿透工具就是你的救星! 但問(wèn)題來(lái)了—— 市面上這么多工具,Ngrok、FRP、ZeroNews……到底該哪個(gè)? 別急,這篇文章將帶你全面了解各種內(nèi)網(wǎng)穿透方案的優(yōu)缺點(diǎn),幫你找到最適合自己的那一款! 一、內(nèi)網(wǎng)穿透:你的"網(wǎng)絡(luò)任意門(mén)" ? ? ? ? ? ? ? ?
    的頭像 發(fā)表于 05-13 16:06 ?292次閱讀
    <b class='flag-5'>這么多</b>內(nèi)網(wǎng)穿透工具怎么<b class='flag-5'>選</b>?一篇讓你不再糾結(jié)的終極指南!