通過kafka傳送和接收訊息

2021-09-24 09:31:28 字數 2447 閱讀 1274

生產者配置類:

@configuration

@enablekafka

public class kafkaproducerconfig ")

private string address;

@value("$")

private string batchsize;

@value("$")

private string linger;

public mapproducerconfigs()

public produce***ctoryproduce***ctory()

@bean

public kafkatemplatekafkatemplate()

}

消費者配置類:

@component

public class kafkaconsumerconfig ")

private string address;

@value("$")

private string batchsize;

@value("$")

private string linger;

@value("$")

private string concurrency;

@value("$")

private string autocommit;

@value("$")

private string timeout;

@value("$")

private string groupid;

@value("$")

private string offsetreset;

@bean

public kafkalistenercontaine***ctory> kafkalistenercontaine***ctory()

public consume***ctoryconsume***ctory()

public mapconsumerconfigs()

@bean

public rawdatalistener listener()

}

傳送訊息:

@service("kafkaconfig")

public class kafkaproducer ")

private string topic;

@autowired

private kafkatemplatekafkatemplate;

//傳送訊息方法

public void send(string message)

}

接收訊息:

/**

* kafka監聽 * @author king-mouse *

*/@component

public class rawdatalistener ")

private string topic;

//工具類中呼叫service的方法

@resource(name = "flowdataserviceimpl")

private flowdataserviceimpl flowdataservice;

private static final logger logger = logge***ctory.getlogger(rawdatalistener.class);

/*** 實時獲取kafka資料(生產一條,監聽生產topic自動消費一條) * @param record * @throws ioexception

*/@kafkalistener(topics = "})

public void listen(consumerrecord<?, ?> record) throws ioexception

}

配置檔案:

######kafka配置資訊######

kafkaconfig:

address: 127.0.0.1:9002,127.0.0.2:9003

#(是否自動提交)

autocommit: true

#(連線超時時間)

timeout: 20000

commitinterval: 100

#(消費組)

groupid: ivrkafka

#(實時生產,實時消費,不會從頭開始消費)

offsetreset: latest

#(設定消費執行緒數)

concurrency: 15

#消費的topic

topic: ivrflowupdate

batchsize: 4096

linger: 1

memory: 40960

kafka 訊息傳送和接收

傳送 例項 public class kafkaproducerdemo extends thread override public void run else catch interruptedexception e catch executionexception e num try catc...

go 實現 kafka 訊息傳送 接收

kafka是訊息中介軟體的一種,是一種分布式流平台,是用於構建實時資料管道和流應用程式。具有橫向擴充套件,容錯,wicked fast 快 等優點。生產者 producer 將訊息記錄 record 傳送到kafka中的主題中 topic 乙個主題可以有多個分割槽 partition 訊息最終儲存在...

通過pykafka接收Kafka訊息佇列的方法

沒有kafka環境,所以也沒有進行驗證。感覺今後應該能用到,所以借抄在此,備查。pykafka使用示例,自動消費最新訊息,不重複消費 coding utf8 frwww.cppcns.comom pykafka impowww.cppcns.comrwww.cppcns.comt kafkaclie...