在分布式系統中,為了讓每個節點都能夠感知到其他節點的事務執行狀況,需要引入乙個中心節點來統一處理所有節點的執行邏輯,這個中心節點叫做協調者(coordinator),被中心節點排程的其他業務節點叫做參與者(participant)。
接下來正式介紹2pc。顧名思義,2pc將分布式事務分成了兩個階段,兩個階段分別為提交請求(投票)和提交(執行)。協調者根據參與者的響應來決定是否需要真正地執行事務,具體流程如下。
提交請求(投票)階段協調者向所有參與者傳送prepare請求與事務內容,詢問是否可以準備事務提交,並等待參與者的響應。
參與者執行事務中包含的操作,並記錄undo日誌(用於回滾)和redo日誌(用於重放),但不真正提交。
參與者向協調者返回事務操作的執行結果,執行成功返回yes,否則返回no。
提交(執行)階段
分為成功與失敗兩種情況。
若所有參與者都返回yes,說明事務可以提交:協調者向所有參與者傳送commit請求。
參與者收到commit請求後,將事務真正地提交上去,並釋放占用的事務資源,並向協調者返回ack。
協調者收到所有參與者的ack訊息,事務成功完成。
若有參與者返回no或者超時未返回,說明事務中斷,需要回滾:協調者向所有參與者傳送rollback請求。
參與者收到rollback請求後,根據undo日誌回滾到事務執行前的狀態,釋放占用的事務資源,並向協調者返回ack。
協調者收到所有參與者的ack訊息,事務回滾完成。
下圖分別示出這兩種情況:
提交成功
提交失敗
flink基於2pc的事務性寫入
flink提供了基於2pc的sinkfunction,名為twophasecommitsinkfunction,幫助我們做了一些基礎的工作。它的第一層類繼承關係如下:
但是twophasecommitsinkfunction仍然留了以下四個抽象方法待子類來實現:
protected abstract txn begintransaction() throws exception;
protected abstract void precommit(txn transaction) throws exception;
protected abstract void commit(txn transaction);
protected abstract void abort(txn transaction);
begintransaction():開始乙個事務,返回事務資訊的控制代碼。
precommit():預提交(即提交請求)階段的邏輯。
commit():正式提交階段的邏輯。
abort():取消事務。
下面以flink與kafka的整合來說明2pc的具體流程。注意這裡的kafka版本必須是0.11及以上,因為只有0.11+的版本才支援冪等producer以及事務性,從而2pc才有存在的意義。
開始事務
看下flinkkafkaproducer011類實現的begintransaction()方法:
@override
protected kafkatransactionstate begintransaction() throws flinkkafka011exception - checkpoint {} triggered, flushing transaction '{}'", name(), context.getcheckpointid(), currenttransactionholder);
precommit(currenttransactionholder.handle);
pendingcommittransactions.put(checkpointid, currenttransactionholder);
log.debug("{} - stored pending transactions {}", name(), pendingcommittransactions);
currenttransactionholder = begintransactioninternal();
log.debug("{} - started new transaction '{}'", name(), currenttransactionholder);
state.clear();
state.add(new state<>(
this.currenttransactionholder,
new arraylist<>(pendingcommittransactions.values()),
usercontext));
結合flink檢查點的原理,可以用下圖來形象地表示預提交階段的流程:
每當需要做checkpoint時,jobmanager就在資料流中打入乙個屏障(barrier),作為檢查點的界限。屏障隨著運算元鏈向下游傳遞,每到達乙個運算元都會觸發將狀態快照寫入狀態後端(state backend)的動作。當屏障到達kafka sink後,觸發precommit(實際上是kafkaproducer.flush())方法刷寫訊息資料,但還未真正提交。接下來還是需要通過檢查點來觸發提交階段。
提交階段
flinkkafkaproducer011.commit()方法實際上是**了kafkaproducer.committransaction()方法,正式向kafka提交事務。
@override
protected void commit(kafkatransactionstate transaction) finally - checkpoint {} complete, committing transaction {} from checkpoint {}",
name(), checkpointid, pendingtransaction, pendingtransactioncheckpointid);
logwarningiftimeoutalmostreached(pendingtransaction);
try catch (throwable t) - committed checkpoint transaction {}", name(), pendingtransaction);
pendingtransactioniterator.remove();
if (firsterror != null) {
throw new flinkruntimeexception("committing one of transactions failed, logging first encountered failure",
firsterror);
該方法每次從正在等待提交的事務控制代碼中取出乙個,校驗它的檢查點id,並呼叫commit()方法提交之。這階段的流程可以用下圖來表示:
可見,只有在所有檢查點都成功完成這個前提下,寫入才會成功。這符合前文所述2pc的流程,其中jobmanager為協調者,各個運算元為參與者(不過只有sink乙個參與者會執行提交)。一旦有檢查點失敗,notifycheckpointcomplete()方法就不會執行。如果重試也不成功的話,最終會呼叫abort()方法回滾事務。
@override
protected void abort(kafkatransactionstate transaction) {
if (transaction.istransactional()) {
transaction.producer.aborttransaction();
recycletransactionalproducer(transaction.producer);
2pc的缺點
1、協調者存在單點問題。如果協調者掛了,整個2pc邏輯就徹底不能執行。
2、執行過程是完全同步的。各參與者在等待其他參與者響應的過程中都處於阻塞狀態,大併發下有效能問題。
3、仍然存在不一致風險。如果由於網路異常等意外導致只有部分參與者收到了commit請求,就會造成部分參與者提交了事務而其他參與者未提交的情況。
兩段式提交
通過使用某種協議進行通訊來完成分布式事務,被稱為兩段式提交。從名字上看,您可能已經知道有兩個階段 第乙個階段,即預提交 o 事務協調器給每個事務管理器傳送準備操作的訊號。o 事務管理器將操作 通常是資料更新 步驟 或細節 寫入事務日誌。如果失敗,事務管理器使用這些步驟重複操作。o 事務管理器本地建立...
Swift中類的兩段式構造 類的構造過程
import foundation 類的構造需要兩個階段 第一階段 1.程式呼叫子類的某個構造器 2.為例項分配記憶體,此時例項的記憶體還沒有被初始化 3.指定構造器確保子類定義的所有例項儲存屬性都已被賦初值 4.指定構造器將呼叫父類的構造器,完成父類定義的例項儲存屬性的初始化 5.沿著呼叫父類構造...
徹底搞懂狀態機(一段式 兩段式 三段式)
例項 fsm實現10010串的檢測 狀態轉移圖 初始狀態s0,a 0,z 0.如果檢測到1,跳轉到s1。下一狀態s1,a 1,z 0.如果檢測到0,跳轉到s2。下一狀態s2,a 0,z 0.如果檢測到0,跳轉到s3。下一狀態s3,a 0,z 0.如果檢測到1,跳轉到s4。下一狀態s4,a 1,z 0...