airflow(二)整合EMR使用

2021-10-03 20:02:10 字數 3295 閱讀 5637

1.準備工作

其中還要額外安裝的是:

sudo pip-3.6 install -i 'apache-airflow[celery]'

sudo pip-3.6 install -i boto3

1.2. 配置好本地aws credentials,此credential需有啟動emr 的許可權。

1.3. 置資料庫為外部資料庫:

sql_alchemy_conn = mysql://user:password@database_location/airflowdb

使用下面的命令檢查並初始化:

airflow initdb

1.4. 配置executor 為 celeryexecutor

executor = celeryexecutor

修改後可以保證相互無依賴的任務可以並行執行。預設為sequentialexecutor,也就是按順序執行。

1.5 配置broker_url 與 result_backend

airflow.cfg 檔案中修改以下兩個條目:

broker_url = sqla+mysql:// user:password@database_location:3306/airflowdb

result_backend = db+mysql:// user:password@database_location:3306/airflowdb

配置完後啟動airflow 的web ui,worker,flower以及scheduler:

airflow webserver -p 8080 &

airflow worker &

airflow flower &

airflow scheduler &

2.定義工作流

建立dag_trasform.py 檔案,在檔案中定義工作流

from airflow import

dagfrom airflow.operators.bash_operator import

bashoperator

from datetime import

datetime, timedelta

from airflow.sensors.external_task_sensor import

externaltasksensor

default_args =

dag = dag('

dag_transform

', default_args=default_args,

schedule_interval=timedelta(days=1))

#create emr cluster

t0 =bashoperator(

task_id='

create_emr_cluster',

bash_command='

python3 /home/hadoop/scripts/launch_emr.py',

dag=dag)

#do wordcount

t1 =bashoperator(

task_id='

spark_job',

bash_command='

python3 /home/hadoop/scripts/submit_spark_job.py',

dag=dag)

#check result in s3

t2 =bashoperator(

task_id='

check_s3',

bash_command='

python3 /home/hadoop/scripts/check_s3_result.py',

dag=dag)

#hive query

t3 =bashoperator(

task_id='

query',

bash_command='

python3 /home/hadoop/scripts/query_result.py',

dag=dag)

#terminate cluster

t4 =bashoperator(

task_id='

terminate_cluster',

bash_command='

python3 /home/hadoop/scripts/terminate_cluster.py',

dag=dag)

#define airflow dag

t0 >>t1

t1 >>t2

t2 >>t3

t3 >> t4

其中各個bashoperator中的指令碼需自行實現,根據需求實現即可。

3.重制airflow資料庫

將 dag_transform.py 檔案放入 airflow/dags/ 下,然後重置 airflow 資料庫:airflow resetdb

4.執行

在airflow裡手動執行這個dag,可以看到這個dag已經開始執行:

檢視 dag_transform 可以看到已經在執行啟動emr的指令碼了

[[2020-03-12 12:42:54,197]  info - temporary script location: /tmp/airflowtmptwdg7a_6/create_emr_clusterlbzuu36e
[2020-03-12 12:42:54,197]  info - running command: python3 /home/hadoop/scripts/launch_emr.py
可以看到 emr 集群正在啟動:

t1 spark wordcount 開始執行:

t2 完成後,t3 hive query 開始執行:

最後,整個dag執行完畢:

我們也可以看到emr集群開始自動終止:

SwiftUI 學習筆記(二) 整合OpenCV

先直接進入主題,看看能不能整合opencv,這個才是我要考慮的,整合很麻煩的話,估計直接就放棄吧。先新增依賴庫,新增libc tbd就可以了,不需要其它的庫。不知道為什麼,我這裡要做兩次才會在這裡顯示這個libc tbd 好了,接著新增opencv2庫,先從frameworks目錄上右鍵選單,選擇 ...

PMP知識點 二 整合管理

此系列文章分享給想學習pmp的專案經理和想要學習pmp的程式猿們!期望 者快速掌握pmp知識點並實際運用 還有順利考過pmp了 本章內容 專案整合管理過程介紹 前一章已經介紹過pmp把專案管理分為五大過程組,十大知識領域,那麼整合管理作為唯一乙個在五大過程組中都有涉及的乙個知識領域也是最特殊的乙個,...

iOS環信3 0整合 二 UI檔案整合

環信3.0整合相關教程 1 ios環信3.0整合 一 sdk的整合 2 ios環信3.0整合 二 ui檔案整合 3 ios環信3.0整合 三 單聊整合 整合環信3.0ui檔案,需要新增的檔案,如下圖所示 新增完成之後,如下圖所示 檔案新增成功之後,編譯會報錯,因為你沒有新增pch檔案。之前我們新增進...