建立producer傳送資訊給消費者
//設定配置相關
private
val prop =
new properties(
) prop.setproperty(
"bootstrap.servers"
,"mypc01:9092,mypc02:9092,mypc03:9092"
) prop.setproperty(
"acks"
,"0"
) prop.setproperty(
"key.serializer"
,"org.apache.kafka.common.serialization.integerserializer"
) prop.setproperty(
"value.serializer"
,"org.apache.kafka.common.serialization.stringserializer"
)//獲取生產者物件
private
val producer =
new kafkaproducer[integer,
string
](prop)
//建立記錄,就是要傳送什麼資訊
private
val message =
new producerrecord[integer,
string](
"pet"
,"you are good"
)//傳送資訊
producer.send(message)
//釋放資源
producer.close()}
private
val prop =
new properties(
)//另一種方式獲取配置資訊,需要把配置檔案放到idea的resource檔案下
prop.load(kafakatest2.getclass.getclassloader.getresourceasstream(
"producer.properties"))
//獲取生產者物件
private
val producer =
new kafkaproducer[integer,
string
](prop)
//獲取記錄
private
val message =
new producerrecord[integer,
string](
"pet"
,"you are rich"
) producer.send(message)
producer.close()}
bootstrap.servers=bigdata01:
9092
,bigdata02:
9092
,bigdata03:
9092 ## kafka的伺服器
key.serializer=org.apache.kafka.common.serialization.integerserializer ##key的序列化器
value.serializer=org.apache.kafka.common.serialization.stringserializer ##value的序列化器
acks=[0
|-1|
1|all] ##訊息確認機制
0: 不做確認,直管傳送訊息即可 -1
|all: 不僅leader需要將資料寫入本地磁碟,並確認,還需要同步的等待其它followers進行確認
1:只需要leader進行訊息確認即可,後期follower可以從leader進行同步
batch.size=
1024 #每個分區內的使用者快取未傳送record記錄的空間大小
如果快取區中的資料,沒有沾滿,也就是任然有未用的空間,那麼也會將請求傳送出去,為了較少請求次數,我們可以配置linger.ms大於0,
linger.ms=
10 ## 不管緩衝區是否被佔滿,延遲10ms傳送request
buffer.memory=
10240 #控制的是乙個producer中的所有的快取空間
retries=
0 #傳送訊息失敗之後的重試次數
public
class
kafkatest3
}
ps
引數linger.ms
緩衝區延遲多少秒後,批量傳送到集群上.
Kafka之生產者
1 方便在集群中擴充套件,乙個topic可以有多個partition組成,而每個partition可以通過調整以適應它所在的機器 2 可以提高併發,因為可以以partition為單位讀寫 我們需要將生產者傳送的資料封裝成乙個producerrecord物件。1 指明partition的情況下,直接將...
kafka 生產者(二)
想要提高生產者的吞吐量可以通過調整一下4個引數來實現 batch.size 批次大小,預設16k linger.ms 等待時間,修改為5 100ms recordaccumulator 緩衝區大小,修改為64m 實現 public class customproducerparameters 關閉資...
kafka生產者分割槽策略
kafka生產者 分割槽策略 分割槽的原因 1 方便在集群中擴充套件,每個partition可以通過調整以適應它所在的機器,而乙個topic又 可以有多個partition組成,因此整個集群就可以適應任意大小的資料了 2 可以提高併發,因為可以以partition為單位讀寫了。分割槽的原則 1 指明...