kafka中涉及的名詞
訊息記錄:由乙個key,乙個value和乙個時間戳構成,訊息最終儲存在主題下的分割槽中,記錄在生產中稱為生產者記錄,在消費者中稱為消費記錄。kafka集群保持了所有發布的訊息,直到它們過期,無論訊息是否被消費了,在乙個可配置的時間段內,kafka集群保留了所有發布的訊息。比如訊息的儲存策略被設定為2天,那麼在乙個訊息被發布的兩天時間內,它都是可以被消費的。kafka的效能是和資料量無關的常量級的,所以保留太多資料並不是問題
生成者:生產者用於發布訊息
消費者:消費者用於訂閱訊息
消費者組:相同的groupid的消費者將視為同乙個消費者組,每個消費者都需要設定乙個組id,每條訊息只能被consumer group中的乙個consumer消費,但是可以被多個consumer group消費
主題(topic):訊息的一種邏輯分組,用於對訊息分門別類,每一類訊息稱之為乙個主題,相同主題的訊息放在乙個佇列中
分割槽(partition):訊息的一種物理分組,乙個主題被拆成多個分割槽,每乙個分割槽就是乙個順序的,不可變的訊息佇列,並且可以持續新增,分割槽中的每個訊息都被分配了乙個唯一的id,稱之為偏移量(offset),在每個分割槽中偏移量都是唯一的。每個分割槽對應乙個邏輯log,有多個segment組成
偏移量:分割槽中每個訊息都有乙個唯一的id,稱之為偏移量,代表已經消費的位置
**(broker):一台kafka伺服器稱之為乙個broker
副本(replica):副本只是乙個分割槽(partition)的備份。副本不讀取或寫入資料。它們用於防止資料丟失
領導者:leader是負責給定分割槽的所有讀取和寫入的節點
追隨者:跟隨領導者指令的節點被稱為follower。
zookeeper:kafka**是無狀態的,所以它們使用zookeeper來維護它們的集群狀態。zookeeper用於管理和協調kafka**
kafka功能
一. mac版安裝
brew install kafka
安裝kafka需要依賴zookeeper的,所以安裝kafka的時候也會包含zooker
server.properties中重要配置二. 啟動zookeeperbroker.id=0
listeners=plaintext://:9092
advertised.listeners=plaintext:
log.dirs=/usr/local/var/lib/kafka-logs
zookeeper.properties重要配置
datadir=/usr/local/var/lib/zookeeper
clientport=2181
maxclientcnxns=0
新建立終端啟動zookeeper三.啟動kafkacd /usr/local/cellar/kafka/2.1.0
./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
列印台顯示:info reading configuration from: /usr/local/etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.quorumpeerconfig)
...即是啟動成功
新建立終端啟動kafka(啟動kafka之前必須先啟動zookeeper)四.建立topiccd /usr/local/cellar/kafka/2.1.0
./bin/kafka-server-start /usr/local/etc/kafka/server.properties
列印台顯示:info registered kafka:type=kafka.log4jcontroller mbean (kafka.utils.log4jcontrollerregistration$)
...即啟動成功
啟動了kafka之後,zookeeper端會報一些error:keepererrorcode = nonode for /config/topics/test之類的錯誤,這個是沒有問題的,這是因為kafka向zookeeper傳送了關於該路徑的一些請求資訊,但是不存在,所以這是沒有問題的
新建立終端五.傳送訊息cd /usr/local/cellar/kafka/2.1.0
建立乙個名為「test」的主題:./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
檢視所有的topic:./bin/kafka-topics --list --zookeeper localhost:2181
檢視某個topic的資訊,比如test:./bin/kafka-topics --describe --zookeeper localhost:2181 --topic test
新建立乙個終端,作為生產者,用於傳送訊息,每一行就是一條資訊,將訊息傳送到kafka伺服器六.消費訊息(接受訊息)cd /usr/local/cellar/kafka/2.1.0
./bin/kafka-console-producer --broker-list localhost:9092 --topic test
send one message
send two message
新建立乙個終端作為消費者,接受訊息注意:傳送訊息與接受訊息必須啟動kafka與zookeeper生產者cd /usr/local/cellar/kafka/2.1.0
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
send one message
send two message(這些便是從生產者獲得的訊息)
import (
//構建傳送的訊息,
msg := &sarama.producermessage
var value string
var msgtype string
for
fmt.scanf("%s",&msgtype)
fmt.println("msgtype = ",msgtype,",value = ",value)
msg.topic = msgtype
//將字串轉換為位元組陣列
msg.value = sarama.byteencoder(value)
//sendmessage:該方法是生產者生產給定的訊息
//生產成功的時候返回該訊息的分割槽和所在的偏移量
//生產失敗的時候返回error
partition, offset, err := producer.sendmessage(msg)
if err != nil
fmt.printf("partition = %d, offset=%d\n", partition, offset)
}}
消費者import (
//partitions(topic):該方法返回了該topic的所有分割槽id
partitionlist, err := consumer.partitions("test")
if err != nil
for partition := range partitionlist
defer pc.asyncclose()
wg.add(1)
go func(sarama.partitionconsumer)
}(pc)
} wg.wait()
consumer.close()
}
流量削峰在訊息佇列中也是常用場景,一般在秒殺或**活動中使用比較廣泛。當流量太大的時候達到伺服器瓶頸的時候可以將事件放在kafka中,下游伺服器當接收到訊息的時候自己去消費,有效防止伺服器被擠垮
訊息佇列一般都內建了高效的通訊機制,因此也可以用在純的訊息通訊中,比如客戶端a跟客戶端b都使用同一佇列進行訊息通訊,客戶端a,客戶端b,客戶端n都訂閱了同乙個主題進行訊息發布和接受不了實現類似聊天室效果
參考**
如何基於sqlite實現kafka延時訊息詳解
目錄 延時訊息 或者說定時訊息 是業務系統裡乙個常見的功能點。常用業務場景如 1 訂單超時取消 2 離線超過指定時間的使用者,召回通知 3 手機消失多久後通知監護人 現流行的實現方案主要有 1 資料庫定時輪詢,掃瞄到達到延時時間的記錄,業務處理,刪除該記錄 2 jdk 自帶延時佇列 delayque...
kafka 檢視待消費資料 kafka檢視消費資料
kafka檢視消費資料 一 如何檢視 在老版本中,使用kafka run class.sh 指令碼進行檢視。但是對於最新版本,kafka run class.sh 已經不能使用,必須使用另外乙個指令碼才行,它就是kafka consumer groups.sh 普通版檢視所有組 要想查詢消費資料,必...
kafka 檢視待消費資料 kafka檢視消費資料
一 如何檢視 在老版本中,使用kafka run class.sh 指令碼進行檢視。但是對於最新版本,kafka run class.sh 已經不能使用,必須使用另外乙個指令碼才行,它就是kafka consumer groups.sh 普通版檢視所有組 要想查詢消費資料,必須要指定組。那麼線上執行...