在生產環境中,用kafka來解耦是常用的技術手段。為了保證訊息的順序處理,會把相同屬性(同乙個人、同乙個素材等) 的訊息發往kafka同乙個partition中。例如,在廣告系統中,會把某乙個ad的轉化資料傳送到同乙個partition。
ad1 -> [ad1-msg1, ad1-msg2, ...] -> 順序發到partition1
ad2 -> [ad2-msg1, ad2-msg2, ...] -> 順序發到partition2
...
這樣做的初衷是希望對於同乙個廣告的訊息,不要出現消費的時候先後順序錯亂。
但是這裡有乙個速度不匹配的問題——consumer處理的速度往往是遠遠小於consumer拉取訊息的速度的,於是很多人就會想到用多執行緒加速。但是這會帶來乙個問題:同乙個ad的訊息先後順序錯亂,背離我們的初衷。由此,引出我們今天的問題:執行緒池如何保證有序。
問題定義
上面我們分析的思路是通過執行緒池來加速訊息的處理,而訊息順序錯亂也是由於引入執行緒池帶來的。有一種方法比較簡單粗暴且有效,加partition,增加consumer,每個consumer使用單執行緒來處理。但是如果訊息太多,部署成本就會上公升,所以回到了執行緒池如何保證有序上來。我們希望實現這樣乙個執行緒池:
private final executorexecutor = ***;
void consumer(mydto dto) );
}
把相同key的msg只交給乙個執行緒去處理,那就保證了它的順序性;
不同key的msg交給不同的執行緒去處理;
下面我們逐一來看這兩個問題:
關於把相同key的msg交給乙個執行緒去處理,同時不想頻繁的建立和銷毀乙個執行緒。我們可以宣告乙個核心執行緒數和最大執行緒數都是1的執行緒池即可:
linkedblockingqueuequeue = new linkedblockingqueue<>();
executorservice executor = new threadpoolexecutor(1, 1,..., queue);
// 其中queue為阻塞佇列
不同key的msg交給不同的執行緒去處理。
這條比較好理解,如果來了乙個key,檢視是不是有對應的執行緒池處理它。有,那麼扔到上面宣告的執行緒池中;沒有,選擇(新建)執行緒池來處理。
最終的實現原理如下圖所示,最終我們的這個執行緒池裡面會包含多個核心執行緒數=最大執行緒數=1的小執行緒池。
問題解法
最關鍵的兩個方法:
public inte***ce keyaffinityextends autocloseable, iterable
keyaffinity
的實現類:
class keyaffinityimplimplements keyaffinity
v.incrconcurrency();
// 如果有相同的key, 那麼返回這個執行緒池並處理.
return v;
});return keyref.ref();
} @override
public void finishcall(k key) else
});}
}
keyaffinityexecutor
執行緒池執行主體:
public inte***ce keyaffinityexecutorextends keyaffinity catch (interruptedexception ie)
return false;}};
} else
// 2.宣告核心執行緒數=最大執行緒數的執行緒池
return new threadpoolexecutor(1, 1, 0l, milliseconds, queue, threadfactory);}})
.build();
} default void executeex(k key, @nonnull throwablerunnabletask) catch (throwable e) finally
});addcallback = true;
} finally }}
}
上面三個類,是實現該執行緒池最核心的三個方法,有興趣的同學沿著我所列出的核心方法,可以看整個實現的技巧。祝進步! 如何保證執行緒安全有序性 執行緒池如何保證有序?
背景 在生產環境中,用kafka來解耦是常用的技術手段。為了保證訊息的順序處理,會把相同屬性 同乙個人 同乙個素材等 的訊息發往kafka同乙個partition中。例如,在廣告系統中,會把某乙個ad的轉化資料傳送到同乙個partition。ad1 ad1 msg1,ad1 msg2,順序發到par...
執行緒安全性 有序性
乙個執行緒內,按照 順序書寫在前面的操作先行發生於書寫在後面的操作。乙個unlock操作先行發生於後面對同乙個鎖的lock操作。對乙個變數的寫操作先行發生於後面對這個變數的讀操作。如果操作a先行發生於操作b,而操作b又先行發生於操作c,則可以得出操作a先行發生於操作c thread物件的start ...
如何保證執行緒安全?
執行緒安全 一般說來,確保執行緒安全的方法有這幾個 競爭與原子操作 同步與鎖 可重入 過度優化。競爭與原子操作 多個執行緒同時訪問和修改乙個資料,可能造成很嚴重的後果。出現嚴重後果的原因是很多操作被作業系統編譯為彙編 之後不止一條指令,因此在執行的時候可能執行了一半就被排程系統打斷了而去執行別的 了...