2.1 核心邏輯
相對於網路層,api層的實現相當簡單,只是對底層實現的封裝,得益於此,從api層幾乎就能了解到kafka broker所能提供的全部功能啟動num.io.threads個requesthandler
每個requesthandler都從全域性requestqueue中poll request
根據request的apikeys呼叫對應的handle*api進行實際的業務邏輯處理
2.2 核心類、方法介紹
kafkarequesthandler
|-- run()
kafkaapis
|-- handle() // 所有請求的入口,根據apikeys分發請求
3.1 啟動流程啟動流程前半部分和網路層一致,這裡不再贅述
// kafkaserver.scala
def startup()
// kafkarequesthandler.scala
class kafkarequesthandlerpool(val brokerid: int,
val requestchannel: requestchannel,
val apis: kafkaapis,
time: time,
numthreads: int) extends logging with kafkametricsgroup
def createhandler(id: int): unit = synchronized
}
實際就是例項化api層的入口,再啟動num.io.threads個requesthandler執行緒,用於實際處理請求,值得注意的是這裡的執行緒池,其實不是真正意義上的執行緒池,執行緒數目是固定的,只有通過動態引數改變執行緒池大小時,才會重新調整執行緒數目
def resizethreadpool(newsize: int): unit = synchronized
} else if (newsize < currentsize)
}threadpoolsize.set(newsize)
}
3.2 請求分發流程requesthandler執行緒啟動後,每個執行緒都一直從網路層獲取request再交給kafkaapis進行請求分發
// kafkarequesthandler.scala
def run()
}shutdowncomplete.countdown()
}// requestchannel.scala
private val requestqueue = new arrayblockingqueue[baserequest](queuesize)
def receiverequest(timeout: long): requestchannel.baserequest =
requestqueue.poll(timeout, timeunit.milliseconds)
// kafkaapis.scala
def handle(request: requestchannel.request)
}
requesthandler從全域性阻塞佇列獲取網路層組裝完的request,並呼叫kafkaapi的handle方法
handle方法類似web開發中的dispatch,根據apikeys呼叫對應的handle*執行實際業務邏輯
可以看到api層只是對執行執行緒、根據apikeys進行請求路由的封裝。實際邏輯由kafkaapis類中的handle*方法實現,而其中最核心的方法為produce與fetch請求,分別對應訊息的生產與消費,下面的文章將專門針對這兩種api進行原始碼分析
azkaban web server原始碼解析
azkaban主要用於hadoop相關job任務的排程,但也可以應用任何需要排程管理的任務,可以完全代替crontab。azkaban主要分為web server 任務上傳,管理,排程 executor server 接受web server的排程指令,進行任務執行 1.資料表 projects 工...
JDK LinkedHashMap原始碼解析
今天來分析一下jdk linkedhashmap的源 public class linkedhashmapextends hashmapimplements map可以看到,linkedhashmap繼承自hashmap,並且也實現了map介面,所以linkedhashmap沿用了hashmap的大...
Redux原始碼createStore解讀常用方法
const store createstore reducer,preloadedstate enhancer 直接返回當前currentstate,獲取state值,return state 我覺得應該深轉殖乙個新的物件返回,不然有可能會被外部修改 function getstate consol...