主要內容:1. kafka 安裝、啟動
2. 訊息的 生產、消費
3. 配置啟動集群
4. 集群下的容錯測試
5. 從檔案中匯入資料,並匯出到檔案
tar -xzf kafka_2.10-0.10.1.1.tgzcd kafka_2.10-0.10.1.1
> bin/zookeeper-server-start.sh \config/zookeeper.properties
> bin/kafka-server-start.sh \
config/server.properties
開啟乙個新的終端視窗
bin/kafka-topics.sh --create \--zookeeper localhost:2181\
--replication-factor 1\
--partitions 1\
--topic test
開啟乙個新的終端視窗
bin/kafka-console-producer.sh \--broker-list localhost:9092\
--topic test
進入輸入模式,隨意輸入資訊,例如:
hello worldhi
開啟乙個新的終端視窗
bin/kafka-console-consumer.sh \--bootstrap-server localhost:9092\
--topic test \
--from-beginning
便會顯示出剛才傳送的兩條訊息:
hello worldhi
這時可以開啟傳送訊息的終端視窗,輸入新的資訊,再返回來就可以看到自動接收到了新訊息
> cp config/server.properties \config/server-1.properties
> cp config/server.properties \
config/server-2.properties
修改 config/server-1.properties 的以下幾項配置:
broker.id=1listeners=plaintext:
修改 config/server-2.properties 的以下幾項配置:
broker.id=2listeners=plaintext:
> bin/kafka-server-start.sh \config/server-1.properties &
> bin/kafka-server-start.sh \
config/server-2.properties &
bin/kafka-topics.sh --create \--zookeeper localhost:2181\
--replication-factor 3\
--partitions 1\
--topic my-replicated-topic
bin/kafka-console-producer.sh--broker-list localhost:9092\
--topic my-replicated-topic
輸入訊息:
my test message 1my test message 2
bin/kafka-console-consumer.sh \--bootstrap-server localhost:9092\
--from-beginning \
--topic my-replicated-topic
可以正常取得訊息
# 取得server1的程序號
ps aux | grep server-1.properties
# 殺掉程序
kill -9 43116
讀取訊息
bin/kafka-console-consumer.sh \--bootstrap-server localhost:9092\
--from-beginning \
--topic my-replicated-topic
返回資訊:
my test message 1my test message 2
仍然可以正常取得訊息
kafka 中的 connecter 可以與外部系統進行連線,例如檔案系統、資料庫
下面實驗乙個簡單檔案系統互動,從乙個檔案中匯入資料,然後匯出到另乙個檔案中
echo -e "foo\nbar
" > test.txt
bin/connect-standalone.sh \config/connect-standalone.properties \
config/connect-file-source.properties \
config/connect-file-sink.properties
命令執行後,會輸出一系列的日誌資訊,等待執行完畢
cat test.sink.txt
返回結果:
foobar
成功匯出了 test.txt 中的資料
執行第2步的命令後,為什麼是去讀test.txt
?為什麼寫入了test.sink.txt
?中間的過程是什麼樣的?
原因是在於兩個配置檔案
config/connect-file-source.properties(匯入配置)
name=local-file-sourceconnector.class=filestreamsource
tasks.max=1file=test.txt
topic=connect-test
file
指定了是從test.txt
中匯入資料
topic
指定了把資料傳送到connect-test
這個topic
connect-file-sink.properties(匯出配置)
name=local-file-sinkconnector.class=filestreamsink
tasks.max=1file=test.sink.txt
topics=connect-test
file
指定了把資料匯出到test.txt
中匯入資料
topic
指定從connect-test
這個topic中讀取資料
檢視一下connect-test
這個topic
bin/kafka-console-consumer.sh \--bootstrap-server localhost:9092\
--topic connect-test \
--from-beginning
結果為:
,"payload
":"foo"}
,"payload
":"bar
"}
現在向test.txt中新增一條新資料:
echo "another line
" >> test.txt
再次執行cat test.sink.txt
就會看到剛剛新增的資料:
kafka搭建 快速搭建Kafka服務
搞流處理的話,無論如何是繞不過kafka的了,還好kafka是乙個概念比較好理解的架構模型。我覺得官方的這三張圖已經很好地把模型結構給闡述清楚了。發布 訂閱模型 實現訊息寫入與訊息讀取解耦。kafka相當於是乙個訊息緩衝池 2.日誌檔案順序結構 kafka的高吞吐量就是依賴順序寫入 當然還包括了一些...
自動擋車型,快速起步
以下是文章出處 關注他sport模式 s擋 sport模式 s擋其實許多車都有,原理十分簡單,只需要改變油門的靈敏度和變速箱的換擋積極性就能達到讓加速更加積極的目的。變速箱的降擋積極,你就會覺得加速比較爽快,而如果降擋慢,就會感覺動力憋著出不來,而真正需要動力的多數是在市區穿插或者山路行駛,這種情況...
kafka之快速啟動
topic 主題 是kafka集群中用於儲存某一類 或者某一種資料。主題的資料只能新增 broker 集群中的每乙個分機 都是乙個broker 相當於kafka集群的節點 consumer 消費者 用來從kafka 集群中讀取訊息 producer 生產者 用於從kafka 集群中傳送訊息 stre...