生產者配置類:
@configuration
@enablekafka
public class kafkaproducerconfig ")
private string address;
@value("$")
private string batchsize;
@value("$")
private string linger;
public mapproducerconfigs()
public produce***ctoryproduce***ctory()
@bean
public kafkatemplatekafkatemplate()
}
消費者配置類:
@component
public class kafkaconsumerconfig ")
private string address;
@value("$")
private string batchsize;
@value("$")
private string linger;
@value("$")
private string concurrency;
@value("$")
private string autocommit;
@value("$")
private string timeout;
@value("$")
private string groupid;
@value("$")
private string offsetreset;
@bean
public kafkalistenercontaine***ctory> kafkalistenercontaine***ctory()
public consume***ctoryconsume***ctory()
public mapconsumerconfigs()
@bean
public rawdatalistener listener()
}
傳送訊息:
@service("kafkaconfig")
public class kafkaproducer ")
private string topic;
@autowired
private kafkatemplatekafkatemplate;
//傳送訊息方法
public void send(string message)
}
接收訊息:
/**
* kafka監聽 * @author king-mouse *
*/@component
public class rawdatalistener ")
private string topic;
//工具類中呼叫service的方法
@resource(name = "flowdataserviceimpl")
private flowdataserviceimpl flowdataservice;
private static final logger logger = logge***ctory.getlogger(rawdatalistener.class);
/*** 實時獲取kafka資料(生產一條,監聽生產topic自動消費一條) * @param record * @throws ioexception
*/@kafkalistener(topics = "})
public void listen(consumerrecord<?, ?> record) throws ioexception
}
配置檔案:
######kafka配置資訊######
kafkaconfig:
address: 127.0.0.1:9002,127.0.0.2:9003
#(是否自動提交)
autocommit: true
#(連線超時時間)
timeout: 20000
commitinterval: 100
#(消費組)
groupid: ivrkafka
#(實時生產,實時消費,不會從頭開始消費)
offsetreset: latest
#(設定消費執行緒數)
concurrency: 15
#消費的topic
topic: ivrflowupdate
batchsize: 4096
linger: 1
memory: 40960
kafka 訊息傳送和接收
傳送 例項 public class kafkaproducerdemo extends thread override public void run else catch interruptedexception e catch executionexception e num try catc...
go 實現 kafka 訊息傳送 接收
kafka是訊息中介軟體的一種,是一種分布式流平台,是用於構建實時資料管道和流應用程式。具有橫向擴充套件,容錯,wicked fast 快 等優點。生產者 producer 將訊息記錄 record 傳送到kafka中的主題中 topic 乙個主題可以有多個分割槽 partition 訊息最終儲存在...
通過pykafka接收Kafka訊息佇列的方法
沒有kafka環境,所以也沒有進行驗證。感覺今後應該能用到,所以借抄在此,備查。pykafka使用示例,自動消費最新訊息,不重複消費 coding utf8 frwww.cppcns.comom pykafka impowww.cppcns.comrwww.cppcns.comt kafkaclie...