topiccommand.createtopic
def createtopic
(zkutils: zkutils, opts: topiccommandoptions)
else
{//往目錄裡面寫topic的分割槽的分配方案
->
writetopicpartitionassignment
(zkutils, topic, partitionreplicaassignment, update)
監聽變化
kafkacontroller.oncontrolle***ilover
// 監聽分割槽的變化
partitionstatemachine.
registerlisteners()
-> registertopicchangelistener
// /brokers/topics
zkutils.zkclient.
subscribechildchanges
(brokertopicspath, topicchangelistener)
-> topicchangelistener =
newtopicchangelistener()
//傳送元資料變更的請求
controller.onnewtopiccreation
onnewpartitioncreation
replicastatemachine.handlestatechanges
//傳送請求給其他所有的broker
brokerrequestbatch.
sendrequeststobrokers
(controller.epoch)
//todo 傳送請求 apikeys.update_metadata_key
controller.
sendrequest
(broker, apikeys.update_metadata_key,
some
(version)
, updatemetadatarequest, null)
至此,與上篇一致,都會更新元資料
如何在Kafka上建立乙個Topic
bin kafka topics.sh zookeeper 192.168.2.225 2183 config mobile mq create topic test.example replication factor 2 partitions 24 topic指定topic name parti...
如何選擇乙個Kafka集群中的主題分割槽的數量
kafka集群中分割槽應該設定多少比較合適,這是乙個面對眾多開發者共同的難題,這篇文章的目標就是來解釋一些重要的因素,同時會提供一些簡單的公式。首先我們要有個認知,那就是分割槽 partition 是kafka中的併發單位。從生產者和broker層面來說,寫入訊息到不同的分割槽是一種完全的並發行為,...
建立乙個struct,來管理乙個動態增長的陣列
c 程式設計思想,在介紹資料封裝給了乙個cstash的例子,大概的思想是,建立乙個struct,來管理乙個動態增長的陣列。這個陣列可以接受任何型別的基本資料型別。包括示例中的int和char,乙個能儲存多種資料型別的底層資料型別,當然是最小的型別也就是sizeof運算子返回為1的資料型別,綜合考慮,...