日誌收集之rsyslog to kafka

2021-08-27 08:36:36 字數 3146 閱讀 3118

專案需要將日誌收集起來做儲存分析,資料的流向為rsyslog(收集) -> kafka(訊息佇列) -> logstash(清理) -> es、hdfs; 今天我們先將如何利用rsyslog進行日誌收集到kafka。

通過對 rsyslog官方文件 檢視,得知 rsyslog對 kafka的支援是 v8.7.0版本後才提供的支援.通過 changelog 也可以看出 v8.x的版本變化.

最新v8穩定版已經提供rpm包的rsyslog-kafka外掛程式了,直接yum安裝即可,新增yum源:

[rsyslog_v8]

name=adiscon centos-$releasever - local packages for $basearch

baseurl=

enabled=1

gpgcheck=0

gpgkey=

protect=1

新增後yum install rsyslog rsyslog-kafka.x86_64即可完成安裝。

通常利用rainerscript type statements進行非常簡潔明瞭的配置宣告,例如:

mail.info /var/log/mail.log

備註: only message json (cee/lumberjack) properties can be modified by theset,unsetandresetstatements

有很多種input模組, 我們以imfile模組為例, 此模組將所有的文字檔案內容逐行轉到syslog中.

input(type="imfile" tag="kafka" file="analyze.log" ruleset="imfile-kafka"[, facility=local.7])
也叫作actions, 處理動作,格式如下

action (

type="omkafka"

topic="kafka_test"

broker="10.120.169.149:9092"

)

rulesets包括多條rule,一條規則就是rsyslog處理訊息的一種方式, 每個規則包含filter和actions

input(type="imfile" tag="kafka" file="analyze.log" ruleset="rulesetname")

ruleset(name="rulesetname")

通過input裡面的ruleset配置,將輸入流進入ruleset進行規則匹配,然後執行action操作,完成對流的處理。

將不同的輸入流進入不同的佇列並行處理資料,通常在ruleset或者action中配置,預設只有乙個佇列。配置引數例子

action(type="omfwd" target="192.168.2.11" port="10514" protocol="tcp"

queue.filename="forwarding" queue.size="1000000" queue.type="linkedlist"

)

這是rsyslog乙個重要的特性,它可以讓使用者自定義輸入流格式,同樣也可以用於動態生成日誌檔案, 預設是原始格式。

一般表示式如下:

template(parameters)

template(name="tpl1" type="list")

%timestamp:::date-rfc3339% %hostname%%syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n"

將每個日誌字段通過自定義變數和處理方式(property replacer)得到全域性能讀取的日誌變數。

注意:原始格式: v6之前的格式,$template strtpl,"pri: %pri%, msg: %msg%\n"

利用action裡的template引數將templates和action進行繫結,如

action(template=templatename,type="omfile" file="/var/log/all-msgs.log")

增加乙個將nginx access日誌通過rsyslog傳輸到kafka的例項,將nginx_kafka.conf放入到/etc/rsyslog.d目錄中,重啟rsyslog即可。

# 載入omkafka和imfile模組

module(load="omkafka")

module(load="imfile")

# nginx template

template(name="nginxaccesstemplate" type="string" string="%hostname%<-+>%syslogtag%<-+>%msg%\n")

# ruleset

ruleset(name="nginx-kafka")

input(type="imfile" tag="nginx,aws" file="/var/log/access.log" ruleset="nginx-kafka")

檢查conf檔案是否正確可以執行rsyslogd debug模式rsyslogd -dn執行,看日誌輸出結果,或者直接執行rsyslogd -n 1檢查conf檔案是否正確。

FLUME日誌收集

flume是乙個分布式 可靠 和高可用的海量日誌聚合的系統,支援在系統中定製各類資料傳送方,用於收集資料 同時,flume提供對資料進行簡單處理,並寫到各種資料接受方 可定製 的能力。1 可靠性 當節點出現故障時,日誌能夠被傳送到其他節點上而不會丟失。flume提供了三種級別的可靠性保障,從強到弱依...

Tengine Lua Kafka收集日誌

對於線上大流量服務或者需要上報日誌的nginx服務,每天會產生大量的日誌,這些日誌非常有價值。可用於計數上報 使用者行為分析 介面質量 效能監控等需求。但傳統nginx記錄日誌的方式資料會散落在各自nginx上,而且大流量日誌本身對磁碟也是一種衝擊。我們需要把這部分nginx日誌統一收集彙總起來,收...

日誌收集 Flume

乙個flume程序就是乙個agent source channel sink 在source channel sink之間流動的是list exent event header byte payload,一般header是空的,傳輸資料的單位 flume串聯的壞處 乙個壞了全壞 好處 多級緩衝 處理...