資料同步Debezium的應用

2021-10-18 16:53:36 字數 1790 閱讀 9288

背景

近期公司在響應國家軟體規範要求,重點專案的資料庫採用開源版postgresql(下文簡稱pg),業務端依然使用oracle。因此為了保證pg到oracle的資料同步問題,經過調研採用debezium開源專案,實現 pg -> kafka-> oracle 的資料實時同步。

開始介紹重點…

debuzium簡介

debezium是apache kafka connect的一組源聯結器(source端),通過款實現剖析資料庫中binlog日誌,捕獲資料,拷貝到kafka中,實現資料的實時抽取。另外可以通過confluen外掛程式或者自建專案對kafka資料消費,通過解析資料轉為為sql,最終實現資料的實時同步。我個人採用後者實現

一、source端實現方式

source端是pg -> kafka的資料傳輸。本人採用基於docker安裝debezium,安裝步驟如下

docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
docker run -d  --name kafka -p 9092:9092 -e kafka_advertised_listeners=plaintext: -e kafka_listeners=plaintext: --link zookeeper:zookeeper debezium/kafka
docker run -d  --name database -p 5433:5432 -e postgres_password=debezium  -d debezium/example-postgres
docker run -d --name connect -p 8083:8083 -e group_id=1 -e config_storage_topic=my_connect_configs -e offset_storage_topic=my_connect_offsets -e status_storage_topic=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link database:database debezium/connect
註冊聯結器,使用post新增,get檢視

post:

}

檢視當前inventory-connector聯結器資訊

192.168.60.128:8083/connectors/inventory-connector

192.168.60.128:8083/connectors
docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka debezium/kafka watch-topic -a -k postgres.inventory.test2

其中postgres.inventory.test2是kafka的topic

postgres是資料庫名稱,inventory是schema,test2是資料表名稱

二、sink端實現方式

實現簡要描述:通過本地搭建springboot專案,配置kafka服務,對kafka topic的資料監聽,實現資料的實時消費(採用批量消費,解析資料,並採用多執行緒池方式將資料同步到oracle中),具體**請參考:

debezium 資料變更工具使用

1.作用 簡單概述就是cdc change data capture 實時資料分析領域用的比較多 2.簡單使用 基於官網的docker 說明 備註 測試沒有使用守護程序模式為了方便測試 a.zookeeper docker run it rm name zookeeper p 2181 2181 p...

應用 執行緒同步

正式開始之前先簡單的解釋捋捋幾對概念 a.阻塞 阻塞呼叫是指呼叫結果返回之前,當前執行緒會被掛起。函式只有在得到結果之後才會返回 有人也許會把阻塞呼叫和同步呼叫等同起來,實際上他是不同的。對於同步呼叫來說,很多時候當前執行緒還是啟用的,只是從邏輯上當前函式沒有返回而已。socket接收資料函式rec...

hashmap同步應用

在多執行緒應用程式中對於hashmap,則需要額外的同步機制。hashmap的同步問題可通過collections的乙個靜態方法得到解決 map collections.synchronizedmap map m 這個方法返回乙個同步的map,這個map封裝了底層的hashmap的所有方法,使得底層...