我們的專案,是 mysql+elasticsearch 做的乙個資料庫和搜尋引擎,專案經理提出需要做乙個用於重建 es 搜尋資料的介面,這個任務很光榮的交給了我。
在功能的編寫過程當中,我突然思考這樣乙個問題,因為我們 web 專案本身是多執行緒的,那如果在同一時間段,有多個請求同時發起,那同時發起 es 的重建,對於 es 來說,可能會產生一些莫名其妙的問題。
所以我感到非常高興,因為這個問題,似乎不是聽起來的那麼簡單。於是乎我想到了,要加入同步鎖了。
最開始我只是很簡單的想,直接在對應的 service 層寫乙個方法,然後直接加乙個
synchronized(this)
在整個方法體上。
@override
public synchronized int rebuiltbountydata() throws exception
但是這個方法很快就聯想到了另乙個問題:
我們是希望不要多執行緒同時重建資料,但是如果排隊重建呢?好像也不是我們想要的結果。我希望的是當乙個執行緒在執行重建任務的時候,另乙個執行緒要被拒絕開始任務,而不是等待上乙個任務做好後再開始。因為我們 tomcat 是採用執行緒池的概念,如果所有執行緒都執行這個方法,最後每個執行緒都會處於等待狀態,結果其他請求就會因為沒有空閒的執行緒可用,而無法正常執行。
so,我們修改了一下思路:
在 service 的這個實現類中,新增乙個私有類成員物件 flag = false,當執行緒進入時,判斷 flag 是否為 true,是,則直接丟擲異常,結束執行緒。否,則修改 flag 的值為 true,然後開始執行執行緒任務,並且,我們對這個 flag 加上乙個同步鎖,例如:我們在**中使用時,加入這樣一段
synchronized(flag)
由於 spring 預設是單例模式,所以這個flag 在多個執行緒中是共享的,這樣就不需要將這個flag 設定為 static 了,因為它在這個區域性當中實現了類似 static 的作用。但是這個時候,flag 不能是基礎型別,必須是 boolean 包裝型別。那就會產生另乙個隱患:包裝類的物件僅僅是乙個引用,引用是可以被更換了,比如使用了這個 flag 的 set 方法來修改值,但是同步鎖取得是引用的鎖,而不是引用對應那個例項的鎖,鎖了引用卻沒鎖例項,但我們實際上卻要根據例項的狀態來判斷,這就會造成乙個隱患,可能會使得同步鎖失效。
那使用 this 來獲得整個 service 類的同步鎖,貌似可以解決問題(如下面這段**具體實現),但是如果萬一以後這個 service 還有其他需要用到同步鎖的需求怎麼辦呢?這樣就會讓兩個不想幹的業務邏輯因為同步鎖的問題產生互相的影響。新增同步鎖,要盡可能的縮小同步鎖的獲取範圍,和鎖內**的**量,這樣才能減少衝突和執行緒獲取鎖時等待的時間,提高軟體的安全性和執行效率。
而且,我們的需求在這個時候,又有了變化,專案經理說,有兩張表都需要做這種功能。就是說兩個業務內容,都需要進行es 的資料重建。所以如果每次增加乙個,我就要單獨寫乙個類似下面這段**,不僅**的可復用性降低了,而且以後換別人來維護的時候,說不定會寫錯這些內容。
@override
public int rebuiltbountydata() throws exception else
}//獲取總數
int pagetotal, pagesize = 1000;
if (count % pagesize != 0) else
try
} catch (exception e) finally
return count;
}
最好的辦法,就是把這個需要「加鎖」的邏輯,單獨賦予乙個物件,讓這個鎖的範圍能夠縮小到只針對這個邏輯,這個功能,而不要跟其他的功能混在一起。 然後我們需要對這個功能,進行進一步的抽象。
我們來好好觀察上面這段**,上面這段**,算是已經實現了整個功能,從頭到尾分解一下這段**的功能,可以看得出如下:
單執行緒檢查
分頁處理
獲取資料
寫入 es
so,我們可以看到,其實不同業務場景下,執行緒檢查是一模一樣的**,而分頁處理中,獲取資料總條數會根據不同業務場景而不同,其他**也都是相同的,至於寫入 es 的部分,如果資料結構跟從資料庫中獲取的實體物件沒有區別的話,這個也是可以看做是相同的而不需要特別的處理,但是我們公司的專案中,因為種種原因,es 中的資料結構和實體物件是不同的(儘管資料字段都是相同,我表示我不知道怎麼跟你們說這個歷史遺留的奇葩問題...)。在這裡,我們要應用乙個設計模式,是模板模式,將固定的流程**封裝起來。再將可變的部分,留給子類實現。
/**
* 類說明:從 jdbc 中獲取重建 es 的資料
*/@service
public abstract class jdbcrebuiltesserviceextends baseservice else ]發起 es 重建任務!其他重建任務請求將被拒絕!", getuserjid());}}
}/**
* 資料重建
** @return
* @throws exception
*/public int rebuild() throws exception ", totalnum);
int pagetotal;
int pagenum = this.startpage;
int pagesize = this.pagesize;
//根據條目總數計算總頁數
if (totalnum % pagesize != 0) else
long starttime = system.currenttimemillis();//任務開始計時
try
} catch (exception e) ", decimalformat.format(progress));
throw e;
} finally 毫秒", endtime - starttime);
}return totalnum;
}public int getstartpage()
public void setstartpage(int startpage)
public int getpagesize()
public void setpagesize(int pagesize)
}
ok,這樣就解決了。複寫三個容易跟隨應用場景不同,而改變的方法,分別是,獲取資料源,獲取資料總條目,寫入 es。然後暴露 rebuild 方法給外部呼叫,在 rebuild 方法內部,實現整個運作流程,這樣也可以避免以後有人需要做新的實現的時候,修改到這部分有涉及到同步鎖的**,以避免安全隱患。
實際使用的時候可以這樣用,建立乙個子類繼承這個 jdbcrebuiltesservice
/**
* 類說明:商品資訊 es 重建所需要實現的具體方法
*/@service
public class goodsrebuiltesserviceimpl extends jdbcrebuiltesservice
@override
public collectionloaddatasource(int pagesize, int pagenum) throws exception
@override
public void writetoelasticsearch(collectioncollection) throws exception
}
這樣以後每次使用,都只需要實現乙個新的子類,然後這樣呼叫:
@autowired
@qualifier("goodsrebuiltesserviceimpl")
private jdbcrebuiltesservicejdbcrebuiltesservice;
/*** 資料重建
** @return
* @throws exception
*/@override
public int rebuiltesgoodsdata() throws exception
這樣 rebuilt 方法就很安全的被呼叫,將程式中不希望被修改的部分,用父類寫好,只留下希望被複寫的部分,這樣就可以很好的保護比較關鍵的部位,當然了,public 方法也是可以重寫的,不過這就超出了我們「以防萬一,不小心寫錯」的初衷了,如果需要重寫,那就重寫唄。 ThreadPoolExecutor 多執行緒
from concurrent.futures import threadpoolexecutor,wait,all completed from queue import queue myqueue queue 佇列,用於儲存函式執行結果。多執行緒的問題之一 如何儲存函式執行的結果。def thr...
elasticsearch多磁碟擴容
由於早前elasticsearch集群資料儲存路徑只配置了乙個,所以某天磁碟突然爆滿,集群差點當機。需重新配置多路徑儲存路徑,因為在生產環境,得保證集群不死掉,只能一台一台配置重啟。修改elasticsearch.yml中path.data屬性,新增多路徑以逗號分隔 有乙個索引的某個分片一直處理un...
Elasticsearch多文件操作
閱讀文字大概需要3分鐘。1 一次性獲取多個index public static void main string args throws ioexception onshutdown client.close 執行結果 2 bulk api,又稱批量api允許在單個請求中索引和刪除多個文件 pub...