kafka生產者API操作

2021-10-10 22:09:24 字數 2305 閱讀 3909

建立producer傳送資訊給消費者

//設定配置相關

private

val prop =

new properties(

) prop.setproperty(

"bootstrap.servers"

,"mypc01:9092,mypc02:9092,mypc03:9092"

) prop.setproperty(

"acks"

,"0"

) prop.setproperty(

"key.serializer"

,"org.apache.kafka.common.serialization.integerserializer"

) prop.setproperty(

"value.serializer"

,"org.apache.kafka.common.serialization.stringserializer"

)//獲取生產者物件

private

val producer =

new kafkaproducer[integer,

string

](prop)

//建立記錄,就是要傳送什麼資訊

private

val message =

new producerrecord[integer,

string](

"pet"

,"you are good"

)//傳送資訊

producer.send(message)

//釋放資源

producer.close()}

private

val prop =

new properties(

)//另一種方式獲取配置資訊,需要把配置檔案放到idea的resource檔案下

prop.load(kafakatest2.getclass.getclassloader.getresourceasstream(

"producer.properties"))

//獲取生產者物件

private

val producer =

new kafkaproducer[integer,

string

](prop)

//獲取記錄

private

val message =

new producerrecord[integer,

string](

"pet"

,"you are rich"

) producer.send(message)

producer.close()}

bootstrap.servers=bigdata01:

9092

,bigdata02:

9092

,bigdata03:

9092 ## kafka的伺服器

key.serializer=org.apache.kafka.common.serialization.integerserializer ##key的序列化器

value.serializer=org.apache.kafka.common.serialization.stringserializer ##value的序列化器

acks=[0

|-1|

1|all] ##訊息確認機制

0: 不做確認,直管傳送訊息即可 -1

|all: 不僅leader需要將資料寫入本地磁碟,並確認,還需要同步的等待其它followers進行確認

1:只需要leader進行訊息確認即可,後期follower可以從leader進行同步

batch.size=

1024 #每個分區內的使用者快取未傳送record記錄的空間大小

如果快取區中的資料,沒有沾滿,也就是任然有未用的空間,那麼也會將請求傳送出去,為了較少請求次數,我們可以配置linger.ms大於0,

linger.ms=

10 ## 不管緩衝區是否被佔滿,延遲10ms傳送request

buffer.memory=

10240 #控制的是乙個producer中的所有的快取空間

retries=

0 #傳送訊息失敗之後的重試次數

public

class

kafkatest3

}

ps

引數linger.ms

緩衝區延遲多少秒後,批量傳送到集群上.

Kafka之生產者

1 方便在集群中擴充套件,乙個topic可以有多個partition組成,而每個partition可以通過調整以適應它所在的機器 2 可以提高併發,因為可以以partition為單位讀寫 我們需要將生產者傳送的資料封裝成乙個producerrecord物件。1 指明partition的情況下,直接將...

kafka 生產者(二)

想要提高生產者的吞吐量可以通過調整一下4個引數來實現 batch.size 批次大小,預設16k linger.ms 等待時間,修改為5 100ms recordaccumulator 緩衝區大小,修改為64m 實現 public class customproducerparameters 關閉資...

kafka生產者分割槽策略

kafka生產者 分割槽策略 分割槽的原因 1 方便在集群中擴充套件,每個partition可以通過調整以適應它所在的機器,而乙個topic又 可以有多個partition組成,因此整個集群就可以適應任意大小的資料了 2 可以提高併發,因為可以以partition為單位讀寫了。分割槽的原則 1 指明...