Kafka簡單使用示例

2021-10-04 17:27:19 字數 2997 閱讀 5322

sh bin/zookeeper-server-start.sh config/zookeeper.properties 啟動zookeeper

sh bin/kafka-server-start.sh config/server.properties

properties properties =

newproperties()

; properties.

put(

"bootstrap.servers"

,"127.0.0.1:9092");

properties.

put(

"client.id"

,"demoproducer");

properties.

put(

"acks"

,"0");

properties.

put(

"key.serializer"

,"org.apache.kafka.common.serialization.stringserializer");

properties.

put(

"value.serializer"

,"org.apache.kafka.common.serialization.stringserializer");

kafkaproducer

producer = null;

person person= null;

try}

catch

(exception e)

finally

properties properties =

newproperties()

; properties.

put(

"bootstrap.servers"

,"127.0.0.1:9092");

properties.

put(

"enable.auto.commit"

,"true");

properties.

put(

"auto.commit.interval.ms"

,"1000");

properties.

put(

"session.timeout.ms"

,"30000");

properties.

put(

"key.deserializer"

,"org.apache.kafka.common.serialization.stringdeserializer");

properties.

put(

"value.deserializer"

,"org.apache.kafka.common.serialization.stringdeserializer");

properties.

put(

"group.id"

,"demoproducer");

kafkaconsumer kafkaconsumer =

newkafkaconsumer

<

>

(properties)

; kafkaconsumer.

subscribe

(arrays.

aslist

("message"))

;while

(true

)}

client.id 發出請求時傳遞給伺服器的id字串

acks 訊息持久化方式

enable.auto.commit  是否自動提交 提交後不重複消費

auto.commit.interval.ms 自動提交間隔週期

session.timeout.ms 心跳

group.id 群組

1.乙個生產者多個消費者,怎樣均衡消費?

預設情況下乙個topic只有乙個partitions,同乙個群組下的乙個消費者只能消費乙個partitions,所以預設情況下上面的兩個消費者同時啟動也只有乙個消費者能夠消費到資料。

解決方案:修改partitions kafka/bin下有提供工具

sh kafka-topics.sh --alter --zookeeper 127.0.0.1:2181 --topic message --partitions 4

修改以後檢視topic資訊

sh kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic message

topic: message	partitioncount:4	replicationfactor:1	configs:

topic: message partition: 0 leader: 0 replicas: 0 isr: 0

topic: message partition: 1 leader: 0 replicas: 0 isr: 0

topic: message partition: 2 leader: 0 replicas: 0 isr: 0

topic: message partition: 3 leader: 0 replicas: 0 isr: 0

2.改過partitions,發現資料還是只在partition:0 上?

kafka資料分片的規則是 如果生產者指定key 那麼就會獲取key的hash值 與partitioncount 取餘數 就是partition的位置,【注意生產者**new producerrecord(「message」,null, i+"")】

如果key為null,分片規則:尋找上一次儲存資料的partition,如果沒有則直接存在partition0,如果存在就存在下乙個partition,均勻儲存。

kafka集群簡單使用

bin kafka topics.sh create bootstrap server hadoop102 9092 topic testbin kafka console producer.sh broker list hadoop102 9092在本節點建立乙個生產者 bin kafka con...

kafka的簡單使用

mkdir logs broker的全域性唯一編號,不能重複,只能是整型,每個機器都不一樣 broker.id 0 刪除topic功能使用 delete.topic.enable true 處理網路請求的執行緒數量 num.network.threads 3 用來處理磁碟io的現成數量 num.io...

TabHost簡單使用示例

在android的開發中,經常需要實現頁面tab的功能,比較簡答的一種方式就是使用tabhost實現。顧名思義,tabhost即是包含若干個tab的乙個tab容器。那麼,當我們要使用tabhost實現乙個介面功能的時候,我們是如何開始的呢?布局檔案的填寫較為簡單,如下 使用中,需要注意的點是 tab...