Datax 與 Azkaban 實現資料抽取與排程

2021-08-21 18:52:36 字數 2724 閱讀 3157

1.什麼是datax

datax 是阿里巴巴集團內被廣泛使用的離線資料同步工具/平台,實現包括 mysql、oracle、hdfs、hive、oceanbase、hbase、ots、odps 等各種異構資料來源之間高效的資料同步功能。datax採用了框架 + 外掛程式 的模式,目前已開源,**託管在github

datax的安裝省略

配置詳情可見

執行原理介紹:

datax完成單個資料同步的作業,我們稱之為job,datax接受到乙個job之後,將啟動乙個程序來完成整個作業同步過程。datax job模組是單個作業的中樞管理節點,承擔了資料清理、子任務切分(將單一作業計算轉化為多個子task)、taskgroup管理等功能。

dataxjob啟動後,會根據不同的源端切分策略,將job切分成多個小的task(子任務),以便於併發執行。task便是datax作業的最小單元,每乙個task都會負責一部分資料的同步工作。

切分多個task之後,datax job會呼叫scheduler模組,根據配置的併發資料量,將拆分成的task重新組合,組裝成taskgroup(任務組)。每乙個taskgroup負責以一定的併發(可在json配置檔案中配置)執行完畢分配好的所有task,預設單個任務組的併發數量為5。

每乙個task都由taskgroup負責啟動,task啟動後,會固定啟動reader—>channel—>writer的執行緒來完成任務同步工作。

datax作業執行起來之後, job監控並等待多個taskgroup模組任務完成,等待所有taskgroup任務完成後job成功退出。否則,異常退出。

工作流程介紹:

工作流程大概就是用reader模組從源資料庫讀資料,在storage模組裡將reader模組讀到的資料交換給write模組,

write模組將資料寫進目的資料庫。

doublequeue:

設立兩塊空間,乙個儲存源資料,乙個儲存目標資料。在開始,空間a和空間b都是空的,loading 任務從源資料庫向a空間載入資料,a空間滿後再向b空間載入資料,同時dumping任務將a空間資料轉儲到目的資料庫。a空間清空後,交換ab兩者的任務,即a空間的任務換成loading,b空間的任務換成dumping。不斷重複上述操作。

ramstorage:基於doublequeue,用記憶體作為資料交換的空間

基於ramstorage的資料操縱介面:linesender和linereceiver

linesender的作用:reader用linesender來放資料到storage物件中。

在linesender介面裡,主要有這幾個介面:

createline():構造乙個將要被用來交換資料的line物件

sendtowriter(line line): 用來將乙個line物件put到storage抽象類裡。

flush()用來將buffer的資料flush到storage物件中。

linereceiver的作用:writer用linereceiver來從到storage物件中獲取資料。在linereceiver介面裡,主要有乙個介面:getfromreader():獲取下乙個storage中的line物件。

基於ramstorage的批量資料交換:bufferedlineexchanger

內部初始化乙個指定大小的陣列緩衝,預設大小64 ,在push資料時會先寫滿64個陣列再單次寫入doublequeue佇列,poll時返回的大小可能會小於64個單位,由當時陣列的實際大小決定。

2.什麼是azkaban

azkaban是由linkedin公司推出的乙個批量工作流任務排程器,主要用於在乙個工作流內以乙個特定的順序執行一組工作和流程,它的配置是通過簡單的key:value對的方式,通過配置中的dependencies 來設定依賴關係,這個依賴關係必須是無環的,否則會被視為無效的工作流。azkaban使用job配置檔案建立任務之間的依賴關係,並提供乙個易於使用的web使用者介面維護和跟蹤你的工作流。

需求:從oracle抽取每天的使用者行為日表到hive分割槽表中,即資料的ods層

技術選型:利用datax與azkaban,其中datax可方便配置能與關係型資料庫進行互動

其中azkabn的排程job為:

把此job上傳到azkaban的介面如圖:

其中datax的配置為:

問題解決:

azkaban-web-start.sh啟動時出現table 'execution_flows' is marked as crashed and should be repaired query

由於azkaban非正常關閉,導致資料表損壞現對資料進行修復

修復後對資料進行check如圖,說明表已經repair

對錶進行修復後,順利啟動了azkaban

Azkaban介紹與使用

參考文章 azkaban是乙個開源的 任務排程系統,用於負責任務的排程執行 如資料倉儲排程 用以替代 linux 中的crontab 可以用來解決多個 hadoop 或spark 等 離線計算任務之間的依賴關係 問題。三 部分 relational database azkaban的使用 支援單 多...

DataX的部署與安裝

剛來公司的第二天做的事就是把cd的資料導到bj來,因為不能按照原來一模一樣的去導所以就用到了datax,而datax幫我搞定了這個難題,有道是花時間想辦法解決,不如找工具研究。下面開始 解壓到虛擬機器上 tar zxvf datax.tar.gz 修改許可權為755 進入bin 目錄即可操作同步作業...

介面實現與配置實現

在實現系統功能的時候,通常會首先定義好功能的介面,在系統功能不斷被實現的過程中,慢慢的發現有些介面的實現很類似,這個時候通常會開始做一次抽象,形 成乙個共同的部分,慢慢的系統形成了乙個抽象的層次,而為了通用,通常是通過定義介面,形成乙個抽象類,抽象類中暴露出一些抽象方法供外部擴充套件實 現,逐步的積...