介紹
kafka是一個(gè)比較流行的分布式、可拓展、高性能、可靠的流處理平臺(tái)。在處理kafka的數(shù)據(jù)時(shí),這里有確保處理效率和可靠性的多種最佳實(shí)踐。本文將介紹這幾種實(shí)踐方式,并通過sarama實(shí)現(xiàn)他們。
以下是一些kafka消費(fèi)的最佳實(shí)踐:
選擇合適的提交策略:Kafka提供兩種提交策略,自動(dòng)和手動(dòng)。雖然自動(dòng)操作很容易使用,但它可能會(huì)導(dǎo)致數(shù)據(jù)丟失或重復(fù)。手動(dòng)提交提供了更高級(jí)別的控制,確保消息至少處理一次或恰好一次,具體取決于用例。
盡可能減少Kafka的傳輸次數(shù):大批量讀取消息可以顯著提高吞吐量。這可以通過調(diào)整 fetch.min.bytes 和 fetch.max.wait.ms 等參數(shù)來實(shí)現(xiàn)。
盡可能使用消費(fèi)者組:Kafka允許多個(gè)消費(fèi)者組成一個(gè)消費(fèi)者組來并行消費(fèi)數(shù)據(jù)。這使得 Kafka 能夠?qū)?shù)據(jù)分發(fā)給一個(gè)組中的所有消費(fèi)者,從而實(shí)現(xiàn)高效的數(shù)據(jù)消費(fèi)。
調(diào)整消費(fèi)者緩沖區(qū)大小:通過調(diào)整消費(fèi)者的緩沖區(qū)大小,如 receive.buffer.bytes 和 max.partition.fetch.bytes,可以根據(jù)消息的預(yù)期大小和消費(fèi)者的內(nèi)存容量進(jìn)行調(diào)整。這可以提高消費(fèi)者的表現(xiàn)。
處理rebalance:當(dāng)新的消費(fèi)者加入消費(fèi)者組,或者現(xiàn)有的消費(fèi)者離開時(shí),Kafka會(huì)觸發(fā)rebalance以重新分配負(fù)載。在此過程中,消費(fèi)者停止消費(fèi)數(shù)據(jù)。因此,快速有效地處理重新平衡可以提高整體吞吐量。
監(jiān)控消費(fèi)者:使用 Kafka 的消費(fèi)者指標(biāo)來監(jiān)控消費(fèi)者的性能。定期監(jiān)控可以幫助我們識(shí)別性能瓶頸并調(diào)整消費(fèi)者的配置。
選擇合適的提交策略
1.自動(dòng)提交
Sarama 的 ConsumerGroup 默認(rèn)情況下會(huì)自動(dòng)提交偏移量。這意味著它會(huì)定期提交已成功消費(fèi)的消息的偏移量,這允許消費(fèi)者在重新啟動(dòng)或消費(fèi)失敗時(shí)從中斷的地方繼續(xù)。
下面是一個(gè)自動(dòng)提交的消費(fèi)者組消費(fèi)消息的例子:
config := sarama.NewConfig() config.Version = sarama.V2_0_0_0 config.Consumer.Offsets.AutoCommit.Enable = true config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second ConsumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config) if err != nil { log.Panicf( "創(chuàng)建消費(fèi)者組客戶端時(shí)出錯(cuò): %v" , err) } Consumer := Consumer{} ctx := context.Background() for { err := ConsumerGroup.Consume(ctx, [] string {topic}, Consumer) if err != nil { log.Panicf( "來自消費(fèi)者的錯(cuò)誤: %v" , err) } }
根據(jù)config.Consumer.Offsets.AutoCommit.Interval可以看到,消費(fèi)者會(huì)每秒自動(dòng)提交offset。
2. 手動(dòng)提交
手動(dòng)提交使我們更好地控制何時(shí)提交消息偏移量。下面是一個(gè)手動(dòng)提交的消費(fèi)者組消費(fèi)消息的例子:
config := sarama.NewConfig() config.Version = sarama.V2_0_0_0 config.Consumer.Offsets.AutoCommit.Enable = false consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID , config) if err != nil { log.Panicf( "創(chuàng)建消費(fèi)者組客戶端時(shí)出錯(cuò): %v" , err) } Consumer := Consumer{} ctx := context.Background() for { err := ConsumerGroup.Consume( ctx, [] string {topic}, Consumer) if err != nil { log.Panicf( "Error from Consumer: %v" , err) } } type Consumer struct {} func (consumer Consumer) Setup (_ sarama.ConsumerGroupSession) error { return nil } func (consumer Consumer) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (consumer Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error { for msg : = range Claim.Messages() { fmt.Printf( "Message topic:%q partition:%d offset:%d " , msg.Topic, msg.Partition, msg.Offset) sess.MarkMessage(msg, "" ) } return nil }
在該示例中, 使用MarkMessage手動(dòng)將消息標(biāo)記為已處理,最終根據(jù)Consumer.Offsets.CommitInterval配置提交。另外這個(gè)例子省略了錯(cuò)誤處理部分,開發(fā)時(shí)需要注意正確處理生產(chǎn)過程中出現(xiàn)的錯(cuò)誤。
譯者注:這篇文章雖然是今年5月發(fā)布,但是這里的提交方式還是有些過時(shí)了,目前sarama已經(jīng)廢棄了Consumer.Offsets.CommitInterval,相關(guān)配置目前在Consumer.Offsets.AutoCommit
盡可能減少Kafka的傳輸次數(shù)
減少kafka的傳輸次數(shù)可以通過優(yōu)化從kafka中讀取和寫入數(shù)據(jù)的方式來實(shí)現(xiàn):
1. 增加批次的大小
使用kafka批量發(fā)送消息的效果優(yōu)于逐個(gè)發(fā)送消息,批次越大,kafka發(fā)送數(shù)據(jù)效率就越高。但是需要權(quán)衡延遲和吞吐量之間的關(guān)系。較大的批次雖然代表著更大的吞吐量,但也會(huì)增加延遲。因?yàn)榕卧酱?,填充批次的時(shí)間也越久。
在Go中,我們可以在使用sarama包生成消息時(shí)設(shè)置批次大?。?/p>
config := sarama.NewConfig() config.Producer.Flush.Bytes = 1024 * 1024
以及獲取消息的批次大小
config := sarama.NewConfig() config.Consumer.Fetch.Default = 1024 * 1024
2. 使用長(zhǎng)輪詢
長(zhǎng)輪詢是指消費(fèi)者輪詢時(shí)如果Kafka中沒有數(shù)據(jù),則消費(fèi)者將等待數(shù)據(jù)到達(dá)。這減少了往返次數(shù),因?yàn)橄M(fèi)者不需要在沒有數(shù)據(jù)時(shí)不斷請(qǐng)求數(shù)據(jù)。
config := sarama.NewConfig() config .Consumer.MaxWaitTime = 500 *time.Millisecond
該配置告訴消費(fèi)者在返回之前會(huì)等待500毫秒
3. 盡可能使用消費(fèi)者組
消費(fèi)者組是一組協(xié)同工作消費(fèi)來自kafka主題的消息的消費(fèi)者。消費(fèi)者組允許我們?cè)诙鄠€(gè)消費(fèi)者之間分配消息,從而提供橫向拓展能力。使用消費(fèi)者組時(shí),kafka負(fù)責(zé)將分區(qū)分配給組中的消費(fèi)者,并確保每個(gè)分區(qū)同時(shí)僅被一個(gè)消費(fèi)者消費(fèi)。
接下來是sarama中消費(fèi)者組的使用:
使用消費(fèi)者組需要實(shí)現(xiàn)一個(gè)ConsumerGroupHandler接口:
該接口具有三個(gè)方法:Setup、Cleanup、 和ConsumeClaim
type exampleConsumerGroupHandler struct { } func (h *exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (h *exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (h *exampleConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error { for message := range Claim.Messages() { fmt.Printf( "Message: %s " , string (message.Value)) session.MarkMessage(message, "" ) } 返回 nil }
創(chuàng)建sarama.ConsumerGroup并開始消費(fèi):
brokers := []string{"localhost:9092"} topic := "example_topic" groupID := "example_consumer_group" consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config) if err != nil { log.Fatalf("Error creating consumer group: %v", err) } defer consumerGroup.Close() handler := &exampleConsumerGroupHandler{} for { err := consumerGroup.Consume(context.Background(), []string{topic}, handler) if err != nil { log.Printf("Error consuming messages: %v", err) } }
該示例設(shè)置了一個(gè)消費(fèi)組,用于消費(fèi)來自“example_topic”的消息。消費(fèi)者組可以通過添加更多消費(fèi)者來提高處理能力。
使用消費(fèi)者組時(shí),記得處理消費(fèi)期間rebalance和錯(cuò)誤。
調(diào)整消費(fèi)者緩沖區(qū)大小
在sarama中,我們可以調(diào)整消費(fèi)者緩沖區(qū)的大小,以調(diào)整消費(fèi)者在處理消息之前可以在內(nèi)存中保存的消息數(shù)量。
默認(rèn)情況下,緩沖區(qū)大小設(shè)置為256,這代表Sarama在開始處理消息之前將在內(nèi)存中保存最多256條消息。如果消費(fèi)者速度很慢,增加緩沖區(qū)大小可能有助于提高吞吐量。但是,更大的緩沖區(qū)也會(huì)消耗更多的內(nèi)存。
以下是如何增加緩沖區(qū)大小的例子:
config := sarama.NewConfig() config.Consumer.Return.Errors = true config.Version = sarama.V2_1_0_0 config.Consumer.Offsets.Initial = sarama.OffsetOldest config.ChannelBufferSize = 500 group, err := sarama.NewConsumerGroup([]string{broker}, groupID, config) if err != nil { panic(err) } ctx := context.Background() for { topics := []string{topic} handler := exampleConsumerGroupHandler{} err := group.Consume(ctx, topics, &handler) if err != nil { panic(err) } }
處理rebalance
當(dāng)新消費(fèi)者添加到消費(fèi)者組或現(xiàn)有消費(fèi)者離開消費(fèi)者組時(shí),kafka會(huì)重新平衡該組中的消費(fèi)者。rebalance是kafka確保消費(fèi)者組中的所有消費(fèi)者不會(huì)消費(fèi)同一分區(qū)的保證。
在sarama中,處理rebalance是通過 Setup 和CleanUp函數(shù)來完成的。
通過正確處理重新平衡事件,您可以確保應(yīng)用程序正常處理消費(fèi)者組的更改,例如消費(fèi)者離開或加入,并且在這些事件期間不會(huì)丟失或處理兩次消息。
譯者注:其實(shí)更重要的是在ConsumeClaim函數(shù)在通道關(guān)閉時(shí)盡早退出,才能正確的進(jìn)入CleanUp函數(shù)。
監(jiān)控消費(fèi)者
監(jiān)控Kafka消費(fèi)者對(duì)于確保系統(tǒng)的健康和性能至關(guān)重要,我們需要時(shí)刻關(guān)注延遲、處理時(shí)間和錯(cuò)誤率的指標(biāo)。
Golang沒有內(nèi)置對(duì) Kafka 監(jiān)控的支持,但有幾個(gè)庫和工具可以幫助我們。讓我們看一下其中的一些:
Sarama的Metrics:Sarama 提供了一個(gè)指標(biāo)注冊(cè)表,它報(bào)告了有助于監(jiān)控的各種指標(biāo),例如請(qǐng)求、響應(yīng)的數(shù)量、請(qǐng)求和響應(yīng)的大小等。這些指標(biāo)可以使用 Prometheus 等監(jiān)控系統(tǒng)來收集和監(jiān)控。
JMX Exporter:如果您在 JVM 上運(yùn)行 Kafka, 則可以使用 JMX Exporter 將kafka的 MBeans 發(fā)送給Prometheus
Kafka Exporter:Kafka Exporter是一個(gè)第三方工具,可以提供有關(guān)Kafka的更詳細(xì)的指標(biāo)。它可以提供消費(fèi)者組延遲,這是消費(fèi)kafka消息時(shí)要監(jiān)控的關(guān)鍵指標(biāo)。
Jaeger 或 OpenTelemetry:這些工具可用于分布式追蹤,這有助于追蹤消息如何流經(jīng)系統(tǒng)以及可能出現(xiàn)瓶頸的位置。
日志:時(shí)刻關(guān)注應(yīng)用程序日志,記錄消費(fèi)者中的任何錯(cuò)誤或異常行為。這些日志可以幫助我們?cè)\斷問題。
消費(fèi)者組命令, 可以使用kafka-consumer-groups命令來描述消費(fèi)者組的狀態(tài)。
請(qǐng)記住,不僅要追蹤這些指標(biāo),還要針對(duì)任何需要關(guān)注的場(chǎng)景設(shè)置警報(bào)。通過這些方法,我們可以在問題還在初始階段時(shí)快速做出響應(yīng)。
以上工作有助于確保使用kafka的應(yīng)用程序健壯、可靠且高效。
審核編輯:湯梓紅
-
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
7250瀏覽量
91506 -
參數(shù)
+關(guān)注
關(guān)注
11文章
1867瀏覽量
32950 -
kafka
+關(guān)注
關(guān)注
0文章
53瀏覽量
5381
原文標(biāo)題:golang中使用kafka的綜合指南
文章出處:【微信號(hào):magedu-Linux,微信公眾號(hào):馬哥Linux運(yùn)維】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
Kafka集群環(huán)境的搭建
現(xiàn)代的服務(wù)端技術(shù)棧:Golang/Protobuf/gRPC詳解
Kafka的概念及Kafka的宕機(jī)

Kafka 的簡(jiǎn)介

物通博聯(lián)5G-kafka工業(yè)網(wǎng)關(guān)實(shí)現(xiàn)kafka協(xié)議對(duì)接到云平臺(tái)
Kafka架構(gòu)技術(shù):Kafka的架構(gòu)和客戶端API設(shè)計(jì)

評(píng)論