filebeat啟動流程 講解了filebeat
的啟動流程,filebeat
在構建完crawler
物件,開始採集流程。
crawler
的start
方法內,會啟動inputs
func
(c *crawler)
start
( pipeline beat.pipeline,
r *registrar.registrar,
configinputs *common.config,
configmodules *common.config,
pipelineloade***ctory fileset.pipelineloade***ctory,
overwritepipelines bool,)
error
}...
}
c.startinput(pipeline, inputconfig, r.getstates())
方法初始化input
。
首先構建input
物件
執行input
func
(c *crawler)
startinput
( pipeline beat.pipeline,
config *common.config,
states [
]file.state,
)error
connector := channel.
connectto
(pipeline, c.out)
p, err := input.
new(config, connector, c.beatdone, states,
nil)
...// 開始收集
p.start()
return
nil}
p.start()
方法內啟動input
,他在乙個單獨的協程裡執行。
這裡的p
是對input
的封裝,他的run
方法是對某個介面的實現,因為我們用來收集日誌,所以我們只需要關心filebeat/input/log/input.go
檔案內的run
方法。run
方法內部呼叫了input
的scan
方法,開始採集資料。
// run runs the input
func
(p *input)
run(
)
scan
方法內首先獲取所有的檔案。其次獲取檔案狀態,根據狀態來判定收集最新資料,還是從歷史檔案收集。檔案收集會構建harvester
物件。
// scan starts a scanglob for each provided path/glob
func
(p *input)
scan()
else
select
newstate, err :=
getfilestate
(path, info, p)
if err !=
nil// load last state
laststate := p.states.
findprevious
(newstate)
...// decides if previous state exists
if laststate.
isempty()
if err !=
nil}
else
}}
p.startharvester(newstate, 0)
內構建harvester
。(harvester
是另乙個filebeat
官網描述的核心元件之一)
func
(p *input)
startharvester
(state file.state, offset int64
)error
// set state to "not" finished to indicate that a harvester is running
state.finished =
false
state.offset = offset
// create harvester with state
// 這部分構建了 harvester
h, err := p.
createharvester
(state,
func()
)if err !=
nil// 配置 harvester
err = h.
setup()
if err !=
nil// update state before staring harvester
// this makes sure the states is set to finished: false
// this is synchronous state update as part of the scan
h.sendstateupdate()
// 啟動 harvester
if err = p.harvesters.
start
(h); err !=
nilreturn err
}
p.createharvester
構建harvester
p.setup
配置harvester
。setup
方法內會初始化檔案相關的內容,以及構建檔案reader
。
p.harvesters.start(h)
執行harvester
主要還是要看harvesters.start
方法,會在單獨的協程內執行harvester
。
func
(r *registry)
start
(h harvester)
error()
// 非同步執行
err := h.
run(
)if err !=
nil}()
return
nil}
harvester.run
方法真是長。。
func
(h *harvester)
run(
)error
return
nil}
state := h.
getstate()
startingoffset := state.offset
state.offset +=
int64
(message.bytes)
...// 讀取到的檔案內容
text :=
string
(message.content)
...// 資料內容都包裝在 data 內,harvester 傳送 data,其實就是 forwarder **的
if!h.
sendevent
(data, forwarder)
// update state of harvester as successfully sent
h.state = state
}}
h.sendevent(data, forwarder)
這段**將採集的資料傳送到下游,內部其實就是用forwarder
**了資料。
到這裡資料的採集流程應該就差不多了,剩下的是資料的傳送流程。
資料採集流程整理
資料採集2013年11月24日到現在已經有38天了。這中間斷斷續續的,但卻始終沒有停止過。回頭想想我們走過的路程可分為以下六個方面 一開始,公尺老師就讓我們拿出一套方案來,關於這次採集資料的執行方案。每次任務的執行都要有乙個規劃指導。以指導為方向,不至於走錯方向。有了指導,可以彌補執行中的不足。有了...
filebeat 啟動流程
因為各種各樣的原因,好久沒有寫部落格了,還是希望能夠堅持下來 講解一下filebeat的啟動流程吧,核心功能先不描述了0.0 filebeat啟動入口在main.go檔案內,cmd.rootcmd.execute 啟動filebeat func main 在filebeat cmd root.go檔...
Filebeat採集日誌講解(一)
總結在elkf中,filebeat作為日誌採集端,採集日誌併發送到kafka。input就是以檔案形式儲存的log檔案,output就是kafka集群。在採集日誌中一般情況有以下幾點需要注意的 輸出內容確定,一般包括時間戳,主機名,日誌型別,日誌內容,其他的根據業務的實際需求,無用資訊可以直接過濾掉...