美團DB資料同步到資料倉儲的架構與實踐 Mysql

2021-10-01 22:18:23 字數 4758 閱讀 7272

背景

在資料倉儲建模中,未經任何加工處理的原始業務層資料,我們稱之為ods(operational data store)資料。在網際網路企業中,常見的ods資料有業務日誌資料(log)和業務db資料(db)兩類。對於業務db資料來說,從mysql等關係型資料庫的業務資料進行採集,然後匯入到hive中,是進行資料倉儲生產的重要環節。

如何準確、高效地把mysql資料同步到hive中?一般常用的解決方案是批量取數並load:直連mysql去select表中的資料,然後存到本地檔案作為中間儲存,最後把檔案load到hive表中。這種方案的優點是實現簡單,但是隨著業務的發展,缺點也逐漸暴露出來:

為了徹底解決這些問題,我們逐步轉向cdc (change data capture) + merge的技術方案,即實時binlog採集 + 離線處理binlog還原業務資料這樣一套解決方案。binlog是mysql的二進位制日誌,記錄了mysql中發生的所有資料變更,mysql集群自身的主從同步就是基於binlog做的。

本文主要從binlog實時採集和離線處理binlog還原業務資料兩個方面,來介紹如何實現db資料準確、高效地進入數倉。

整體架構

整體的架構如上圖所示。在binlog實時採集方面,我們採用了阿里巴巴的開源專案canal,負責從mysql實時拉取binlog並完成適當解析。binlog採集後會暫存到kafka上供下游消費。整體實時採集部分如圖中紅色箭頭所示。

離線處理binlog的部分,如圖中黑色箭頭所示,通過下面的步驟在hive上還原一張mysql表:

我們回過頭來看看,背景中介紹的批量取數並load方案遇到的各種問題,為什麼用這種方案能解決上面的問題呢?

binlog實時採集

對binlog的實時採集包含兩個主要模組:一是canalmanager,主要負責採集任務的分配、監控報警、元資料管理以及和外部依賴系統的對接;二是真正執行採集任務的canal和canalclient。

當使用者提交某個db的binlog採集請求時,canalmanager首先會呼叫dba平台的相關介面,獲取這一db所在mysql例項的相關資訊,目的是從中選出最適合binlog採集的機器。然後把採集例項(canal instance)分發到合適的canal伺服器上,即canalserver上。在選擇具體的canalserver時,canalmanager會考慮負載均衡、跨機房傳輸等因素,優先選擇負載較低且同地域傳輸的機器。

canalserver收到採集請求後,會在zookeeper上對收集資訊進行註冊。註冊的內容包括:

這樣做的目的有兩個:

對binlog的訂閱以mysql的db為粒度,乙個db的binlog對應了乙個kafka topic。底層實現時,乙個mysql例項下所有訂閱的db,都由同乙個canal instance進行處理。這是因為binlog的產生是以mysql例項為粒度的。canalserver會拋棄掉未訂閱的binlog資料,然後canalclient將接收到的binlog按db粒度分發到kafka上。

離線還原mysql資料

完成binlog採集後,下一步就是利用binlog來還原業務資料。首先要解決的第乙個問題是把binlog從kafka同步到hive上。

kafka2hive

整個kafka2hive任務的管理,在美團資料平台的etl框架下進行,包括任務原語的表達和排程機制等,都同其他etl類似。而底層採用linkedin的開源專案camus,並進行了有針對性的二次開發,來完成真正的kafka2hive資料傳輸工作。

對camus的二次開發

kafka上儲存的binlog未帶schema,而hive表必須有schema,並且其分割槽、欄位等的設計,都要便於下游的高效消費。對camus做的第乙個改造,便是將kafka上的binlog解析成符合目標schema的格式。

對camus做的第二個改造,由美團的etl框架所決定。在我們的任務排程系統中,目前只對同排程佇列的任務做上下游依賴關係的解析,跨排程佇列是不能建立依賴關係的。而在mysql2hive的整個流程中,kafka2hive的任務需要每小時執行一次(小時佇列),merge任務每天執行一次(天佇列)。而merge任務的啟動必須要嚴格依賴小時kafka2hive任務的完成。

為了解決這一問題,我們引入了checkdone任務。checkdone任務是天任務,主要負責檢測前一天的kafka2hive是否成功完成。如果成功完成了,則checkdone任務執行成功,這樣下游的merge任務就可以正確啟動了。

checkdone的檢測邏輯

checkdone是怎樣檢測的呢?每個kafka2hive任務成功完成資料傳輸後,由camus負責在相應的hdfs目錄下記錄該任務的啟動時間。checkdone會掃瞄前一天的所有時間戳,如果最大的時間戳已經超過了0點,就說明前一天的kafka2hive任務都成功完成了,這樣checkdone就完成了檢測。

此外,由於camus本身只是完成了讀kafka然後寫hdfs檔案的過程,還必須完成對hive分割槽的載入才能使下游查詢到。因此,整個kafka2hive任務的最後一步是載入hive分割槽。這樣,整個任務才算成功執行。

每個kafka2hive任務負責讀取乙個特定的topic,把binlog資料寫入original_binlog庫下的一張表中,即前面圖中的original_binlog.db,其中儲存的是對應到乙個mysql db的全部binlog。

上圖說明了乙個kafka2hive完成後,檔案在hdfs上的目錄結構。假如乙個mysql db叫做user,對應的binlog儲存在original_binlog.user表中。ready目錄中,按天儲存了當天所有成功執行的kafka2hive任務的啟動時間,供checkdone使用。每張表的binlog,被組織到乙個分割槽中,例如userinfo表的binlog,儲存在table_name=userinfo這一分割槽中。每個table_name一級分割槽下,按dt組織二級分割槽。圖中的***.lzo和***.lzo.index檔案,儲存的是經過lzo壓縮的binlog資料。

merge

binlog成功入倉後,下一步要做的就是基於binlog對mysql資料進行還原。merge流程做了兩件事,首先把當天生成的binlog資料存放到delta表中,然後和已有的存量資料做乙個基於主鍵的merge。delta表中的資料是當天的最新資料,當一條資料在一天內發生多次變更時,delta表中只儲存最後一次變更後的資料。

把delta資料和存量資料進行merge的過程中,需要有唯一鍵來判定是否是同一條資料。如果同一條資料既出現在存量表中,又出現在delta表中,說明這一條資料發生了更新,則選取delta表的資料作為最終結果;否則說明沒有發生任何變動,保留原來存量表中的資料作為最終結果。merge的結果資料會insert overwrite到原表中,即圖中的origindb.table。

merge流程舉例

下面用乙個例子來具體說明merge的流程。

資料表共id、value兩列,其中id是主鍵。在提取delta資料時,對同一條資料的多次更新,只選擇最後更新的一條。所以對id=1的資料,delta表中記錄最後一條更新後的值value=120。delta資料和存量資料做merge後,最終結果中,新插入一條資料(id=4),兩條資料發生了更新(id=1和id=2),一條資料未變(id=3)。

預設情況下,我們採用mysql表的主鍵作為這一判重的唯一鍵,業務也可以根據實際情況配置不同於mysql的唯一鍵。

上面介紹了基於binlog的資料採集和ods資料還原的整體架構。下面主要從兩個方面介紹我們解決的實際業務問題。

實踐一:分庫分表的支援

隨著業務規模的擴大,mysql的分庫分表情況越來越多,很多業務的分表數目都在幾千個這樣的量級。而一般資料開發同學需要把這些資料聚合到一起進行分析。如果對每個分表都進行手動同步,再在hive上進行聚合,這個成本很難被我們接受。因此,我們需要在ods層就完成分表的聚合。

首先,在binlog實時採集時,我們支援把不同db的binlog寫入到同乙個kafka topic。使用者可以在申請binlog採集時,同時勾選同乙個業務邏輯下的多個物理db。通過在binlog採集層的匯集,所有分庫的binlog會寫入到同一張hive表中,這樣下游在進行merge時,依然只需要讀取一張hive表。

第二,merge任務的配置支援正則匹配。通過配置符合業務分表命名規則的正規表示式,merge任務就能了解自己需要聚合哪些mysql表的binlog,從而選取相應分割槽的資料來執行。

這樣通過兩個層面的工作,就完成了分庫分表在ods層的合併。

這裡面有乙個技術上的優化,在進行kafka2hive時,我們按業務分表規則對錶名進行了處理,把物理表名轉換成了邏輯表名。例如userinfo123這張表名會被轉換為userinfo,其binlog資料儲存在original_binlog.user表的table_name=userinfo分割槽中。這樣做的目的是防止過多的hdfs小檔案和hive分割槽造成的底層壓力。

實踐二:刪除事件的支援

delete操作在mysql中非常常見,由於hive不支援delete,如果想把mysql中刪除的資料在hive中刪掉,需要採用「迂迴」的方式進行。

對需要處理delete事件的merge流程,採用如下兩個步驟:

展望

作為資料倉儲生產的基礎,美團資料平台提供的基於binlog的mysql2hive服務,基本覆蓋了美團內部的各個業務線,目前已經能夠滿足絕大部分業務的資料同步需求,實現db資料準確、高效地入倉。在後面的發展中,我們會集中解決canalmanager的單點問題,並構建跨機房容災的架構,從而更加穩定地支撐業務的發展。

總結

美團資料倉儲 資料脫敏

在資料倉儲建設過程中,資料安全扮演著重要角色,因為隱私或敏感資料的洩露,會對資料主體 客戶,員工和公司 的財產 名譽 人身安全 以及合法利益造成嚴重損害。因此我們需要嚴格控制對倉庫中的資料訪問,即什麼樣的人員或者需求才可以訪問到相關的資料。這就要求對資料本身的敏感程度進行安全級別劃分。資料有了安全等...

美團資料倉儲的演進

shdiao 2013 12 05 20 44 美團資料倉儲,在過去的兩年中,與我們的業務一起高速發展。在這一演進過程中,有很多值得總結和沉澱的內容。這篇文件回顧下美團資料倉儲這兩年發展過程中遇到的各種問題,為什麼選擇了現在的技術方案,每乙個功能和模組是在什麼情況下產生的,解決的是什麼問題,中間有過...

美團資料倉儲的演進

美團資料倉儲,在過去的兩年中,與我們的業務一起高速發展。在這一演進過程中,有很多值得總結和沉澱的內容。這篇文件回顧下美團資料倉儲這兩年發展過程中遇到的各種問題,為什麼選擇了現在的技術方案,每乙個功能和模組是在什麼情況下產生的,解決的是什麼問題,中間有過哪些彎路。既可以作為大家熟悉美團資料倉儲構建過程...