如何基於sqlite實現kafka延時訊息詳解

2022-09-20 18:18:08 字數 3783 閱讀 7980

目錄

延時訊息(或者說定時訊息)是業務系統裡乙個常見的功能點。常用業務場景如:

1) 訂單超時取消

2) 離線超過指定時間的使用者,召回通知

3) 手機消失多久後通知監護人……

現流行的實現方案主要有:

1)資料庫定時輪詢,掃瞄到達到延時時間的記錄,業務處理,刪除該記錄

2)jdk 自帶延時佇列(delayqueue),或優化的時間輪演算法

3)redis 有序集合

4)支援延時訊息的分布式訊息佇列

但以上方案,都存在各種缺陷:

1)定時輪詢間隔小,則對資料庫造成很大壓力,分布式微服務架構不好適配。

2)jdk 自帶延時佇列,占用記憶體高,服務重啟則丟失訊息,分布式微服務架構不好適配。

3)redis 有序集合比較合適,但記憶體貴,分布式微服務架構不好適配。

4)現在主流的 rocketmq 不支援任意延時時間的延時訊息,rabbitmq或activemq 效能不夠好,傳送配置麻煩,kafka不支援延時訊息。

因此,我想實現乙個適配分布式微服務架構、高效能、方便業務系統使用的延時訊息**中介軟體。

要保證高效能,推薦使用 kafka 或者 rocketmq 做分布式訊息佇列。當前是基於 sqlite 實現 kafka 延時訊息。

當前實現思路是基於kafka的,實際適用於任意mq產品。

1)業務系統先推送延時訊息到統一延時訊息佇列

2)定時讀取延時訊息佇列的延時訊息,儲存於本地,提交偏移量

3)定時掃瞄本地到達延時期限的訊息,**到實際業務訊息佇列

4)刪除本地延時訊息

1)乙個業務處理流程使用乙個sqlite資料庫檔案,可併發執行提高效能。

2)使用雪花演算法生成 id 。

3)沒有延時訊息時,執行緒休眠一定時間,減低kafka集群、和本地io壓力。

4)本地儲存使用 sqlite。

1)kafka-client

2)sqlite

3)slf4j+log4j2

4)jackson

測試機器: i5-6500,16gb記憶體,機械硬碟

延時訊息大小: 1kb

併發處理數:1

已本地簡單測試,效能表現:

1) 1個併發處理數就可以達到1秒儲存、**、刪除 約15000條延時訊息,2 個可以達到 30000條/s ……

2) 一次性處理1萬條記錄,是經過多次對比試驗得出的合適批次大小

也測試了其它兩個本地儲存方案的效能:

1)直接存讀 json 檔案,讀寫效能太差(約1200條記錄/s,慢在頻繁建立、開啟、關閉檔案,隨機磁碟io);

程式設計客棧2)rocksdb 存讀,寫入效能非常好(97000條記錄/s),但篩選到期延時訊息效能太差了,在資料量大於100w時,表現不如 sqlite,而且執行時占用記憶體、cpu 資源非常高。

1)jdk 1.8

2)kafka 1.1.0

可以自行替換為符合實際kafka版本的jar包(不會有衝突的,jar包版本和kafka服務版本不一致可能會有異常[無法拉取訊息、提交失敗等])。

可修改pom.xml內的 kafka_version

1.1.0

重新打包即可。當前程式可以獨立部署,對現有工程專案無侵入性。

1)在專案根目錄執行 m**en 打包後,會生成 dev_ops 檔案

2)在 dev_ops 目錄下執行 j**a -jar kafka_delay_sqlite-20220102.jar 即可啟動程式

3)如需修改配置,可在dev_ops目錄內建立kafka.properties檔案,設定自定義配置

預設配置如下:

# kafka 連線url [ip:port,ip:port……]

kafka.url=127.0.0.1:9092

# 延時訊息本地儲存路徑,建議使用絕對值

kafka.delay.store.path=/data/kafka_delay

# 統一延時訊息topic

kafka.delay.topic=common_de程式設計客棧lay_msg

# 消費者組id

kafka.delay.group.id=common_delay_app

# 併發處理數。限制條件: workers 小於等於topic分割槽數www.cppcns.com

kafka.delay.workers=2

4)業務方傳送 kafka 訊息到 topic (common_delay_msg)

訊息體引數說明:

delaytime: 指定延時時限,秒級別時間戳

訊息體案例:

","delaytime": 1641470704

}複製 延時訊息儲存目錄 到新機器,重啟部署、啟動程式即可。(該配置項所在目錄 kafka.delay.store.path=/data/kafka_delay)

日誌預設輸出到 /logs/kafka_delay/ ,日誌輸出方式為非同步輸出。

system.log 記錄了系統 info 級別以上的日誌,info級別日誌不是立刻輸出的,所以程式重啟時,可能會丟失部分日誌

exception.log 記錄了系統 warn 級別以上的日誌,日誌配置為立即輸出,程式正常重啟,不會丟失日誌,重點關注這個日誌即可。

如需自定義日誌配置,可以在 log4j2.xml 進行配置。

如果要進行本地除錯,可以解開注釋,否則控制台沒有日誌輸出:

1) 由於設定了執行緒空閒時休眠機制,延時訊息最大可能會推遲8秒鐘傳送。

如果覺得延遲時間比較大,可以自行修改原始碼的配置,重新打包即可。

kafkautils.subscribe()

msgtransfertask.run()

2) 當前程式嚴格依賴於系統時鐘,注意配置程式部署伺服器的時鐘和業務伺服器時鐘一致

3) 建議配置統一延時訊息佇列(common_delay_msg)的分割槽數為 2 的倍數

4) 每個 kafka.delay.workers 約需要 200 mb 記憶體,預設配置為2 , jvm 建議配置 1 gb 以上記憶體,避免頻繁gc 。

workers 增大後,不要再減小,否則會導致部分 sqlite 資料庫沒有執行緒訪問,訊息丟失。

併發處理數越大,延時訊息處理效率越高,但需要注意不要大於topic的分割槽數。

需要自行測試多少個併發處理數就會達到磁碟io、網路頻寬上限。

當前程式主要瓶頸在於磁碟io和網路頻寬,實際記憶體和cpu資源占用極低。

5) 程式執行時,不要操作延時訊息儲存目錄即裡面的檔案

6) 當前配置為正常情況下不會拋棄訊息模式,但程式重啟時,存在重**送訊息的可能,下游業務系統需要做好冪等性處理。

如果kafka集群異常,當前配置為重新傳送16次,如果仍不能恢復過來,則拋棄當前訊息,實際生產環境裡,基本不可能出現該場景。

如果確定訊息不能拋棄,需要自行修改原始碼(msgtransfertask.run,kafkautils.send(……)),重新打包、部署。

7) 程式出現未知異常(sqlite被手動修改、磁碟滿了……),會直接結束程式執行。

整體思路,實現,原始碼裡都比較清晰,如果 rocketmq 也有自定義延時需求,參考著修改原始碼即可,實現邏輯是一樣的。

如果要盡可能的實現延時訊息的最終處理,可以再額外採用2個延遲訊息處理方案:

1、每天掃瞄一次資料庫,把符合延時條件的記錄統一處理一次

2、惰性處理,當使用者再次訪問某功能點時,再修改相關符合延時條件的記錄

作者郵箱:[email protected] ,如有問題,歡迎騷擾。也歡迎大家**談論,qq群: 777804773

原始碼路徑:

https://gitee程式設計客棧.com/yushengruohui/delay_message

sqlite實現概覽

最近可能會用到sqlite,大致瀏覽了下實現的相關的細節。在這裡總結備份下。有錯誤的地方希望可以不吝指教。官網介紹,sqlite實現了可序列化的隔離級別 sqlite使用btree實現 sqlite寫入不能併發 sqlite使用的是db級別的鎖定 實現部分。關聯式資料庫要求是什麼,要怎麼實現,某些模...

QT下如何實現SQLite動態建立表

網上有很多人問sqlite動態建立多個表的問題,但幾乎沒有人給出滿意的答案,事實上在qt環境下利用類qstring很簡單就能做到 1.建立乙個儲存表名的qstring 變數 qstring name table 2.建立乙個儲存sqlite建立 語句的qstring變數 qstring add qs...

如何基於windows實現python定時爬蟲

windows系統下使用任務計畫程式,linux下可以使用crontab命令新增自啟動計畫。這裡寫windows 10zljahrowr windows server 2016系統的設定方法。首先編寫乙個.bat指令碼。新建乙個txt,將下面三行 複製進去,main.py改成自己程式名字。保程式設計...