kafka 實踐《第二章》

2021-09-25 21:13:05 字數 3279 閱讀 9650

kafka 環境配置:jdk1.8

ssh安裝配置:對於kafka 集群來講,配置ssh免密登入不是必須的步驟,但分布式系統,一般會由多台機器構成,為了便於操作管理,一般通過ssh方式啟動集群**。

安裝ssh:

sudo apt-get install ssh 

進入.ssh目錄,生成秘鑰對:

ssh-keygen -t rsa 

ssh-keygen 用於生成認證秘鑰,-t 用來指定秘鑰型別,這裡選擇rsa 秘鑰。執行完畢後會在.ssh 目錄下生成is_rsa 和 id_rsa.pub 兩個檔案,前者是私鑰,後者是公鑰。

zookeeper 環境:

zoo

keeper 是

分布式應用程式協調服務框架,分布式應用程式可以基於

zookeeper

實現同步服務、配置維護、命名服務等, zoo

keeper

能提供基於類似於檔案系統的目錄節點樹

方式的資料儲存,通過監控各節點資料狀態的變化,達到基於資料的集群管理。

zookeeper 集群主要角色說明:

leader :集群的領導者,負責投票的發起和決議及更新系統狀態。

follower: 跟隨者,接受客戶端的請求並返回結果給客戶端,參與投票。

observer: 接受客戶端的請求,將寫的請求**給leader,不參與投票。observer 目的是擴充套件系統,提高讀的速度。

kafka

依賴 zoo

keeper

,通過

zookeeper

來對**、消費者上下線管理、集群、分割槽元數

據管理等,因此 zoo

keeper

也是 kafka

得以執行的基礎環境之一。

zookeeper 安裝參考:

kafka manager 安裝參考:

kafka manager 安裝:檢視集群中**列表,主題列表,消費組列表,每個主題對應的分割槽列表等,通過web介面操作來建立乙個主題或者在**負載不均衡時,手動執行分割槽平衡操作等。kafka 監控及管理工具:kafka manager,kafka web console, kafkaoffsetmonitor等工具都可以。

kafka 核心元件包括:延遲操作元件,控制器,協調器,網路通訊,日誌管理器,副本管理器,動態配置管理器及心跳檢測等。

1.延遲操作元件:協助客戶端處理建立主題操作,協助組協調器處理joingrouprequest 和heartbeatrequest請求,協助副本管理器處理producerequest 和 fetchrequest 請求。

2.delayedoperation:kafka 將一些不立即執行而要等到滿足一定條件之後才觸發完成的操作稱為延遲操作,並將這類操作定義為乙個抽象類delayedoperation ,delayedoperation 是乙個基於事件啟動有失效事件的timertask。timertask實現了runnable 介面,維護了乙個timertaskentry物件,timertaskentry 繫結了乙個timertask,timertaskentry 被新增到timertasklist中,timertasklist 是乙個環形雙向鍊錶,按失效時間排序。 

delayedoperation 是乙個抽象類,具體的延遲操作類繼承於該抽象類,分別用來協助相應元件對不同的請求完成延遲處理請求。

delayedoperation 只有乙個atomicboolean 型別的completed 屬性,用來控制某個延遲操作。在延遲時間delayms 內,oncomplete 方法只被呼叫一次。

trycomplete()方法: 乙個抽象方法,由子類來實現,負責檢測執行條件是否滿足 若滿足執行條件,則呼叫forcecomplete()方法完成延遲操作。

• forcecompete()方法: 該方法在條件滿足時,檢測延遲任務是否未被執行。若未被執行,則先呼叫 timertask.cancel()方法解除該延遲操作與 timertaskentry 的繫結,將該延遲操作timertasklist 鍊錶中移除,然後呼叫 oncomplete()方法讓延遲操作執行完成 通過completed cas 原子操作(completed.compareandset),可以保證併發操作時只有第乙個呼叫該方法的執行緒能夠順利呼叫 oncomplete()完成延遲操作,其他執行緒獲取的 ompleted屬性false ,即不會呼叫 oncomplete()方法 ,這就保證了 oncomplete() 只會被呼叫一次。

• oncomplete()方法:是乙個抽象方法,由子類來實現,執行延遲操作滿足執行條件後需要執行的實際業務邏輯。 例如, delayedproduce 和 delayedfetch 都是在該方法內呼叫responsecallback 客戶端做出響應。

safetrycomplete()方法:以synchronized 同步鎖呼叫 oncomplete()方法,供外部呼叫。

• onexpiration()方法:乙個抽象方法,由子類來實現當延遲操作己達失效時間的相應邏輯處理。 kafka 通過 systemtimer 來定期檢測請求是否超時。 systemtimer 是kafka實現的底層基於層級時間輪和 delayqueue 定時器 維護了 newfixedthreadpool 執行緒

池,用於提交相應的執行緒執行。例如,當檢測到延遲操作己失效時則將延遲操作提交到該執行緒池,即執行執行緒的 run()方法的邏輯。 delayedoperation 覆蓋了 timertask run()方法,在該方法中先呼叫 forcecompeteo方法,當該方法返回 true 後再呼叫 onexpiration()方法。

delayedoperationpurgatory

delayedoperationpurgatory 是乙個對delayedoperation 管理的輔助類。

watch() 方法: 用於將delayedoperation 新增到operations 集合中。

trycompletewatched()方法: 用於迭代operations 集合中的delayoperation ,通過delayedoperation.iscompleted檢測該delayedoperation 是否已執行完成。若已執行完成,則從operations 集合中移除該delayedoperation ,否則呼叫delayedoperation.safetrycomplete()方法嘗試讓該delayedoperation 執行完成,若執行完成,即safetrycomplete()方法返回true,則將delayedoperation從operations 集合中移除,最後檢測operations 集合是否為空,如果為空,則表示operations 所關聯的delayedoperation 已全部執行完成,因此將watchers 從 purgatory 的 pool 中移除。

python第二章上機實踐 第二章上機實踐報告

設計乙個平均時間為o n 的演算法,在n 1 n 1000 個無序的整數中找出第k小的數。輸入格式 輸入有兩行 第一行是n和k,0 第二行是n個整數 輸出格式 輸出第k小的數 輸入樣例 在這裡給出一組輸入。例如 10 4 2 8 9 0 1 3 6 7 8 2 輸出樣例 在這裡給出相應的輸出。例如 ...

C 第二章實踐

main方法是程式執行的入口 下面的兩個程式之間有點小區別,注意比較 情況一 輸入十個資料會檢查十個數中的偶數 using system using system.collections.generic using system.linq using system.text using system...

python第二章上機實踐 演算法第二章上機實踐報告

1.實踐題目名稱 找第k小的數 2.問題描述 在n 1 n 1000 個無序的整數中找出第k小的數,且時間複雜度為o n 3.演算法描述 int partition int a,int left,int right 函式功能 將輸入的陣列進行排序,排序後的陣列需要滿足 基準左邊的數都小於基準,基準右...