Kafka Basic Commands and Practice


# Start the Kafka server

./bin/kafka-server-start.sh config/server.properties
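
# Kafka depends on ZooKeeper, so ZooKeeper must already be running; the one bundled with Kafka can be started with:

./bin/zookeeper-server-start.sh config/zookeeper.properties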

# Create the topic test

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# Delete a topic

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

#– Note: if delete.topic.enable=true is not set in the server.properties loaded when Kafka starts, the command above does not actually delete the topic; it only flags it as "marked for deletion".

#– To really delete it in that case, log into a ZooKeeper client and remove the corresponding znodes.
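
#– A sketch of that manual cleanup with the zkCli.sh shipped with ZooKeeper (releases before 3.5 use rmr instead of deleteall):

./bin/zkCli.sh -server localhost:2181
deleteall /brokers/topics/test
deleteall /admin/delete_topics/test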

# List all topics

./bin/kafka-topics.sh --list --zookeeper localhost:2181

# Show details of the topic test

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
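
# For a one-partition, one-replica topic the output looks roughly like the following (exact columns vary by Kafka version):

Topic:test  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0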

# Consumer reads messages

./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic badou --from-beginning
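
# On Kafka 0.9+ the console consumer can also connect straight to a broker instead of ZooKeeper; a sketch assuming a broker listening on master:9092:

./bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic badou --from-beginning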

# Producer sends messages (in the walkthrough below, Flume acts as the producer, writing to the badou topic)
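
# Messages can also be produced by hand with the console producer; a sketch assuming a broker on master:9092 (older releases use --broker-list, newer ones --bootstrap-server):

./bin/kafka-console-producer.sh --broker-list master:9092 --topic badou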

# Start the Kafka server

./bin/kafka-server-start.sh config/server.properties

# Create the topic badou

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic badou

# Consumer reads messages

./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic badou --from-beginning

# Start the Flume agent

./bin/flume-ng agent -c conf -f conf/flume_kafka.conf -n a1 -Dflume.root.logger=INFO,console

The Flume configuration file flume_kafka.conf:

# name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /home/badou/flume_test/flume_exec_test.txt

# a logger sink could be used instead while debugging:
#a1.sinks.k1.type = logger

# configure the Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

# set the Kafka broker address and port
a1.sinks.k1.brokerList = master:9092

# set the Kafka topic
a1.sinks.k1.topic = badou

# set the serializer
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder

# use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000

# bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
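
# Once the agent is up, the pipeline can be sanity-checked by appending a line to the tailed file and watching it arrive at the console consumer; a sketch assuming the paths above:

echo 'hello kafka' >> /home/badou/flume_test/flume_exec_test.txt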


Simulate backend logs being written into the log file

python flume_data_write.py

flume_data_write.py:

# -*- coding: utf-8 -*-
import random
import time
import pandas as pd
import json

writeFileName = "./flume_exec_test.txt"
cols = ["order_id", "user_id", "eval_set", "order_number", "order_dow", "hour", "day"]

df1 = pd.read_csv('/mnt/hgfs/share_folder/00-data/orders.csv')
df1.columns = cols
df = df1.fillna(0)

with open(writeFileName, 'a+') as wf:
    for idx, row in df.iterrows():
        d = {}
        for col in cols:
            # .item() unwraps numpy scalars into native Python types so json can serialize them
            v = row[col]
            d[col] = v.item() if hasattr(v, 'item') else v
        js = json.dumps(d)
        wf.write(js + '\n')
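
# Each row of orders.csv is appended as one JSON object per line, which is exactly what the tail -f exec source above picks up; the last record written can be spot-checked with:

tail -n 1 ./flume_exec_test.txt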

