日誌服務Python消費組實戰(二) 實時分發資料

2021-09-09 08:50:35 字數 4881 閱讀 8678

使用日誌服務的web-tracking、logtail(檔案極簡)、syslog等收集上來的日誌經常存在各種各樣的格式,我們需要針對特定的日誌(例如topic)進行一定的分發到特定的logstore中處理和索引,本文主要介紹如何使用消費組實時分發日誌到不通的目標日誌庫中。並且利用消費組的特定,達到自動平衡、負載均衡和高可用性。

協同消費庫(consumer library)是對日誌服務中日誌進行消費的高階模式,提供了消費組(consumergroup)的概念對消費端進行抽象和管理,和直接使用sdk進行資料讀取的區別在於,使用者無需關心日誌服務的實現細節,只需要專注於業務邏輯,另外,消費者之間的負載均衡、failover等使用者也都無需關心。

消費組(consumer group)- 乙個消費組由多個消費者構成,同乙個消費組下面的消費者共同消費乙個logstore中的資料,消費者之間不會重複消費資料。

消費者(consumer)- 消費組的構成單元,實際承擔消費任務,同乙個消費組下面的消費者名稱必須不同。

在日誌服務中,乙個logstore下面會有多個shard,協同消費庫的功能就是將shard分配給乙個消費組下面的消費者,分配方式遵循以下原則:

協同消費庫的另乙個功能是儲存checkpoint,方便程式故障恢復時能接著從斷點繼續消費,從而保證資料不會被重複消費。

這裡我們描述用python使用消費組進行程式設計,實時根據資料的topic進行分發。

注意:本篇文章的相關**可能會更新,最新版本在這裡可以找到:github樣例.

環境

建議程式執行在源日誌庫同region下的ecs上,並使用區域網服務入口,這樣好處是網路速度最快,其次是讀取沒有外網費用產生。

強烈推薦pypy3來執行本程式,而不是使用標準cpython直譯器。

日誌服務的python sdk可以如下安裝:

pypy3 -m pip install aliyun-log-python-sdk -u
更多sls python sdk的使用手冊,可以參考這裡

如下展示如何配置程式:

配置程式日誌檔案,以便後續測試或者診斷可能的問題(跳過,具體參考樣例)。

基本的日誌服務連線與消費組的配置選項。

目標logstore的一些連線資訊

#encoding: utf8

def get_option():

##########################

# 基本選項

##########################

# 從環境變數中載入sls引數與選項,根據需要可以配置多個目標

accesskeyid = os.environ.get('sls_ak_id', '')

accesskey = os.environ.get('sls_ak_key', '')

endpoint = os.environ.get('sls_endpoint', '')

project = os.environ.get('sls_project', '')

logstore = os.environ.get('sls_logstore', '')

to_endpoint = os.environ.get('sls_endpoint_to', endpoint)

to_project = os.environ.get('sls_project_to', project)

to_logstore1 = os.environ.get('sls_logstore_to1', '')

to_logstore2 = os.environ.get('sls_logstore_to2', '')

to_logstore3 = os.environ.get('sls_logstore_to3', '')

consumer_group = os.environ.get('sls_cg', '')

# 消費的起點。這個引數在第一次跑程式的時候有效,後續再次執行將從上一次消費的儲存點繼續。

# 可以使」begin「,」end「,或者特定的iso時間格式。

cursor_start_time = "2018-12-26 0:0:0"

# 一般不要修改消費者名,尤其是需要併發跑時

consumer_name = "-".format(consumer_group, current_process().pid)

# 構建乙個消費組和消費者

option = loghubconfig(endpoint, accesskeyid, accesskey, project, logstore, consumer_group, consumer_name, cursor_position=cursorposition.special_timer_cursor, cursor_start_time=cursor_start_time)

# bind put_log_raw which is faster

to_client = logclient(to_endpoint, accesskeyid, accesskey)

put_method1 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore1)

put_method2 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore2)

put_method3 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore3)

return option,

注意,這裡使用了functools.partialput_log_raw進行繫結,以便後續呼叫方便。

如下**展示如何從sls拿到資料後根據topic進行**。

if __name__ == '__main__':

option, put_methods = get_copy_option()

def copy_data(shard_id, log_groups):

for log_group in log_groups.loggroups:

# update topic

if log_group.topic in put_methods:

put_methods[log_group.topic](log_group=log_group)

logger.info("*** start to consume data...")

worker = consumerworker(consumerprocessoradaptor, option, args=(copy_data, ))

worker.start(join=true)

假設程式命名為"dispatch_data.py",可以如下啟動:

export sls_endpoint=export sls_ak_id=export sls_ak_key=export sls_project=export sls_logstore=export sls_logstore_to1=export sls_logstore_to1=export sls_logstore_to1=export sls_cg=《消費組名,可以簡單命名為"dispatch_data">

pypy3 dispatch_data.py

基於消費組的程式可以直接啟動多次以便達到併發作用:

nohup pypy3 dispatch_data.py &

nohup pypy3 dispatch_data.py &

nohup pypy3 dispatch_data.py &

...

注意:所有消費者使用了同乙個消費組的名字和不同的消費者名字(因為消費者名以程序id為字尾)。

因為乙個分割槽(shard)只能被乙個消費者消費,假設乙個日誌庫有10個分割槽,那麼最多有10個消費者同時消費。

基於測試,在沒有頻寬限制、接收端速率限制(如splunk端)的情況下,以推進硬體用pypy3執行上述樣例,單個消費者占用大約10%的單核cpu下可以消費達到5 mb/s原始日誌的速率。因此,理論上可以達到50 mb/s原始日誌每個cpu核,也就是每個cpu核每天可以消費4tb原始日誌

注意:這個資料依賴頻寬、硬體引數和目標logstore是否能夠較快接收資料。

消費組會將檢測點(check-point)儲存在伺服器端,當乙個消費者停止,另外乙個消費者將自動接管並從斷點繼續消費。

可以在不同機器上啟動消費者,這樣當一台機器停止或者損壞的清下,其他機器上的消費者可以自動接管並從斷點進行消費。

理論上,為了備用,也可以啟動大於shard數量的消費者。

每乙個日誌庫(logstore)最多可以配置10個消費組,如果遇到錯誤consumergroupquotaexceed則表示遇到限制,建議在控制台端刪除一些不用的消費組。

如果服務入口(endpoint)配置為https://字首,如,程式與sls的連線將自動使用https加密。

日誌服務Python消費組實戰(二) 實時分發資料

使用日誌服務的web tracking logtail 檔案極簡 syslog等收集上來的日誌經常存在各種各樣的格式,我們需要針對特定的日誌 例如topic 進行一定的分發到特定的logstore中處理和索引,本文主要介紹如何使用消費組實時分發日誌到不通的目標日誌庫中。並且利用消費組的特定,達到自動...

消費服務Eureka

在消費者 中對任何服務例項的url進行硬編碼是錯誤的。這不僅將消費者耦合到服務的特定例項,而且如果服務的主機和 或埠要改變,也可能導致消費者中斷。eureka的兩種消費服務方式包括 使用resttemplate消費服務 一旦將應用程式作為eureka客戶端啟用,則可以選擇宣告負載平衡resttemp...

Kafka Consumer 消費者組

官方定義 消費者使用乙個消費者組 即group.id 來標記自己,topic的每條訊息都只會被傳送到每個訂閱它的消費者組的乙個消費者例項上。所有consumer例項都屬於相同group 實現基於佇列的模型。每條訊息只會被乙個consumer例項處理。consumer都屬於不同group 實現基於發布...