kafka node使用記錄

2021-10-10 12:58:47 字數 2622 閱讀 3397

前言:

專案設計使用kafka實現系統訊息的收發,前端則使用kafka-node來實現,由於之前開發比較倉促,對kafka-node高可用性一直沒有測試,故以此篇文章記錄一下開發及測試過程。

測試環境:軟體

版本kafka

kafka_2.10-0.10.1.1

kafka-node

2.2.3

nodejs

6.11.0

部署方式:

kafka使用三颱虛擬機器進行集群部署,其中在配置kafka服務相關引數時,在server.properties檔案中修改num.partitions=3default.replication.factor=3,以此建立3個partition分割槽以及每個分割槽建立3個副本。

**實現:

生產者

function

kafkasendmessage()

);const producer =

newproducer

(client,);

producer.on(

'ready'

,function()

; producer.

send([

payload]

,function

(err, data)

else})

;});

producer.on(

'error'

,function

(err));

}

備註:第7行**中partitionertype表示使用partition方式,如不配置這項,生產者傳送訊息預設傳送到partition0中,無法達到多個partition負載均衡的效果。

該引數在kafka-node官方有介紹:

引數列舉值

說明使用場景

defaultpartitioner

0預設傳送到partition0

kafka服務中只有乙個partiton

randompartitioner

1隨機傳送到某個partition

每次生產者傳送訊息體中topic數量等於1

cyclicpartitioner

2迴圈傳送到partition(0 1 2)

每次生產者傳送訊息體中topic數量大於1

消費者

function

kafkarecievemessage()

);let topics =

; topics.

push

('testtopic');

let offset =

newoffset

(client)

; offset.

fetchlatestoffsets

(topics,

(event, offsetvalue)

=>

console.

log(topic1)

; topics_partitions.

push

(topic1);}

}let options =

;let consumer =

newconsumer

( client, topics_partitions, options

); consumer.on(

'message'

,function

(message));

});}

備註:第10行**中使用offset獲取topic最新的偏移量及全部partition資料,第19行**輸出topictest的偏移量及partition資訊。

測試:

**

function

test()

,5000);

}

結果

offset

offset

offset

producer

}consumer

consumer

producer

}producer

}consumer

consumer

producer

}consumer

producer

}consumer

producer

}consumer

producer

}consumer

producer

}consumer

producer

}consumer

producer

}

備註:offset為獲取topictest所有partition及offset資料,producer每5秒隨機向某個broker中的所在的partition傳送資料,consumner輸出接受資料。經測試當停到kafka集群中某個節點時,客戶端依然可以正常收發,保證了kafka服務的高可用性。

Python 使用記錄(累計記錄)

2 pandas 設定 3 pandas 操作 4 笛卡爾積 1 列表轉字串list1 department of biology str1 join list1 print str1 department of biology2 從列表中刪除元素 要刪除的元素的型別必須與列表中的元素的型別相同 l...

fiddler使用記錄

fiddler 抓不到ie瀏覽器 或者ie核心瀏覽器 的請求包 1 ie的 去掉,這樣會使用fiddler的 去掉 之後一般就可以抓到ie的包了。很常見!另外,此時基於ie核心的瀏覽器可能還不行,比如我用的115br,瀏覽器關掉重啟下就可以了。a 檢查是否配置了某些filter過濾規則 很常見!b ...

xshell securecrt使用記錄

xshell為何滑鼠雙擊時會觸發換行,當選中一段文字時也會換行?在linux下,ctrl c是發個中斷訊號的意思,ctrl c操作會觸發xshell換行.所以,可能是出發了ctrl c導致了xshell換行.windows下,有一些應用程式定義了一些ctrl c的快捷鍵對映,比如有道詞典 當雙擊選中...