一区二区三区三上|欧美在线视频五区|国产午夜无码在线观看视频|亚洲国产裸体网站|无码成年人影视|亚洲AV亚洲AV|成人开心激情五月|欧美性爱内射视频|超碰人人干人人上|一区二区无码三区亚洲人区久久精品

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

golang中使用kafka的綜合指南

馬哥Linux運(yùn)維 ? 來源:稀土掘金技術(shù)社區(qū) ? 2023-11-30 11:18 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

介紹

kafka是一個(gè)比較流行的分布式、可拓展、高性能、可靠的流處理平臺(tái)。在處理kafka的數(shù)據(jù)時(shí),這里有確保處理效率和可靠性的多種最佳實(shí)踐。本文將介紹這幾種實(shí)踐方式,并通過sarama實(shí)現(xiàn)他們。

以下是一些kafka消費(fèi)的最佳實(shí)踐:

選擇合適的提交策略:Kafka提供兩種提交策略,自動(dòng)和手動(dòng)。雖然自動(dòng)操作很容易使用,但它可能會(huì)導(dǎo)致數(shù)據(jù)丟失或重復(fù)。手動(dòng)提交提供了更高級(jí)別的控制,確保消息至少處理一次或恰好一次,具體取決于用例。

盡可能減少Kafka的傳輸次數(shù):大批量讀取消息可以顯著提高吞吐量。這可以通過調(diào)整 fetch.min.bytes 和 fetch.max.wait.ms 等參數(shù)來實(shí)現(xiàn)。

盡可能使用消費(fèi)者組:Kafka允許多個(gè)消費(fèi)者組成一個(gè)消費(fèi)者組來并行消費(fèi)數(shù)據(jù)。這使得 Kafka 能夠?qū)?shù)據(jù)分發(fā)給一個(gè)組中的所有消費(fèi)者,從而實(shí)現(xiàn)高效的數(shù)據(jù)消費(fèi)。

調(diào)整消費(fèi)者緩沖區(qū)大小:通過調(diào)整消費(fèi)者的緩沖區(qū)大小,如 receive.buffer.bytes 和 max.partition.fetch.bytes,可以根據(jù)消息的預(yù)期大小和消費(fèi)者的內(nèi)存容量進(jìn)行調(diào)整。這可以提高消費(fèi)者的表現(xiàn)。

處理rebalance:當(dāng)新的消費(fèi)者加入消費(fèi)者組,或者現(xiàn)有的消費(fèi)者離開時(shí),Kafka會(huì)觸發(fā)rebalance以重新分配負(fù)載。在此過程中,消費(fèi)者停止消費(fèi)數(shù)據(jù)。因此,快速有效地處理重新平衡可以提高整體吞吐量。

監(jiān)控消費(fèi)者:使用 Kafka 的消費(fèi)者指標(biāo)來監(jiān)控消費(fèi)者的性能。定期監(jiān)控可以幫助我們識(shí)別性能瓶頸并調(diào)整消費(fèi)者的配置。

選擇合適的提交策略

1.自動(dòng)提交

Sarama 的 ConsumerGroup 默認(rèn)情況下會(huì)自動(dòng)提交偏移量。這意味著它會(huì)定期提交已成功消費(fèi)的消息的偏移量,這允許消費(fèi)者在重新啟動(dòng)或消費(fèi)失敗時(shí)從中斷的地方繼續(xù)。

下面是一個(gè)自動(dòng)提交的消費(fèi)者組消費(fèi)消息的例子:


config := sarama.NewConfig()  
config.Version = sarama.V2_0_0_0 
config.Consumer.Offsets.AutoCommit.Enable = true  
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second  
  
ConsumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)  
if err != nil {  
    log.Panicf( "創(chuàng)建消費(fèi)者組客戶端時(shí)出錯(cuò): %v" , err)  
}  
  
Consumer := Consumer{}  
ctx := context.Background()  
  
for {  
    err := ConsumerGroup.Consume(ctx, [] string {topic}, Consumer)  
    if err != nil {  
        log.Panicf( "來自消費(fèi)者的錯(cuò)誤: %v" , err)  
    }  
}

根據(jù)config.Consumer.Offsets.AutoCommit.Interval可以看到,消費(fèi)者會(huì)每秒自動(dòng)提交offset。

2. 手動(dòng)提交

手動(dòng)提交使我們更好地控制何時(shí)提交消息偏移量。下面是一個(gè)手動(dòng)提交的消費(fèi)者組消費(fèi)消息的例子:


config := sarama.NewConfig()  
config.Version = sarama.V2_0_0_0 
config.Consumer.Offsets.AutoCommit.Enable = false 
  
consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID , config)  
if err != nil {  
    log.Panicf( "創(chuàng)建消費(fèi)者組客戶端時(shí)出錯(cuò): %v" , err)  
}  
  
Consumer := Consumer{}  
ctx := context.Background()  
  
for {  
    err := ConsumerGroup.Consume( ctx, [] string {topic}, Consumer)  
    if err != nil {  
        log.Panicf( "Error from Consumer: %v" , err)  
    }  
}  
  


type Consumer struct {}  
  
func (consumer Consumer) Setup (_ sarama.ConsumerGroupSession) error { return nil }  
func (consumer Consumer) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }  
func (consumer Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error {  
    for msg : = range Claim.Messages() {  
        
        fmt.Printf( "Message topic:%q partition:%d offset:%d
" , msg.Topic, msg.Partition, msg.Offset)  


        
        sess.MarkMessage(msg, "" )  
    }  
    return nil  
}

在該示例中, 使用MarkMessage手動(dòng)將消息標(biāo)記為已處理,最終根據(jù)Consumer.Offsets.CommitInterval配置提交。另外這個(gè)例子省略了錯(cuò)誤處理部分,開發(fā)時(shí)需要注意正確處理生產(chǎn)過程中出現(xiàn)的錯(cuò)誤。

譯者注:這篇文章雖然是今年5月發(fā)布,但是這里的提交方式還是有些過時(shí)了,目前sarama已經(jīng)廢棄了Consumer.Offsets.CommitInterval,相關(guān)配置目前在Consumer.Offsets.AutoCommit

盡可能減少Kafka的傳輸次數(shù)

減少kafka的傳輸次數(shù)可以通過優(yōu)化從kafka中讀取和寫入數(shù)據(jù)的方式來實(shí)現(xiàn):

1. 增加批次的大小

使用kafka批量發(fā)送消息的效果優(yōu)于逐個(gè)發(fā)送消息,批次越大,kafka發(fā)送數(shù)據(jù)效率就越高。但是需要權(quán)衡延遲和吞吐量之間的關(guān)系。較大的批次雖然代表著更大的吞吐量,但也會(huì)增加延遲。因?yàn)榕卧酱?,填充批次的時(shí)間也越久。

在Go中,我們可以在使用sarama包生成消息時(shí)設(shè)置批次大?。?/p>


config := sarama.NewConfig()  
config.Producer.Flush.Bytes = 1024 * 1024

以及獲取消息的批次大小


config := sarama.NewConfig()  
config.Consumer.Fetch.Default = 1024 * 1024

2. 使用長(zhǎng)輪詢

長(zhǎng)輪詢是指消費(fèi)者輪詢時(shí)如果Kafka中沒有數(shù)據(jù),則消費(fèi)者將等待數(shù)據(jù)到達(dá)。這減少了往返次數(shù),因?yàn)橄M(fèi)者不需要在沒有數(shù)據(jù)時(shí)不斷請(qǐng)求數(shù)據(jù)。


config := sarama.NewConfig() 
config .Consumer.MaxWaitTime = 500 *time.Millisecond

該配置告訴消費(fèi)者在返回之前會(huì)等待500毫秒

3. 盡可能使用消費(fèi)者組

消費(fèi)者組是一組協(xié)同工作消費(fèi)來自kafka主題的消息的消費(fèi)者。消費(fèi)者組允許我們?cè)诙鄠€(gè)消費(fèi)者之間分配消息,從而提供橫向拓展能力。使用消費(fèi)者組時(shí),kafka負(fù)責(zé)將分區(qū)分配給組中的消費(fèi)者,并確保每個(gè)分區(qū)同時(shí)僅被一個(gè)消費(fèi)者消費(fèi)。

接下來是sarama中消費(fèi)者組的使用:

使用消費(fèi)者組需要實(shí)現(xiàn)一個(gè)ConsumerGroupHandler接口

該接口具有三個(gè)方法:Setup、Cleanup、 和ConsumeClaim


type exampleConsumerGroupHandler struct { 
} 


func  (h *exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { 
    
    return  nil
 } 


func  (h *exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { 
    
    return  nil
 } 


func  (h *exampleConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error { 
    for message := range Claim.Messages() { 
        
        fmt.Printf( "Message: %s
" , string (message.Value)) 


        
        session.MarkMessage(message, "" ) 
    }
    返回 nil
 }

創(chuàng)建sarama.ConsumerGroup并開始消費(fèi):


brokers := []string{"localhost:9092"} 
topic := "example_topic"  
groupID := "example_consumer_group"  
  


consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)  
if err != nil {  
    log.Fatalf("Error creating consumer group: %v", err)  
}  
defer consumerGroup.Close()  
  


handler := &exampleConsumerGroupHandler{}  
  


for {  
    err := consumerGroup.Consume(context.Background(), []string{topic}, handler)  
    if err != nil {  
        log.Printf("Error consuming messages: %v", err)  
    }  
}

該示例設(shè)置了一個(gè)消費(fèi)組,用于消費(fèi)來自“example_topic”的消息。消費(fèi)者組可以通過添加更多消費(fèi)者來提高處理能力。

使用消費(fèi)者組時(shí),記得處理消費(fèi)期間rebalance和錯(cuò)誤。

調(diào)整消費(fèi)者緩沖區(qū)大小

在sarama中,我們可以調(diào)整消費(fèi)者緩沖區(qū)的大小,以調(diào)整消費(fèi)者在處理消息之前可以在內(nèi)存中保存的消息數(shù)量。

默認(rèn)情況下,緩沖區(qū)大小設(shè)置為256,這代表Sarama在開始處理消息之前將在內(nèi)存中保存最多256條消息。如果消費(fèi)者速度很慢,增加緩沖區(qū)大小可能有助于提高吞吐量。但是,更大的緩沖區(qū)也會(huì)消耗更多的內(nèi)存。

以下是如何增加緩沖區(qū)大小的例子:


config := sarama.NewConfig()  
config.Consumer.Return.Errors = true  
config.Version = sarama.V2_1_0_0  
config.Consumer.Offsets.Initial = sarama.OffsetOldest  


config.ChannelBufferSize = 500  
  


group, err := sarama.NewConsumerGroup([]string{broker}, groupID, config)  
if err != nil {  
    panic(err)  
}  
  


ctx := context.Background()  
for {  
    topics := []string{topic}  
    handler := exampleConsumerGroupHandler{}  


    err := group.Consume(ctx, topics, &handler)  
    if err != nil {  
        panic(err)  
    }  
}

處理rebalance

當(dāng)新消費(fèi)者添加到消費(fèi)者組或現(xiàn)有消費(fèi)者離開消費(fèi)者組時(shí),kafka會(huì)重新平衡該組中的消費(fèi)者。rebalance是kafka確保消費(fèi)者組中的所有消費(fèi)者不會(huì)消費(fèi)同一分區(qū)的保證。

在sarama中,處理rebalance是通過 Setup 和CleanUp函數(shù)來完成的。

通過正確處理重新平衡事件,您可以確保應(yīng)用程序正常處理消費(fèi)者組的更改,例如消費(fèi)者離開或加入,并且在這些事件期間不會(huì)丟失或處理兩次消息。

譯者注:其實(shí)更重要的是在ConsumeClaim函數(shù)在通道關(guān)閉時(shí)盡早退出,才能正確的進(jìn)入CleanUp函數(shù)。

監(jiān)控消費(fèi)者

監(jiān)控Kafka消費(fèi)者對(duì)于確保系統(tǒng)的健康和性能至關(guān)重要,我們需要時(shí)刻關(guān)注延遲、處理時(shí)間和錯(cuò)誤率的指標(biāo)。

Golang沒有內(nèi)置對(duì) Kafka 監(jiān)控的支持,但有幾個(gè)庫和工具可以幫助我們。讓我們看一下其中的一些:

Sarama的Metrics:Sarama 提供了一個(gè)指標(biāo)注冊(cè)表,它報(bào)告了有助于監(jiān)控的各種指標(biāo),例如請(qǐng)求、響應(yīng)的數(shù)量、請(qǐng)求和響應(yīng)的大小等。這些指標(biāo)可以使用 Prometheus 等監(jiān)控系統(tǒng)來收集和監(jiān)控。

JMX Exporter:如果您在 JVM 上運(yùn)行 Kafka, 則可以使用 JMX Exporter 將kafka的 MBeans 發(fā)送給Prometheus

Kafka Exporter:Kafka Exporter是一個(gè)第三方工具,可以提供有關(guān)Kafka的更詳細(xì)的指標(biāo)。它可以提供消費(fèi)者組延遲,這是消費(fèi)kafka消息時(shí)要監(jiān)控的關(guān)鍵指標(biāo)。

Jaeger 或 OpenTelemetry:這些工具可用于分布式追蹤,這有助于追蹤消息如何流經(jīng)系統(tǒng)以及可能出現(xiàn)瓶頸的位置。

日志:時(shí)刻關(guān)注應(yīng)用程序日志,記錄消費(fèi)者中的任何錯(cuò)誤或異常行為。這些日志可以幫助我們?cè)\斷問題。

消費(fèi)者組命令, 可以使用kafka-consumer-groups命令來描述消費(fèi)者組的狀態(tài)。

請(qǐng)記住,不僅要追蹤這些指標(biāo),還要針對(duì)任何需要關(guān)注的場(chǎng)景設(shè)置警報(bào)。通過這些方法,我們可以在問題還在初始階段時(shí)快速做出響應(yīng)。

以上工作有助于確保使用kafka的應(yīng)用程序健壯、可靠且高效。

審核編輯:湯梓紅

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 數(shù)據(jù)
    +關(guān)注

    關(guān)注

    8

    文章

    7250

    瀏覽量

    91506
  • 參數(shù)
    +關(guān)注

    關(guān)注

    11

    文章

    1867

    瀏覽量

    32950
  • kafka
    +關(guān)注

    關(guān)注

    0

    文章

    53

    瀏覽量

    5381

原文標(biāo)題:golang中使用kafka的綜合指南

文章出處:【微信號(hào):magedu-Linux,微信公眾號(hào):馬哥Linux運(yùn)維】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評(píng)論

    相關(guān)推薦
    熱點(diǎn)推薦

    如何使用Golang連接MySQL

    首先我們來看如何使用Golang連接MySQL。
    的頭像 發(fā)表于 01-08 09:42 ?3860次閱讀
    如何使用<b class='flag-5'>Golang</b>連接MySQL

    Kafka讀取數(shù)據(jù)操作指南

    Kafka消費(fèi)者——從 Kafka讀取數(shù)據(jù)
    發(fā)表于 09-16 06:42

    淺析kafka

    kafka常見問題
    發(fā)表于 09-29 10:09

    基于發(fā)布與訂閱的消息系統(tǒng)Kafka

    Kafka權(quán)威指南》——初識(shí) Kafka
    發(fā)表于 03-05 13:46

    Kafka基礎(chǔ)入門文檔

    kafka系統(tǒng)入門教程(原理、配置、集群搭建、Java應(yīng)用、Kafka-manager)
    發(fā)表于 03-12 07:22

    Kafka集群環(huán)境的搭建

    1、環(huán)境版本版本:kafka2.11,zookeeper3.4注意:這里zookeeper3.4也是基于集群模式部署。2、解壓重命名tar -zxvf
    發(fā)表于 01-05 17:55

    現(xiàn)代的服務(wù)端技術(shù)棧:Golang/Protobuf/gRPC詳解

    Golang又稱Go語言,是一個(gè)開源的、多用途的編程語言,由Google研發(fā),并由于種種原因,正在日益流行。Golang已經(jīng)有10年的歷史,并且據(jù)Google稱已經(jīng)在生產(chǎn)環(huán)境中使用了接近7年的時(shí)間,這一點(diǎn)可能讓大多數(shù)人大跌眼鏡。
    的頭像 發(fā)表于 12-25 17:32 ?1376次閱讀

    Kafka的概念及Kafka的宕機(jī)

    問題要從一次Kafka的宕機(jī)開始說起。 筆者所在的是一家金融科技公司,但公司內(nèi)部并沒有采用在金融支付領(lǐng)域更為流行的 RabbitMQ ,而是采用了設(shè)計(jì)之初就為日志處理而生的 Kafka ,所以我一直
    的頭像 發(fā)表于 08-27 11:21 ?2445次閱讀
    <b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕機(jī)

    初探Golang內(nèi)聯(lián)

    今天我們來聊聊 Golang 中的內(nèi)聯(lián)。
    的頭像 發(fā)表于 12-13 09:51 ?1194次閱讀

    GoLang的安裝和使用

    GoLang的安裝和使用
    的頭像 發(fā)表于 01-13 14:06 ?1489次閱讀
    <b class='flag-5'>GoLang</b>的安裝和使用

    Kafka 的簡(jiǎn)介

    ? 1 kafka簡(jiǎn)介 2 為什么要用消息系統(tǒng) 3 kafka基礎(chǔ)知識(shí) 4 kafka集群架構(gòu) 5 總結(jié) ? 1 kafka簡(jiǎn)介 其主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O(1)的方式提供
    的頭像 發(fā)表于 07-03 11:10 ?883次閱讀
    <b class='flag-5'>Kafka</b> 的簡(jiǎn)介

    物通博聯(lián)5G-kafka工業(yè)網(wǎng)關(guān)實(shí)現(xiàn)kafka協(xié)議對(duì)接到云平臺(tái)

    Kafka協(xié)議是一種基于TCP層的網(wǎng)絡(luò)協(xié)議,用于在分布式消息傳遞系統(tǒng)Apache Kafka中發(fā)送和接收消息。Kafka協(xié)議定義了客戶端和服務(wù)器之間的通信方式和數(shù)據(jù)格式,允許客戶端發(fā)送消息到K
    的頭像 發(fā)表于 07-11 10:44 ?734次閱讀

    Kafka架構(gòu)技術(shù):Kafka的架構(gòu)和客戶端API設(shè)計(jì)

    Kafka 給自己的定位是事件流平臺(tái)(event stream platform)。因此在消息隊(duì)列中經(jīng)常使用的 "消息"一詞,在 Kafka 中被稱為 "事件"。
    的頭像 發(fā)表于 10-10 15:41 ?2723次閱讀
    <b class='flag-5'>Kafka</b>架構(gòu)技術(shù):<b class='flag-5'>Kafka</b>的架構(gòu)和客戶端API設(shè)計(jì)

    kafka相關(guān)命令詳解

    kafka常用命令詳解
    的頭像 發(fā)表于 10-20 11:34 ?1310次閱讀

    kafka基本原理詳解

    今天浩道跟大家分享一篇關(guān)于kafka相關(guān)原理的硬核干貨,可以說即使你沒有接觸過kafka,也可以秒懂,一起看看!
    的頭像 發(fā)表于 01-03 09:57 ?1138次閱讀
    <b class='flag-5'>kafka</b>基本原理詳解