#啟動server./bin/kafka-server-start.sh config/server.properties
#建立topic(主題)test
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 -partitions 1 --topic test
#刪除主題
./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
#– 注意:如果kafaka啟動時載入的配置檔案中server.properties沒有配置delete.topic.enable=true,那麼此 時的刪除並不是真正的刪除,而是把topic標記為:marked for
deletion
#– 此時你若想真正刪除它,可以登入zookeeper客戶端,進入終端後,刪除相應節點
#檢視主題
./bin/kafka-topics.sh --list --zookeeper localhost:2181
#檢視主題test的詳情
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
#consumer讀訊息
./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic badou --from-beginning
#producer發訊息
#啟動server./bin/kafka-server-start.sh config/server.properties
#建立topic badou
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 -partitions 1 --topic badou
#consumer讀訊息
./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic badou --from-beginning
./bin/flume-ng agent -c conf -f conf/flume_kafka.conf -n a1 -dflume.root.logger=info,consoleflume配置檔案flume_kafka.conf# name the components on this agentview codea1.sources =r1
a1.sinks =k1
a1.channels =c1
# describe/configure the source
a1.sources.r1.type =exec
a1.sources.r1.command = tail -f /home/badou/flume_test/flume_exec_test.txt
#a1.sinks.k1.type =logger
# 設定kafka接收器
a1.sinks.k1.type =org.apache.flume.sink.kafka.kafkasink
# 設定kafka的broker位址和埠號
a1.sinks.k1.brokerlist=master:9092
# 設定kafka的topic
a1.sinks.k1.topic=badou
# 設定序列化的方式
a1.sinks.k1.serializer.class=kafka.serializer.stringencoder
# use a channel
which buffers events in
memory
a1.channels.c1.type=memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactioncapacity = 1000
# bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
模擬將後端日誌寫入日誌檔案中
python flume_data_write.pypython**:#view code-*- coding: utf-8 -*-
import
random
import
time
import
pandas as pd
import
json
writefilename="
./flume_exec_test.txt
"cols = ["
order_id
","user_id
","eval_set
","order_number
","order_dow
","hour
","day"]
df1 = pd.read_csv('
/mnt/hgfs/share_folder/00-data/orders.csv')
df1.columns =cols
df =df1.fillna(0)
with open(writefilename,'a+
')as wf:
for idx,row in
df.iterrows():
d ={}
for col in
cols:
d[col]=row[col]
js =json.dumps(d)
wf.write(js+'
\n')
kafka 基本命令
建立topic bin kafka topics.sh create zookeeper localhost 2181 replication factor 1 partitions 1 topic test 檢視所有topic bin kafka topics.sh list zookeeper ...
kafka基本命令使用
本文主要參考 最近開始接觸kafka,下面介紹介紹一些最基本的kafka 的操作。首先是啟動kafka。kafka依賴zookeeper,所以需要先啟動zookeeper。bin zookeeper server start.sh config zookeeper.properties然後啟動kaf...
kafka基本命令(速記)
usr local cellar kafka 1.1.0 1.1.0版本目錄,請檢視自己版本對應的目錄號 usr local etc kafka server.properties usr local etc kafka zookeeper.properties cd usr local cella...