Spark專案實戰經驗

2021-10-03 14:34:13 字數 2035 閱讀 2895

auto.offset.reset 啟動時讀取的偏移量。如果是需要歷史資料那麼設定成earliest 如果不需要消費歷史資料那麼設定成latest

(1)如果自動提交偏移量 spark streaming消費到資料之後立馬提交,那麼就會有乙個問題 提交偏移量成功 處理失敗了,那麼spark streaming第二次去啟動的時候

會造成丟資料的情況

(2)如果不允許資料丟失,首先要設定成手動提交,處理完業務資料之後在去提交乙個偏移量。因為處理業務資料這一步驟它是worker端分布式的,提交偏移量操作是在driver端,兩部分操作不在乙個事務裡面,所以造成處理資料成功了,提交偏移量失敗了(停電,斷網 極端情況下)會造成spark streaming第二次啟動的時候造成資料重複消費的問題 。

涉及到金額的情況下會去考慮乙個事物的問題,繫結事物操作worder端將資料collect聚合到driver端再去編寫同乙個事物,一般情況下是不考慮事物的。spark當中凡事count ,collect這些會將資料聚合到driver的運算元一般不推薦使用,耗時非常大。

使用kafkautils.createdirectstream這個方法建立,該方法使每個executer直接對應topic的分割槽。需要去導 spark-streaming-kafka-0-10_2.11的包。

如果是手動維護偏移量,那麼進行一次判斷,判斷mysql表當中也就是歷史資料是否存在偏移量,如果不存在偏移量說明spark程式是第一次啟動那麼就是按earliest進行消費,如果mysql當時存在偏移量的,那麼spark程式至少啟動過一次,那麼會根據mysql偏移量進行消費,根據偏移量消費,需要封裝乙個hashmap[topicpartition, long],topicpartition封裝對應的topic名稱和分割槽號,long值就是對應的偏移量,然後將hashmap傳入到consumerstrategies.subscribe[string, string]這個方法的第三個引數裡。

dstream.foreachrddd(rdd->

//關閉jdbc資源,注意不關的話會造成記憶體洩露的問題呢}}

如果在spark程式遇到了object無法序列化的錯誤,那麼需要注意**裡肯定是錯誤的將driver物件使用到了worker端。因為scala閉包特性,你在編寫的時候是不會報錯的。

dstream.froeachrdd(rdd->

如果偏移量想要手動維護並且提交到kafka,那麼可以下面這個**

stream.asinstanceof[cancommitoffsets].commitasync(offsetranges)

這個程式可能會掛掉的,並不是能完全一直執行下去的,那麼這個時候需要比如第三方外掛程式將spark streaming掛了之後重新拉起來。

重新拉起方法

(1)–supervise 官方提供的spark-submit引數

(2)supervise第三方乙個外掛程式,專門監控linux程序的乙個守護程序,當你這個被監控程序如果掛掉了,那麼這個supervise外掛程式就會把你這個重新拉起來

預設分割槽個數是跟kafka分割槽個數掛鉤,也就是kafka topic建立的時候指定了10個分割槽,那麼spark stremaing預設乙個分割槽就是10個,然後spark streaming一般不會去使用重分割槽的操作(把分割槽擴大或者分割槽縮小一般不會用),因為擴大repartition運算元會造成 shuffle,shuffle資料會落盤那麼會耗時,coalease運算元縮小並行度就變小了速度就會變慢。

spark sql當中task個數是cpu個數2倍到3倍,這個是脫機數倉。

spark steaming當中要想spark streaming速度執行的最快,那麼task執行比跟cpu必須1:1。1:1是最快的,讓每個task同時並行的執行.所以集群當中數倉離線任務是耗記憶體任務,

實時任務是耗cpu的任務,如果你的公司分的細的那麼兩個集群分開,乙個集群專門跑離線任務,乙個集群跑實時任務。

spark-submit 控制cpu那麼就是由 --num-executors --executor-cores 這兩引數控制,也就是兩個引數相乘要等於乙個task個數的,task個數就是分割槽個數。而且如果cpu個數大於

task個數是沒有效果的,會存在cpu空轉的情況 。

Xunsearch迅搜專案實戰經驗

在這裡我們使用的是sdk以及一台xunsearch伺服器,意思是不在同一伺服器上的。xunsearch伺服器和sdk伺服器。1.部署xunsearch伺服器,網上還是有文件的 tar xjf xunsearch full latest.tar.bz2第二步 執行安裝指令碼,根據提示進行操作,主要是輸...

MongoDB實戰經驗分享

nosql並不是no sql,而是指not only sql。nosql的出現是為了彌補sql資料庫因為事務等機制帶來的對海量資料 高併發請求的處理的效能上的欠缺。nosql不是為了替代sql而出現的,它是一種替補方案,而不是解決方案的首選。絕大多數的nosql產品都是基於大記憶體和高效能隨機讀寫的...

雜湊表實戰經驗

雜湊表的作用 簡單來說就是將乙個龐大的值域 複雜的資料結構 對映到乙個較小的空間 例如0 n,n 為1e5或1e6等比較小的數 通常寫雜湊函式最簡單方法就是 例如 h x x mod 10 5,將很大的x對映到10 5內。但是往往會面臨衝突,因為值域太大,經過雜湊函式的計算對映的值相同。那麼如何處理...