目錄
- 1、分布式事務
- 2、SAGA
- 3、SAGA 實踐
- 4、處理網絡異常
- 5、處理回滾
- 6、小結

銀行跨行轉賬業務是一個典型分布式事務場景,假設 A 需要跨行轉賬給 B,那么就涉及兩個銀行的數據,無法通過一個數據庫的本地事務保證轉賬的 ACID,只能夠通過分布式事務來解決。
1、分布式事務
分布式事務在分布式環境下,為了滿足可用性、性能與降級服務的需要,降低一致性與隔離性的要求,一方面遵循 BASE 理論:
- 基本業務可用性( Basic Availability )
- 柔性狀態( Soft state )
- 最終一致性( Eventual consistency )
另一方面,分布式事務也部分遵循 ACID 規范:
- 原子性:嚴格遵循
- 一致性:事務完成后的一致性嚴格遵循;事務中的一致性可適當放寬
- 隔離性:并行事務間不可影響;事務中間結果可見性允許安全放寬
- 持久性:嚴格遵循
2、SAGA
Saga 是這一篇數據庫論文SAGAS提到的一個分布式事務方案。其核心思想是將長事務拆分為多個本地短事務,由 Saga 事務協調器協調,如果各個本地事務成功完成那就正常完成,如果某個步驟失敗,則根據相反順序一次調用補償操作。
目前可用于 SAGA 的開源框架,主要為 Java 語言,其中以 seata 為代表。我們的例子采用 go 語言,使用的分布式事務框架為https://github.com/yedf/dtm,它對分布式事務的支持非常優雅。下面來詳細講解 SAGA 的組成:
DTM 事務框架里,有 3 個角色,與經典的 XA 分布式事務一樣:
- AP/應用程序,發起全局事務,定義全局事務包含哪些事務分支
- RM/資源管理器,負責分支事務各項資源的管理
- TM/事務管理器,負責協調全局事務的正確執行,包括 SAGA 正向 /逆向操作的執行
下面看一個成功完成的 SAGA 時序圖,就很容易理解 SAGA 分布式事務:

3、SAGA 實踐
對于我們要進行的銀行轉賬的例子,我們將在正向操作中,進行轉入轉出,在補償操作中,做相反的調整。
首先我們創建賬戶余額表:
CREATE TABLE dtm_busi.`user_account` (
`id` int(11) AUTO_INCREMENT PRIMARY KEY,
`user_id` int(11) not NULL UNIQUE ,
`balance` decimal(10,2) NOT NULL DEFAULT '0.00',
`create_time` datetime DEFAULT now(),
`update_time` datetime DEFAULT now()
);
我們先編寫核心業務代碼,調整用戶的賬戶余額
def saga_adjust_balance(cursor, uid, amount):
affected = utils.sqlexec(cursor, "update dtm_busi.user_account set balance=balance+%d where user_id=%d and balance >= -%d" %(amount, uid, amount))
if affected == 0:
raise Exception("update error, balance not enough")
下面我們來編寫具體的正向操作 /補償操作的處理函數
@app.post("/api/TransOutSaga")
def trans_out_saga():
saga_adjust_balance(c, out_uid, -30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransOutCompensate")
def trans_out_compensate():
saga_adjust_balance(c, out_uid, 30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransInSaga")
def trans_in_saga():
saga_adjust_balance(c, in_uid, 30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransInCompensate")
def trans_in_compensate():
saga_adjust_balance(c, in_uid, -30)
return {"dtm_result": "SUCCESS"}
到此各個子事務的處理函數已經 OK 了,然后是開啟 SAGA 事務,進行分支調用
# 這是 dtm 服務地址
dtm = "http://localhost:8080/api/dtmsvr"
# 這是業務微服務地址
svc = "http://localhost:5000/api"
req = {"amount": 30}
s = saga.Saga(dtm, utils.gen_gid(dtm))
s.add(req, svc + "/TransOutSaga", svc + "/TransOutCompensate")
s.add(req, svc + "/TransInSaga", svc + "/TransInCompensate")
s.submit()
至此,一個完整的 SAGA 分布式事務編寫完成。
如果您想要完整運行一個成功的示例,那么參考這個例子yedf/dtmcli-py-sample,將它運行起來非常簡單
# 部署啟動 dtm
# 需要 docker 版本 18 以上
git clone https://github.com/yedf/dtm
cd dtm
docker-compose up
# 另起一個命令行
git clone https://github.com/yedf/dtmcli-py-sample
cd dtmcli-py-sample
pip3 install flask dtmcli requests
flask run
# 另起一個命令行
curl localhost:5000/api/fireSaga
4、處理網絡異常
假設提交給 dtm 的事務中,調用轉入操作時,出現短暫的故障怎么辦?按照 SAGA 事務的協議,dtm 會重試未完成的操作,這時我們要如何處理?故障有可能是轉入操作完成后出網絡故障,也有可能是轉入操作完成中出現機器宕機。如何處理才能夠保障賬戶余額的調整是正確無問題的?
這類網絡異常的妥當處理,是分布式事務中的大難題,異常情況包括三類:重復請求、空補償、懸掛,都需要正確處理
DTM 提供了子事務屏障功能,保證上述異常情況下的業務邏輯,只會有一次正確順序下的成功提交。(子事務屏障詳情參考分布式事務最經典的七種解決方案的子事務屏障環節)
我們把處理函數調整為:
@app.post("/api/TransOutSaga")
def trans_out_saga():
with barrier.AutoCursor(conn_new()) as cursor:
def busi_callback(c):
saga_adjust_balance(c, out_uid, -30)
barrier_from_req(request).call(cursor, busi_callback)
return {"dtm_result": "SUCCESS"}
這里的 barrier_from_req(request).call(cursor, busi_callback)調用會使用子事務屏障技術,保證 busi_callback 回調函數僅被提交一次
您可以嘗試多次調用這個 TransIn 服務,僅有一次余額調整。
5、處理回滾
假如銀行將金額準備轉入用戶 2 時,發現用戶 2 的賬戶異常,返回失敗,會怎么樣?我們調整處理函數,讓轉入操作返回失敗
@app.post("/api/TransInSaga")
def trans_in_saga():
return {"dtm_result": "FAILURE"}
我們給出事務失敗交互的時序圖:

這里有一點,TransIn 的正向操作什么都沒有做,就返回了失敗,此時調用 TransIn 的補償操作,會不會導致反向調整出錯了呢?
不用擔心,前面的子事務屏障技術,能夠保證 TransIn 的錯誤如果發生在提交之前,則補償為空操作;TransIn 的錯誤如果發生在提交之后,則補償操作會將數據提交一次。
我們可以將返回錯誤的 TransIn 改成:
@app.post("/api/TransInSaga")
def trans_in_saga():
with barrier.AutoCursor(conn_new()) as cursor:
def busi_callback(c):
saga_adjust_balance(c, in_uid, 30)
barrier_from_req(request).call(cursor, busi_callback)
return {"dtm_result": "FAILURE"}
最后的結果余額依舊會是對的,原理可以參考:分布式事務最經典的七種解決方案的子事務屏障環節
6、小結
在這篇文章里,我們介紹了 SAGA 的理論知識,也通過一個例子,完整給出了編寫一個 SAGA 事務的過程,涵蓋了正常成功完成,異常情況,以及成功回滾的情況。相信讀者通過這邊文章,對 SAGA 已經有了深入的理解。
文中使用的 dtm 是新開源的 Golang 分布式事務管理框架,功能強大,支持 TCC 、SAGA 、XA 、事務消息等事務模式,支持 Go 、python 、PHP 、node 、csharp 等語言的。同時提供了非常簡單易用的接口。
以上就是利用 Python 輕松完成一個 Saga 分布式事務的詳細內容,更多關于Python完成一個 Saga 分布式事務的資料請關注腳本之家其它相關文章!
您可能感興趣的文章:- 帶你用Python實現Saga 分布式事務的方法
- 詳解分布式系統中如何用python實現Paxos
- Python搭建Spark分布式集群環境
- python django框架中使用FastDFS分布式文件系統的安裝方法
- Python多進程入門、分布式進程數據共享實例詳解
- Python分布式進程中你會遇到的問題解析