眾所周知,oozie(1, 2)是基於時間條件與資料生成來做工作流排程的,但是oozie的資料觸發條件只支援hdfs路徑,故而面臨著這樣的問題:
因此,靈活可擴充套件的工作流引擎才是正確姿勢!下面,我將介紹如何用crontab來打造乙個類似於oozie的簡易工作流引擎;對標oozie,其應滿足功能:
判斷hive partition是否已存在,思路比較簡單——show partitions
後能否grep到該partition:
# check wheter $1's partition ($2) exists
hive_partition_exists() " | grep $
[ $? == 0 ]
}
獲取hive 表的最後乙個partition,grep命令配合正規表示式中的lookahead匹配:
# get latest hive partition
latest_hive_partition() " | tail -1 | grep -po "(?<=$=).*"
}
在檢查es index是否寫入完成時,可用思路——定時flush index,然後判斷當前時刻的doc數較上一時刻是否發生變化;若變化,則說明正在寫入。shell指令碼處理json太蛋疼了,故不給出**啦。
所謂「條件輪詢」,是指如果資料未生成,則會一直輪詢該條件是否滿足。我們採用while迴圈中sleep的方式來實現條件輪詢:
hive_partition_exists etl.ad_tb1 $
ad1_exists=$?
hive_partition_exists etl.ad_tb2 $
ad2_exists=$?
while (( $ != 0 || $ != 0))
do echo "`date -d "now"`: log partitions $ not exist, and waiting" >> $
sleep 1m
hive_partition_exists etl.ad_tb1 $
ad1_exists=$?
hive_partition_exists etl.ad_tb2 $
ad2_exists=$?
done
接下來,以hive寫elasticsearch的為例,說明如何用crontab做定時hive任務。hiveql指令碼如下:
add jar /path/to/jars/elasticsearch-hadoop-2.3.1.jar;
set mapred.job.name=ad_tag-$~~$;
set hive.map.aggr = false;
insert overwrite table ad_tag
select media, a.dvc as dvc, case when c1_arr is null then array('empty') else c1_arr end as c1_arr, '$' as week_time
from (
from ad_log
where is_exposure = '1' and day_time between date_sub('$', 6) and '$'
) a
left outer join (
select dvc, collect_set(c1) as c1_arr
from tag
lateral view inline(tag) in_tb
where day_time = '$'
group by dvc
) bon a.dvc = b.dvc;
為了實現任務的並行執行,我用到linux命令中的&
:
log_partition=`date -d "5 day ago" "+%y-%m-%d"`
tag_partition=$(latest_hive_partition tag.dmp_tag day_time)
log_path="$.log"
echo "`date -d "now"`: log partitions $ exist" >> $
echo "`date -d "now"`: latest tag partition $" >> $
hive -f ad_tag1.hql --hivevar log_partition=$ --hivevar tag_partition=$ & hive -f ad_tag2.hql --hivevar log_partition=$ --hivevar tag_partition=$
exit 1
ps: 當手動執行指令碼是ok的,但是crontab去執行時卻出錯,最可能的原因是crontab未能正確載入使用者的環境變數;故可以在執行指令碼中加入:
source /etc/profile
source /path/to/.bashrc
但是,用crontab做工作流排程,會存在如下問題: 工作流建模 工作流概念
工作流建模 工作流概念 1 案例 工作流系統得基本目的是處理案例。每個案例都有乙個唯一標識,而且每個案例的生命週期都是有限的。案例生命週期都處於某個特定狀態,該狀態由三個元素組成 1 案例相關的屬性的值 案例屬性是一系列同案例相關的變數。能夠用來管理案例。正是通過這些變數,才有可能指出在特定條件下某...
工作流 一 什麼是工作流
什麼是工作流 工作流的英文全稱是 workflow,簡單理解則是業務流程的計算機化或自動化。它是是針對工作中具有固定程式的常規活動而提出的乙個概念,通過將工作活動分解定義良好的任務 角色 規則和過程來進行執行和監控,達到提高生產組織水平和工作效率的目的。工作流技術發端於70年代中期辦公自動化領域的研...
知識體系 打造寫作工作流
我想將我的學習過程全部記錄下來,技術,工作,生活,還是思維片段,所有能記的都要記下來,終生學習這個理念不單要植入自己的腦子還要形成肌肉記憶。當然記錄這件事情也一直在做,但是做得並不好,單純的記錄其實意義不大,如果能分享出去,並因此而獲取一些正向的反饋,然後再激勵自己去學習 記錄 分享 獲得正向反饋形...