前段時間重做了乙個資料同步方面的工作,具體是把相關資料從乙個遠端複製到本地端,需要做到不重、不漏、無誤的同時保持使用簡單方便。
我們跟第三方合作,可以讀取使用第三方的資料來源,但不能修改,而第三方的資料來源沒有我們想要的索引和主鍵約束,所以想要更方便地使用第三方資料必須把資料同步傳輸到本地,根據自己的需求加索引加主鍵約束等等。第三方資料庫中的資料會增加,修改,但不被會刪除。
根據資料來源的特點,把我們需要的字段同步過來,根據資料特性加上主鍵約束來確保資料的唯一性,更新操作通過主鍵來做對應。
每次同步都只取前一天更新的源資料進行更新操作。
經過不斷地踩坑後發現,原來第三方的資料來源不止是會增加,還會進行修改,不止是普通欄位的修改,我們自建的主鍵中的值也會修改,雖然發生的頻率很小,由於資料特性,不會發生主鍵衝突,但主鍵都改了,兩份資料就沒法對應了,這就導致了會多出一些「錯誤」資料。
利用cron做的定時任務,一旦某次資料衝突或者發生硬體故障沒有更新,那麼在解決了故障之後必須手動的傳入引數把前n天的資料都做一遍更新。
原來的方案資料是一條一條的查詢的,效率十分低下。
所以很有必要重新做乙個同步方案。
經過觀察,第三方資料庫中每一張表都有且僅有乙個唯一主鍵的id,此外還有資料更新時間op_date。所以我們只能把唯一主鍵也同步過來。
這種方案,資料正確性可以保證,但是其他方面還是不盡如人意。而且,假如說某乙個天的資料量特別巨大的話,會不會發生記憶體不夠?既然選擇重做,那就做好,把一切有可能發生的問題都避免掉。所以可以由一次性同步改為分批次同步。這是乙個小的優化點。
其實也還有其他的問題,假如更新了一般發生了意外停止了更新而又沒有被監控到怎麼辦?
這個時候老大看了我的設計方案,我們思索討論了一下,有了乙個新的思路。
這裡引入了兩個新名詞:全量更新和差量更新。
全量更新是從零開始全部更新,差量更新是從上次更新的地方接著更新。差量更新的好處在於,及時上次因為某種意外斷掉了,也不會影響下一次更新,並且下一次更新會把上一次未更新的資料也更新了。
最後成型的具體方案是:
獲取local庫相應表中最新的資料,以op_date和id倒序排列。記最新資料的op_date和id分別為l_op_date, l_id。
獲取origin庫相應表中,op_date 等於 l_op_date的資料進行同步,一次最多取max_count個,直到取完。
獲取origin庫響應表中,op_date 大於 l_op_date、id 大於 l_id的資料,以op_date和id倒序排列,最多取max_count個,同步更新並記 l_op_date、l_id為最新條資料相應的op_date和l_id。
重複2-3,直到取完所有資料。
class
basetransfer
(object):
max_count = 2000
defrun
(): count = 0
exec_count = 0
for origin_data in self.get_origin_data():
local_data_dict =
for item in origin_data:
if item.is_broken():
continue
# 特殊的檢測
ifnot self.special_check(item):
continue
mn = mass_dict.get(item.id)
if mn:
# 更新
if self.update(item, mn):
count += 1
else:
# 插入
self.add(item)
count += 1
if count - exec_count >= self.max_count:
exec_count = count
db.session.commit()
db.session.commit()
defget_origin_data
(self):
# 獲取op_date和wind_id
local_data = self.get_lastest_record()
# 第一次匯入全部資料的時候需要考慮沒有資料的情況
ifnot local_data:
l_op_date = datetime(1980, 1, 1)
l_id = ''
else:
l_op_date = local_data.op_date
l_id = local_data.id
while
true:
# 先做op_date等量查詢
while
true:
origin_data = 等量查詢
ifnot origin_data:
break
l_id = origin_data[-1].id
yield origin_data
# 大於當前op_date
origin_data = origin_model.query.filter(
origin_model.op_date > l_op_date
).order_by(
origin_model.op_date,
origin_model.id_
).limit(self.max_count).all()
ifnot origin_data:
break
l_op_date = origin_data[-1].op_date
l_id = origin_data[-1].id
yield origin_data
if len(origin_data) < self.max_count:
break
當然,其中做了很多的優化,比如說可重用性,擴充套件性等等。想要做多個表的同步更新,每個表傳輸類只需要繼承自basetransfer,然後實現自己獨有的一些特殊方法就可以了,只需區區5,6行就可以完成乙個新的傳輸類。具體**就不展示了。 資料同步方案
作為業務系統的開發設計人員,資料及資料同步是非常重要的工作之一。在日常的軟體開發過程中,經常會碰到推送和拉取等業務。那麼一開始如何選用推送或拉取這兩個方案呢?這是由實際業務決定 雙方系統的技術實現 雙方系統的架構和效能,看日後是否此業務會經常修改等多方面決定的。下面本文就從實際的兩個業務情況來討論。...
資料同步方案
三 軟體選擇 同步分為 實時同步和離線同步 實時同步,一般是通過監控源資料變更操作,通過在目標端實時重放操作,從而達到實時同步的目的 離線同步,相當於某個時候對源資料做乙個快照。mysql自帶功能 一般針對的是整個資料庫 參考 主主同步 同步型別 實時同步 簡介 kafka是訊息中介軟體的一種 開發...
es同步mysql方案 ES資料同步方案
當業務量上公升後,由於mysql對全文檢索或模糊查詢支援的能力不強,在系統中查詢的地方,往往會出現慢sql等,拖累系統其他模組,造成效能低下。隨著es使用普及率的公升高,es是mysql的乙個有效補充。我們可以將資料傳送到搜尋引擎 如es 上,由搜尋引擎來提供專業的服務。接下來,就結合工作中實際用到...