一区二区三区三上|欧美在线视频五区|国产午夜无码在线观看视频|亚洲国产裸体网站|无码成年人影视|亚洲AV亚洲AV|成人开心激情五月|欧美性爱内射视频|超碰人人干人人上|一区二区无码三区亚洲人区久久精品

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

采用Go開發(fā)語(yǔ)言實(shí)現(xiàn)海量日志收集系統(tǒng)的開發(fā)

馬哥Linux運(yùn)維 ? 來(lái)源:IT大咖說(shuō)? ? 作者:IT大咖說(shuō)? ? 2021-07-05 14:18 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

再次整理了一下這個(gè)日志收集系統(tǒng)的框,如下圖

這次要實(shí)現(xiàn)的代碼的整體邏輯為:

完整代碼地址為: https://github.com/pythonsite/logagent

etcd介紹

高可用的分布式key-value存儲(chǔ),可以用于配置共享和服務(wù)發(fā)現(xiàn)

類似的項(xiàng)目:zookeeper和consul

開發(fā)語(yǔ)言:go

接口:提供restful的接口,使用簡(jiǎn)單

實(shí)現(xiàn)算法:基于raft算法的強(qiáng)一致性,高可用的服務(wù)存儲(chǔ)目錄

etcd的應(yīng)用場(chǎng)景:

服務(wù)發(fā)現(xiàn)和服務(wù)注冊(cè)

配置中心(我們實(shí)現(xiàn)的日志收集客戶端需要用到)

分布式鎖

master選舉

官網(wǎng)對(duì)etcd的有一個(gè)非常簡(jiǎn)明的介紹:

etcd搭建:

下載地址:https://github.com/coreos/etcd/releases/

根據(jù)自己的環(huán)境下載對(duì)應(yīng)的版本然后啟動(dòng)起來(lái)就可以了

啟動(dòng)之后可以通過(guò)如下命令驗(yàn)證一下:

[root@localhost etcd-v3.2.18-linux-amd64]# 。/etcdctl set name zhaofan

zhaofan

[root@localhost etcd-v3.2.18-linux-amd64]# 。/etcdctl get name

zhaofan

[root@localhost etcd-v3.2.18-linux-amd64]#

context 介紹和使用

其實(shí)這個(gè)東西翻譯過(guò)來(lái)就是上下文管理,那么context的作用是做什么,主要有如下兩個(gè)作用:

控制goroutine的超時(shí)

保存上下文數(shù)據(jù)

通過(guò)下面一個(gè)簡(jiǎn)單的例子進(jìn)行理解:

package main

import (

“fmt”

“time”

“net/http”

“context”

“io/ioutil”

type Result struct{

r *http.Response

err error

}

func process(){

ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)

defer cancel()

tr := &http.Transport{}

client := &http.Client{Transport:tr}

c := make(chan Result,1)

req,err := http.NewRequest(“GET”,“http://www.google.com”,nil)

if err != nil{

fmt.Println(“http request failed,err:”,err)

return

}

// 如果請(qǐng)求成功了會(huì)將數(shù)據(jù)存入到管道中

go func(){

resp,err := client.Do(req)

pack := Result{resp,err}

c 《- pack

}()

select{

case 《- ctx.Done():

tr.CancelRequest(req)

fmt.Println(“timeout!”)

case res := 《-c:

defer res.r.Body.Close()

out,_:= ioutil.ReadAll(res.r.Body)

fmt.Printf(“server response:%s”,out)

}

return

}

func main() {

process()

}

寫一個(gè)通過(guò)context保存上下文,代碼例子如:

package main

import (

“github.com/Go-zh/net/context”

“fmt”

func add(ctx context.Context,a,b int) int {

traceId := ctx.Value(“trace_id”)。(string)

fmt.Printf(“trace_id:%v

”,traceId)

return a+b

}

func calc(ctx context.Context,a, b int) int{

traceId := ctx.Value(“trace_id”)。(string)

fmt.Printf(“trace_id:%v

”,traceId)

//再將ctx傳入到add中

return add(ctx,a,b)

}

func main() {

//將ctx傳遞到calc中

ctx := context.WithValue(context.Background(),“trace_id”,“123456”)

calc(ctx,20,30)

}

結(jié)合etcd和context使用

關(guān)于通過(guò)go連接etcd的簡(jiǎn)單例子:(這里有個(gè)小問(wèn)題需要注意就是etcd的啟動(dòng)方式,默認(rèn)啟動(dòng)可能會(huì)連接不上,尤其你是在虛擬你安裝,所以需要通過(guò)如下命令啟動(dòng):

。/etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381

package main

import (

etcd_client “github.com/coreos/etcd/clientv3”

“time”

“fmt”

func main() {

cli, err := etcd_client.New(etcd_client.Config{

Endpoints:[]string{“192.168.0.118:2371”},

DialTimeout:5*time.Second,

})

if err != nil{

fmt.Println(“connect failed,err:”,err)

return

}

fmt.Println(“connect success”)

defer cli.Close()

}

下面一個(gè)例子是通過(guò)連接etcd,存值并取值

package main

import (

“github.com/coreos/etcd/clientv3”

“time”

“fmt”

“context”

func main() {

cli,err := clientv3.New(clientv3.Config{

Endpoints:[]string{“192.168.0.118:2371”},

DialTimeout:5*time.Second,

})

if err != nil{

fmt.Println(“connect failed,err:”,err)

return

}

fmt.Println(“connect succ”)

defer cli.Close()

ctx,cancel := context.WithTimeout(context.Background(),time.Second)

_,err = cli.Put(ctx,“l(fā)ogagent/conf/”,“sample_value”)

cancel()

if err != nil{

fmt.Println(“put failed,err”,err)

return

}

ctx, cancel = context.WithTimeout(context.Background(),time.Second)

resp,err := cli.Get(ctx,“l(fā)ogagent/conf/”)

cancel()

if err != nil{

fmt.Println(“get failed,err:”,err)

return

}

for _,ev := range resp.Kvs{

fmt.Printf(“%s:%s

”,ev.Key,ev.Value)

}

}

關(guān)于context官網(wǎng)也有一個(gè)例子非常有用,用于控制開啟的goroutine的退出,代碼如下:

package main

import (

“context”

“fmt”

func main() {

// gen generates integers in a separate goroutine and

// sends them to the returned channel.

// The callers of gen need to cancel the context once

// they are done consuming generated integers not to leak

// the internal goroutine started by gen.

gen := func(ctx context.Context) 《-chan int {

dst := make(chan int)

n := 1

go func() {

for {

select {

case 《-ctx.Done():

return // returning not to leak the goroutine

case dst 《- n:

n++

}

}

}()

return dst

}

ctx, cancel := context.WithCancel(context.Background())

defer cancel() // cancel when we are finished consuming integers

for n := range gen(ctx) {

fmt.Println(n)

if n == 5 {

break

}

}

}

關(guān)于官網(wǎng)文檔中的WithDeadline演示的代碼例子:

package main

import (

“context”

“fmt”

“time”

func main() {

d := time.Now().Add(50 * time.Millisecond)

ctx, cancel := context.WithDeadline(context.Background(), d)

// Even though ctx will be expired, it is good practice to call its

// cancelation function in any case. Failure to do so may keep the

// context and its parent alive longer than necessary.

defer cancel()

select {

case 《-time.After(1 * time.Second):

fmt.Println(“overslept”)

case 《-ctx.Done():

fmt.Println(ctx.Err())

}

}

通過(guò)上面的代碼有了一個(gè)基本的使用,那么如果我們通過(guò)etcd來(lái)做配置管理,如果配置更改之后,我們?nèi)绾瓮ㄖ獙?duì)應(yīng)的服務(wù)器配置更改,通過(guò)下面例子演示:

package main

import (

“github.com/coreos/etcd/clientv3”

“time”

“fmt”

“context”

func main() {

cli,err := clientv3.New(clientv3.Config{

Endpoints:[]string{“192.168.0.118:2371”},

DialTimeout:5*time.Second,

})

if err != nil {

fmt.Println(“connect failed,err:”,err)

return

}

defer cli.Close()

// 這里會(huì)阻塞

rch := cli.Watch(context.Background(),“l(fā)ogagent/conf/”)

for wresp := range rch{

for _,ev := range wresp.Events{

fmt.Printf(“%s %q : %q

”, ev.Type, ev.Kv.Key, ev.Kv.Value)

}

}

}

實(shí)現(xiàn)一個(gè)kafka的消費(fèi)者代碼的簡(jiǎn)單例子:

package main

import (

“github.com/Shopify/sarama”

“strings”

“fmt”

“time”

func main() {

consumer,err := sarama.NewConsumer(strings.Split(“192.168.0.118:9092”,“,”),nil)

if err != nil{

fmt.Println(“failed to start consumer:”,err)

return

}

partitionList,err := consumer.Partitions(“nginx_log”)

if err != nil {

fmt.Println(“Failed to get the list of partitions:”,err)

return

}

fmt.Println(partitionList)

for partition := range partitionList{

pc,err := consumer.ConsumePartition(“nginx_log”,int32(partition),sarama.OffsetNewest)

if err != nil {

fmt.Printf(“failed to start consumer for partition %d:%s

”,partition,err)

return

}

defer pc.AsyncClose()

go func(partitionConsumer sarama.PartitionConsumer){

for msg := range pc.Messages(){

fmt.Printf(“partition:%d Offset:%d Key:%s Value:%s”,msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))

}

}(pc)

}

time.Sleep(time.Hour)

consumer.Close()

}

但是上面的代碼并不是最佳代碼,因?yàn)槲覀冏詈笫峭ㄟ^(guò)time.sleep等待goroutine的執(zhí)行,我們可以更改為通過(guò)sync.WaitGroup方式實(shí)現(xiàn)

package main

import (

“github.com/Shopify/sarama”

“strings”

“fmt”

“sync”

var (

wg sync.WaitGroup

func main() {

consumer,err := sarama.NewConsumer(strings.Split(“192.168.0.118:9092”,“,”),nil)

if err != nil{

fmt.Println(“failed to start consumer:”,err)

return

}

partitionList,err := consumer.Partitions(“nginx_log”)

if err != nil {

fmt.Println(“Failed to get the list of partitions:”,err)

return

}

fmt.Println(partitionList)

for partition := range partitionList{

pc,err := consumer.ConsumePartition(“nginx_log”,int32(partition),sarama.OffsetNewest)

if err != nil {

fmt.Printf(“failed to start consumer for partition %d:%s

”,partition,err)

return

}

defer pc.AsyncClose()

go func(partitionConsumer sarama.PartitionConsumer){

wg.Add(1)

for msg := range partitionConsumer.Messages(){

fmt.Printf(“partition:%d Offset:%d Key:%s Value:%s”,msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))

}

wg.Done()

}(pc)

}

//time.Sleep(time.Hour)

wg.Wait()

consumer.Close()

}

將客戶端需要收集的日志信息放到etcd中

關(guān)于etcd處理的代碼為:

package main

import (

“github.com/coreos/etcd/clientv3”

“time”

“github.com/astaxie/beego/logs”

“context”

“fmt”

var Client *clientv3.Client

var logConfChan chan string

// 初始化etcd

func initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){

var keys []string

for _,ip := range ipArrays{

//keyfmt = /logagent/%s/log_config

keys = append(keys,fmt.Sprintf(keyfmt,ip))

}

logConfChan = make(chan string,10)

logs.Debug(“etcd watch key:%v timeout:%v”, keys, timeout)

Client,err = clientv3.New(clientv3.Config{

Endpoints:addr,

DialTimeout: timeout,

})

if err != nil{

logs.Error(“connect failed,err:%v”,err)

return

}

logs.Debug(“init etcd success”)

waitGroup.Add(1)

for _, key := range keys{

ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)

// 從etcd中獲取要收集日志的信息

resp,err := Client.Get(ctx,key)

cancel()

if err != nil {

logs.Warn(“get key %s failed,err:%v”,key,err)

continue

}

for _, ev := range resp.Kvs{

logs.Debug(“%q : %q

”, ev.Key, ev.Value)

logConfChan 《- string(ev.Value)

}

}

go WatchEtcd(keys)

return

}

func WatchEtcd(keys []string){

// 這里用于檢測(cè)當(dāng)需要收集的日志信息更改時(shí)及時(shí)更新

var watchChans []clientv3.WatchChan

for _,key := range keys{

rch := Client.Watch(context.Background(),key)

watchChans = append(watchChans,rch)

}

for {

for _,watchC := range watchChans{

select{

case wresp := 《-watchC:

for _,ev:= range wresp.Events{

logs.Debug(“%s %q : %q

”, ev.Type, ev.Kv.Key, ev.Kv.Value)

logConfChan 《- string(ev.Kv.Value)

}

default:

}

}

time.Sleep(time.Second)

}

waitGroup.Done()

}

func GetLogConf()chan string{

return logConfChan

}

同樣的這里增加對(duì)了限速的處理,畢竟日志收集程序不能影響了當(dāng)前業(yè)務(wù)的性能,所以增加了limit.go用于限制速度:

package main

import (

“time”

“sync/atomic”

“github.com/astaxie/beego/logs”

type SecondLimit struct {

unixSecond int64

curCount int32

limit int32

}

func NewSecondLimit(limit int32) *SecondLimit {

secLimit := &SecondLimit{

unixSecond:time.Now().Unix(),

curCount:0,

limit:limit,

}

return secLimit

}

func (s *SecondLimit) Add(count int) {

sec := time.Now().Unix()

if sec == s.unixSecond {

atomic.AddInt32(&s.curCount,int32(count))

return

}

atomic.StoreInt64(&s.unixSecond,sec)

atomic.StoreInt32(&s.curCount, int32(count))

}

func (s *SecondLimit) Wait()bool {

for {

sec := time.Now().Unix()

if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit {

time.Sleep(time.Microsecond)

logs.Debug(“l(fā)imit is running,limit:%d s.curCount:%d”,s.limit,s.curCount)

continue

}

if sec != atomic.LoadInt64(&s.unixSecond) {

atomic.StoreInt64(&s.unixSecond,sec)

atomic.StoreInt32(&s.curCount,0)

}

logs.Debug(“l(fā)imit is exited”)

return false

}

}

小結(jié)

這次基本實(shí)現(xiàn)了日志收集的前半段的處理,后面將把日志扔到es中,并最終在頁(yè)面上呈現(xiàn)

來(lái)源:IT大咖說(shuō)

責(zé)任編輯:gt

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 開發(fā)
    +關(guān)注

    關(guān)注

    0

    文章

    373

    瀏覽量

    41512
  • 代碼
    +關(guān)注

    關(guān)注

    30

    文章

    4900

    瀏覽量

    70724

原文標(biāo)題:Go實(shí)現(xiàn)海量日志收集系統(tǒng)

文章出處:【微信號(hào):magedu-Linux,微信公眾號(hào):馬哥Linux運(yùn)維】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評(píng)論

    相關(guān)推薦
    熱點(diǎn)推薦

    基于RV1126開發(fā)板限制系統(tǒng)日志大小教程

    無(wú)論管理什么系統(tǒng),對(duì)日志文件的監(jiān)控、調(diào)用、管理都是其中重要的一部分。服務(wù)器問(wèn)題的解決都是從查看系統(tǒng)(錯(cuò)誤)日志開始的。系統(tǒng)
    的頭像 發(fā)表于 04-16 11:18 ?233次閱讀
    基于RV1126<b class='flag-5'>開發(fā)</b>板限制<b class='flag-5'>系統(tǒng)</b><b class='flag-5'>日志</b>大小教程

    Wine開發(fā)系列——如何使用Wine日志調(diào)試問(wèn)題

    助于快速理解代碼的執(zhí)行流程和功能。在大型項(xiàng)目中,通常會(huì)先實(shí)現(xiàn)一套自己的調(diào)試日志框架,主要有兩個(gè)目的: 統(tǒng)一日志風(fēng)格和存儲(chǔ):確保日志格式一致,并且有統(tǒng)一的存儲(chǔ)方式,這有助于用戶更容易地報(bào)
    的頭像 發(fā)表于 01-06 11:29 ?999次閱讀

    AI大語(yǔ)言模型開發(fā)步驟

    開發(fā)一個(gè)高效、準(zhǔn)確的大語(yǔ)言模型是一個(gè)復(fù)雜且多階段的過(guò)程,涉及數(shù)據(jù)收集與預(yù)處理、模型架構(gòu)設(shè)計(jì)、訓(xùn)練與優(yōu)化、評(píng)估與調(diào)試等多個(gè)環(huán)節(jié)。接下來(lái),AI部落小編為大家詳細(xì)闡述AI大語(yǔ)言模型的
    的頭像 發(fā)表于 12-19 11:29 ?892次閱讀

    語(yǔ)言模型開發(fā)框架是什么

    語(yǔ)言模型開發(fā)框架是指用于訓(xùn)練、推理和部署大型語(yǔ)言模型的軟件工具和庫(kù)。下面,AI部落小編為您介紹大語(yǔ)言模型開發(fā)框架。
    的頭像 發(fā)表于 12-06 10:28 ?523次閱讀

    語(yǔ)言模型開發(fā)語(yǔ)言是什么

    在人工智能領(lǐng)域,大語(yǔ)言模型(Large Language Models, LLMs)背后,離不開高效的開發(fā)語(yǔ)言和工具的支持。下面,AI部落小編為您介紹大語(yǔ)言模型
    的頭像 發(fā)表于 12-04 11:44 ?692次閱讀

    云端語(yǔ)言模型開發(fā)方法

    云端語(yǔ)言模型的開發(fā)是一個(gè)復(fù)雜而系統(tǒng)的過(guò)程,涉及數(shù)據(jù)準(zhǔn)備、模型選擇、訓(xùn)練優(yōu)化、部署應(yīng)用等多個(gè)環(huán)節(jié)。下面,AI部落小編為您分享云端語(yǔ)言模型的開發(fā)
    的頭像 發(fā)表于 12-02 10:48 ?688次閱讀

    在學(xué)習(xí)go語(yǔ)言的過(guò)程踩過(guò)的坑

    作為一個(gè)5年的phper,這兩年公司和個(gè)人都在順應(yīng)技術(shù)趨勢(shì),新項(xiàng)目慢慢從php轉(zhuǎn)向了go語(yǔ)言,從2021年到現(xiàn)在,筆者手上也先后開發(fā)了兩個(gè)go項(xiàng)目。在學(xué)習(xí)
    的頭像 發(fā)表于 11-11 09:22 ?469次閱讀

    語(yǔ)言模型如何開發(fā)

    語(yǔ)言模型的開發(fā)是一個(gè)復(fù)雜且細(xì)致的過(guò)程,涵蓋了數(shù)據(jù)準(zhǔn)備、模型架構(gòu)設(shè)計(jì)、訓(xùn)練、微調(diào)和部署等多個(gè)階段。以下是對(duì)大語(yǔ)言模型開發(fā)步驟的介紹,由AI部落小編整理發(fā)布。
    的頭像 發(fā)表于 11-04 10:14 ?604次閱讀

    go語(yǔ)言如何解決并發(fā)問(wèn)題

    作為一個(gè)后端開發(fā),日常工作中接觸最多的兩門語(yǔ)言就是PHP和GO了。無(wú)可否認(rèn),PHP確實(shí)是最好的語(yǔ)言(手動(dòng)狗頭哈哈),寫起來(lái)真的很舒爽,沒有任何心智負(fù)擔(dān),字符串和整型壓根就不用區(qū)分,
    的頭像 發(fā)表于 10-23 13:38 ?506次閱讀
    <b class='flag-5'>go</b><b class='flag-5'>語(yǔ)言</b>如何解決并發(fā)問(wèn)題

    systemd journal收集日志的三種方式

    隨著 systemd 成了主流的 init 系統(tǒng),systemd 的功能也在不斷的增加,比如對(duì)系統(tǒng)日志的管理。Systemd 設(shè)計(jì)的日志系統(tǒng)
    的頭像 發(fā)表于 10-23 11:50 ?820次閱讀
    systemd journal<b class='flag-5'>收集</b><b class='flag-5'>日志</b>的三種方式

    納尼?自建K8s集群日志收集還能通過(guò)JMQ保存到JES

    作者:京東科技 劉恩浩 一、背景 基于K8s集群的私有化交付方案中,日志收集采用了ilogtail+logstash+kafka+es方案,其中ilogtail負(fù)責(zé)日志
    的頭像 發(fā)表于 09-30 14:45 ?449次閱讀

    鴻蒙原生應(yīng)用元服務(wù)開發(fā)-初識(shí)倉(cāng)頡開發(fā)語(yǔ)言

    輕量化線程(原生協(xié)程),以及簡(jiǎn)單易用的并發(fā)編程機(jī)制,保證并發(fā)場(chǎng)景的高效開發(fā)和運(yùn)行。 兼容語(yǔ)言生態(tài) :倉(cāng)頡編程語(yǔ)言支持和 C 等主流編程語(yǔ)言的互操作,并
    發(fā)表于 08-15 10:00

    linux日志管理之journalctl命令

    journalctl 用來(lái)查詢 systemd-journald 服務(wù)收集到的日志。systemd-journald 服務(wù)是 systemd init 系統(tǒng)提供的收集
    的頭像 發(fā)表于 08-14 18:18 ?3781次閱讀
    linux<b class='flag-5'>日志</b>管理之journalctl命令

    三十分鐘入門基礎(chǔ)Go Java小子版

    前言 Go語(yǔ)言定義 Go(又稱 Golang)是 Google 的 Robert Griesemer,Rob Pike 及 Ken Thompson 開發(fā)的一種靜態(tài)、強(qiáng)類型、編譯型
    的頭像 發(fā)表于 08-12 14:32 ?983次閱讀
    三十分鐘入門基礎(chǔ)<b class='flag-5'>Go</b> Java小子版

    鴻蒙原生應(yīng)用元服務(wù)開發(fā)-初識(shí)倉(cāng)頡開發(fā)語(yǔ)言

    輕量化線程(原生協(xié)程),以及簡(jiǎn)單易用的并發(fā)編程機(jī)制,保證并發(fā)場(chǎng)景的高效開發(fā)和運(yùn)行。 兼容語(yǔ)言生態(tài) :倉(cāng)頡編程語(yǔ)言支持和C 等主流編程語(yǔ)言的互操作,并
    發(fā)表于 07-30 17:49