論一資料同步方案

2021-07-26 18:41:24 字數 3020 閱讀 9303

前段時間重做了乙個資料同步方面的工作,具體是把相關資料從乙個遠端複製到本地端,需要做到不重、不漏、無誤的同時保持使用簡單方便。

我們跟第三方合作,可以讀取使用第三方的資料來源,但不能修改,而第三方的資料來源沒有我們想要的索引和主鍵約束,所以想要更方便地使用第三方資料必須把資料同步傳輸到本地,根據自己的需求加索引加主鍵約束等等。第三方資料庫中的資料會增加,修改,但不被會刪除。

根據資料來源的特點,把我們需要的字段同步過來,根據資料特性加上主鍵約束來確保資料的唯一性,更新操作通過主鍵來做對應。

每次同步都只取前一天更新的源資料進行更新操作。

經過不斷地踩坑後發現,原來第三方的資料來源不止是會增加,還會進行修改,不止是普通欄位的修改,我們自建的主鍵中的值也會修改,雖然發生的頻率很小,由於資料特性,不會發生主鍵衝突,但主鍵都改了,兩份資料就沒法對應了,這就導致了會多出一些「錯誤」資料。

利用cron做的定時任務,一旦某次資料衝突或者發生硬體故障沒有更新,那麼在解決了故障之後必須手動的傳入引數把前n天的資料都做一遍更新。

原來的方案資料是一條一條的查詢的,效率十分低下。

所以很有必要重新做乙個同步方案。

經過觀察,第三方資料庫中每一張表都有且僅有乙個唯一主鍵的id,此外還有資料更新時間op_date。所以我們只能把唯一主鍵也同步過來。

這種方案,資料正確性可以保證,但是其他方面還是不盡如人意。而且,假如說某乙個天的資料量特別巨大的話,會不會發生記憶體不夠?既然選擇重做,那就做好,把一切有可能發生的問題都避免掉。所以可以由一次性同步改為分批次同步。這是乙個小的優化點。

其實也還有其他的問題,假如更新了一般發生了意外停止了更新而又沒有被監控到怎麼辦?

這個時候老大看了我的設計方案,我們思索討論了一下,有了乙個新的思路。

這裡引入了兩個新名詞:全量更新和差量更新。

全量更新是從零開始全部更新,差量更新是從上次更新的地方接著更新。差量更新的好處在於,及時上次因為某種意外斷掉了,也不會影響下一次更新,並且下一次更新會把上一次未更新的資料也更新了。

最後成型的具體方案是:

獲取local庫相應表中最新的資料,以op_date和id倒序排列。記最新資料的op_date和id分別為l_op_date, l_id。

獲取origin庫相應表中,op_date 等於 l_op_date的資料進行同步,一次最多取max_count個,直到取完。

獲取origin庫響應表中,op_date 大於 l_op_date、id 大於 l_id的資料,以op_date和id倒序排列,最多取max_count個,同步更新並記 l_op_date、l_id為最新條資料相應的op_date和l_id。

重複2-3,直到取完所有資料。

class

basetransfer

(object):

max_count = 2000

defrun

(): count = 0

exec_count = 0

for origin_data in self.get_origin_data():

local_data_dict =

for item in origin_data:

if item.is_broken():

continue

# 特殊的檢測

ifnot self.special_check(item):

continue

mn = mass_dict.get(item.id)

if mn:

# 更新

if self.update(item, mn):

count += 1

else:

# 插入

self.add(item)

count += 1

if count - exec_count >= self.max_count:

exec_count = count

db.session.commit()

db.session.commit()

defget_origin_data

(self):

# 獲取op_date和wind_id

local_data = self.get_lastest_record()

# 第一次匯入全部資料的時候需要考慮沒有資料的情況

ifnot local_data:

l_op_date = datetime(1980, 1, 1)

l_id = ''

else:

l_op_date = local_data.op_date

l_id = local_data.id

while

true:

# 先做op_date等量查詢

while

true:

origin_data = 等量查詢

ifnot origin_data:

break

l_id = origin_data[-1].id

yield origin_data

# 大於當前op_date

origin_data = origin_model.query.filter(

origin_model.op_date > l_op_date

).order_by(

origin_model.op_date,

origin_model.id_

).limit(self.max_count).all()

ifnot origin_data:

break

l_op_date = origin_data[-1].op_date

l_id = origin_data[-1].id

yield origin_data

if len(origin_data) < self.max_count:

break

當然,其中做了很多的優化,比如說可重用性,擴充套件性等等。想要做多個表的同步更新,每個表傳輸類只需要繼承自basetransfer,然後實現自己獨有的一些特殊方法就可以了,只需區區5,6行就可以完成乙個新的傳輸類。具體**就不展示了。

資料同步方案

作為業務系統的開發設計人員,資料及資料同步是非常重要的工作之一。在日常的軟體開發過程中,經常會碰到推送和拉取等業務。那麼一開始如何選用推送或拉取這兩個方案呢?這是由實際業務決定 雙方系統的技術實現 雙方系統的架構和效能,看日後是否此業務會經常修改等多方面決定的。下面本文就從實際的兩個業務情況來討論。...

資料同步方案

三 軟體選擇 同步分為 實時同步和離線同步 實時同步,一般是通過監控源資料變更操作,通過在目標端實時重放操作,從而達到實時同步的目的 離線同步,相當於某個時候對源資料做乙個快照。mysql自帶功能 一般針對的是整個資料庫 參考 主主同步 同步型別 實時同步 簡介 kafka是訊息中介軟體的一種 開發...

es同步mysql方案 ES資料同步方案

當業務量上公升後,由於mysql對全文檢索或模糊查詢支援的能力不強,在系統中查詢的地方,往往會出現慢sql等,拖累系統其他模組,造成效能低下。隨著es使用普及率的公升高,es是mysql的乙個有效補充。我們可以將資料傳送到搜尋引擎 如es 上,由搜尋引擎來提供專業的服務。接下來,就結合工作中實際用到...