DT大資料夢工廠Spark定製班筆記 009

2021-07-11 23:26:52 字數 2551 閱讀 8667

spark streaming原始碼解讀之receiver在driver的精妙實現全生命週期徹底研究和思考

在經過了一系列的有關spark streaming job的考察之後,我們把目光轉向receiver。

spark streaming中receiverinputdstream都是現實乙個receiver,用來接收資料。而receiver可以有很多個,並且執行在不同的worker節點上。這些receiver都是由receivertracker來管理的。

receivertracker的start方法如下(receivertracker.scala 152-164行)

defstart(): unit = synchronized 

if(!receiverinputstreams.isempty)

}

它首先例項化乙個訊息通訊體receivertrackerendpoint,然後呼叫launchreceivers(receivertracker.scala 436-447行).

private deflaunchreceivers(): unit = 

rundummysparkjob()

loginfo("starting "+ receivers.length +" receivers")

endpoint.send(startallreceivers(receivers))

}

它的巧妙之處在於會先執行

rundummysparkjob(),從而獲得集群中executor的情況。

然後向訊息通訊體傳送startallreceivers訊息。

訊息通訊體收到訊息後的處理過程如下所示(receivertracker.scala 468-475行)

casestartallreceivers(receivers) =>

valscheduledlocations = schedulingpolicy.schedulereceivers(receivers, getexecutors)

for(receiver

在這裡我們需要展示一下函式startreceiver中一處精妙的實現 (receivertracker.scala 605-611行)

valreceiverrdd: rdd[receiver[_]] =

if(scheduledlocations.isempty)else

在這裡receiver被封裝成了rdd!(所以receiver必須是可以序列化的)
並被提交到集群中執行。(receivertracker.scala 616行)
valfuture = ssc.sparkcontext.submitjob[receiver[_], unit, unit](

receiverrdd, startreceiverfunc, seq(

0), (_, _) => unit, ())

在任務被提交到worker節點後,執行如下操作。(r

eceivertracker.scala 585-602行)

// function to start the receiver on the worker node

valstartreceiverfunc: iterator[receiver[_]] => unit =

(iterator: iterator[receiver[_]]) =>

if(taskcontext.get().attemptnumber() == 0)else

}

此處建立了乙個receiversupervisorimpl物件;用來管理具體的receiver。

它首先會將receiver註冊到receivertracker中 (receiversupervisor.scala 182-186行)

override protected defonreceiverstart(): boolean =
註冊成功後,啟動receiver。(receiversupervisor.scala 144-159行)

defstartreceiver(): unit = synchronizedelse}catch}

上述過程如下圖所示 ** 感謝作者!

DT大資料夢工廠Spark定製班筆記 007

spark streaming原始碼解讀之jobscheduler內幕實現和深度思考 接前文spark streaming jobset的提交 jobgenerator.scala 253行 jobscheduler.submitjobset jobset time,jobs,streamidtoi...

DT大資料夢工廠 溫故而知新 之15講

360雲盤 訪問密碼 45e2 dt大資料夢工廠 溫故而知新 之15講scala 中的特質的多繼承 package testpractice author administrator object traitmoreextenddemo trait aa trait bb class zz exte...

大資料時代IT或被DT替代

dt是資料處理技術 datatechnology 的英文縮寫。馬雲曾經在一次演講中說道 人類正從it時代走向dt時代 那麼到底什麼是dt,與it有什麼不一樣呢?it時代是以自我控制 自我管理為主,而dt datatechnology 時代,它是以服務大眾 激發生產力為主的技術。這兩者之間看起來似乎是...