1.準備工作
其中還要額外安裝的是:
sudo pip-3.6 install -i 'apache-airflow[celery]'
sudo pip-3.6 install -i boto3
1.2. 配置好本地aws credentials,此credential需有啟動emr 的許可權。
1.3. 置資料庫為外部資料庫:
sql_alchemy_conn = mysql://user:password@database_location/airflowdb
使用下面的命令檢查並初始化:
airflow initdb
1.4. 配置executor 為 celeryexecutor
executor = celeryexecutor
修改後可以保證相互無依賴的任務可以並行執行。預設為sequentialexecutor,也就是按順序執行。
1.5 配置broker_url 與 result_backend
airflow.cfg 檔案中修改以下兩個條目:
broker_url = sqla+mysql:// user:password@database_location:3306/airflowdb
result_backend = db+mysql:// user:password@database_location:3306/airflowdb
配置完後啟動airflow 的web ui,worker,flower以及scheduler:
airflow webserver -p 8080 &
airflow worker &
airflow flower &
airflow scheduler &
2.定義工作流
建立dag_trasform.py 檔案,在檔案中定義工作流:
from airflow import其中各個bashoperator中的指令碼需自行實現,根據需求實現即可。dagfrom airflow.operators.bash_operator import
bashoperator
from datetime import
datetime, timedelta
from airflow.sensors.external_task_sensor import
externaltasksensor
default_args =
dag = dag('
dag_transform
', default_args=default_args,
schedule_interval=timedelta(days=1))
#create emr cluster
t0 =bashoperator(
task_id='
create_emr_cluster',
bash_command='
python3 /home/hadoop/scripts/launch_emr.py',
dag=dag)
#do wordcount
t1 =bashoperator(
task_id='
spark_job',
bash_command='
python3 /home/hadoop/scripts/submit_spark_job.py',
dag=dag)
#check result in s3
t2 =bashoperator(
task_id='
check_s3',
bash_command='
python3 /home/hadoop/scripts/check_s3_result.py',
dag=dag)
#hive query
t3 =bashoperator(
task_id='
query',
bash_command='
python3 /home/hadoop/scripts/query_result.py',
dag=dag)
#terminate cluster
t4 =bashoperator(
task_id='
terminate_cluster',
bash_command='
python3 /home/hadoop/scripts/terminate_cluster.py',
dag=dag)
#define airflow dag
t0 >>t1
t1 >>t2
t2 >>t3
t3 >> t4
3.重制airflow資料庫
將 dag_transform.py 檔案放入 airflow/dags/ 下,然後重置 airflow 資料庫:airflow resetdb
4.執行
在airflow裡手動執行這個dag,可以看到這個dag已經開始執行:
檢視 dag_transform 可以看到已經在執行啟動emr的指令碼了:
[[2020-03-12 12:42:54,197] info - temporary script location: /tmp/airflowtmptwdg7a_6/create_emr_clusterlbzuu36e
[2020-03-12 12:42:54,197] info - running command: python3 /home/hadoop/scripts/launch_emr.py
可以看到 emr 集群正在啟動:
t1 spark wordcount 開始執行:
t2 完成後,t3 hive query 開始執行:
最後,整個dag執行完畢:
我們也可以看到emr集群開始自動終止:
SwiftUI 學習筆記(二) 整合OpenCV
先直接進入主題,看看能不能整合opencv,這個才是我要考慮的,整合很麻煩的話,估計直接就放棄吧。先新增依賴庫,新增libc tbd就可以了,不需要其它的庫。不知道為什麼,我這裡要做兩次才會在這裡顯示這個libc tbd 好了,接著新增opencv2庫,先從frameworks目錄上右鍵選單,選擇 ...
PMP知識點 二 整合管理
此系列文章分享給想學習pmp的專案經理和想要學習pmp的程式猿們!期望 者快速掌握pmp知識點並實際運用 還有順利考過pmp了 本章內容 專案整合管理過程介紹 前一章已經介紹過pmp把專案管理分為五大過程組,十大知識領域,那麼整合管理作為唯一乙個在五大過程組中都有涉及的乙個知識領域也是最特殊的乙個,...
iOS環信3 0整合 二 UI檔案整合
環信3.0整合相關教程 1 ios環信3.0整合 一 sdk的整合 2 ios環信3.0整合 二 ui檔案整合 3 ios環信3.0整合 三 單聊整合 整合環信3.0ui檔案,需要新增的檔案,如下圖所示 新增完成之後,如下圖所示 檔案新增成功之後,編譯會報錯,因為你沒有新增pch檔案。之前我們新增進...