考慮了很久,要不要記錄airflow相關的東西, 應該怎麼記錄. 官方文件已經有比較詳細的介紹了,還有各種部落格,我需要有乙份自己的筆記嗎?
答案就從本文開始了.
本文將從乙個陌生視角開始認知airflow,順帶勾勒出應該如何一步步搭建我們的資料排程系統.
現在是2023年9月上旬, airflow最近的乙個版本是1.10.5.ps. 查資料發現自己好多文章被爬走,換了作者.所以,接下裡的內容會隨機新增一些防偽標識,忽略即可.
什麼資料排程系統?
中臺這個概念最近比較火, 其中就有乙個叫做資料中臺, 文章資料中臺到底是什麼給出了乙個概念.
我粗糙的理解, 大概就是: 收集各個零散的資料,標準化,然後服務化, 提供統一資料服務. 而要做到資料整理和處理,必然涉及資料排程,也就需要乙個排程系統.[本文出自ryan miao]
資料排程系統可以將不同的異構資料互相同步,可以按照規劃去執行資料處理和任務排程. airflow就是這樣的乙個任務排程平台.
前面airflow1.10.4介紹與安裝已經
安裝好了我們的airflow, 可以直接使用了. 這是第乙個dag任務鏈.
目標: 每天早上8點執行乙個任務--列印hello world
在linux上,我們可以在crontab插入一條記錄:
使用springboot, 我們可以使用@scheduled(cron="0 0 8 * * ?")
來定時執行乙個method.
使用quartz, 我們可以建立乙個crontrigger
, 然後去執行對應的jobdetail.
crontrigger trigger = (crontrigger)triggerbuilder.newtrigger()
.withidentity("trigger1", "group1")
.withschedule(cronschedulebuilder.cronschedule("0 0 8 * * ?"))
.build();
使用airflow, 也差不多類似.
在docker-airflow中,我們將dag掛載成磁碟,現在只需要在dag目錄下編寫dag即可.
volumes:
- ./dags:/usr/local/airflow/dags
建立乙個hello.py
"""
airflow的第乙個dag
"""from airflow import dag
from airflow.operators.bash_operator import bashoperator
from datetime import datetime
default_args =
dag = dag("hello-world",
description="第乙個dag",
default_args=default_args,
schedule_interval='0 8 * * *')
t1 = bashoperator(task_id="hello", bash_command="echo 'hello world, today is }'", dag=dag)
這是乙個python指令碼, 主要定義了兩個變數.
dag
表示乙個有向無環圖,乙個任務鏈, 其id全域性唯一. dag是airflow的核心概念, 任務裝載到dag中, 封裝成任務依賴鏈條. dag決定這些任務的執行規則,比如執行時間.這裡設定為從9月1號開始,每天8點執行.
task
task表示具體的乙個任務,其id在dag內唯一. task有不同的種類,通過各種operator外掛程式來區分任務型別. 這裡是乙個bashoperator, 來自airflow自帶的外掛程式, airflow自帶了很多拆箱即用的外掛程式.
ds
airflow內建的時間變數模板, 在渲染operator的時候,會注入乙個當前執行日期的字串. 後面會專門講解這個執行日期.
[本文出自ryan miao]
將上述hello.py
上傳到dag目錄, airflow會自動檢測檔案變化, 然後解析py檔案,匯入dag定義到資料庫.
訪問airflow位址,重新整理即可看到我們的dag.
開啟dag, 進入dag定義, 可以看到已經執行了昨天的任務.
點選任務例項, 點選view log可以檢視日誌
我們的任務在這台機器上執行,並列印了hello, 注意, 這個列印的日期.
這樣就是乙個基本的airflow任務單元了, 這個任務每天8點會執行.
定義乙個任務的具體內容,比如這裡就是列印hello world,today is }
.
任務設定了執行時間,每次執行時會生成乙個例項,即 dag-task-executiondate 標記乙個任務例項.任務例項和任務當前代表的執行時間繫結. 本demo中,每天會生成乙個任務例項.
今天是2019-09-07, 但我們日誌裡列印的任務執行日期是2019-09-06.
執行日期是任務例項執行所代表的任務時間, 我們通常叫做execute-date或bizdate, 類似hive表的的分割槽.
為什麼今天執行的任務,任務的時間變數是昨天呢?
因為任務例項是乙個時間段的任務, 比如計算每天的訪問量, 我們只有6號這一天過去了才能計算6號這一天的的總量. 那這個任務最早要7號0點之後才能計算, 計算6號0點到7號0點之間的訪問量.所以,這個任務時間就代表任務要處理的資料時間, 就是6號. 任務真正執行時間不固定的, 可以7號, 也可以8號, 只要任務執行計算的資料區間是6號就可以了.
因此, 排程系統中的ds(execution date)通常是過去的乙個週期, 即本週期執行上週期的任務.
最典型的任務模型etl(extract & transformation & loading,即資料抽取,轉換,載入)最少也要分成3步. 對於每天要統計訪問量這個目標來說, 我必須要抽取訪問日誌, 找到訪問量的字段, 計算累加. 這3個任務之間有先後順序,必須前乙個執行完畢之後,後乙個才可以執行. 這叫任務依賴. 不同的任務之間的依賴.在airflow裡, 通過在關聯任務實現依賴.
還有同乙個任務的時間依賴. 比如,計算新增使用者量, 我必須知道前天的資料和昨天的資料, 才能計算出增量. 那麼, 這個任務就必須依賴於昨天的任務狀態. 在airflow裡,通過設定depends_on_past
來決定.
airflow裡有個功能叫backfill, 可以執行過去時間的任務. 我們把這個操作叫做補錄或者補數,為了計算以前沒計算的資料.
我們的任務是按時間執行的, 今天建立了乙個任務, 計算每天的使用者量, 那麼明天會跑出今天的資料. 這時候,我想知道過去1個月每天的使用者增量怎麼辦?
自己寫code, 只要查詢日期範圍的資料,然後分別計算就好. 但排程任務是固定的, 根據日期去執行的. 我們只能建立不同日期的任務例項去執行這些任務. backfill就是實現這種功能的.
讓跑過的任務再跑一次.
在airflow裡, 通過點選任務例項的clear按鈕, 刪除這個任務例項, 然後排程系統會再次建立並執行這個例項.
關於排程系統這個實現邏輯, 我們後面有機會來檢視原始碼了解.
本文沒太實質性的任務具體介紹, 而是引出hello world, 先跑起來,我們接下來繼續完善我們的dag.
python第乙個程式設計 第乙個 Python 程式
簡述 安裝完 python 後,windows 中 開始選單或安裝目錄下就會有 idle 開發 python 程式的基本 ide 整合開發環境 幫助手冊 模組文件等。linux 中 只需要在命令列中輸入 python 命令即可啟動互動式程式設計。互動式程式設計 互動式程式設計不需要建立指令碼檔案,是...
第乙個部落格
我不知道為什麼 我在csdn上創了乙個賬號,又開通了部落格。也許我不是名人,也許幻想著成為名人。在這裡 我不會給任何人許諾,這個部落格可能有乙個博文 有兩個博文 或者會有很多 很多 很多。不過讓我有個大膽的猜想,如果這個部落格在今後有很多很多自己寫的博文,說明我成功了 在自己眼裡 也說明這個方法時正...
第乙個爬蟲
很多人學習python的目的就是為了學習能夠實現爬蟲的功能,這裡,我使用了scrapy框架來實現了乙個簡單的爬蟲功能,這裡我簡單的介紹一下scrapy專案的建立,和執行。1,第一步是安裝scrapy,我相信到了這一步,大多數人都已經會安裝第三方庫檔案了,這裡主要是使用命令pip install sc...