1.pom.xml中加入kafka依賴:
org.springframework.kafka
spring-kafka
## kafka ##
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.stringserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.stringserializer
spring.kafka.consumer.group-id=test
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.stringdeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.stringdeserializer
spring.kafka.topic=mqtt_location_data
3.kafka訊息生產者:
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.value;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.scheduling.annotation.enablescheduling;
import org.springframework.scheduling.annotation.scheduled;
import org.springframework.stereotype.component;
import org.springframework.util.concurrent.listenablefuture;
/** * @description: kafka生產者
* @author:
* @create: 2019-01-29 17:38
**/@component
@slf4j
public class kafkaproducer ")
private string topic;
/*** 傳送kafka訊息
** @param jsonstring
*/public void send(string jsonstring)
}
4.kafka訊息消費者:
import lombok.extern.slf4j.slf4j;
import org.apache.kafka.clients.consumer.consumerrecord;
import org.springframework.kafka.annotation.kafkalistener;
import org.springframework.stereotype.component;
/** * @description: kafka消費者
* @author:
* @create: 2019-01-29 17:47
**/@component
@slf4j
public class kafkaconsumer ")
public void listen(consumerrecord, ?> record) , offset={}, message={}", record.topic(), record.offset(), record.value());}}
5.啟動類測試:
import kafka.producer.kafkaproducer;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.commandlinerunner;
/** * description:
* author:
* date: 2018-09-30 09:15
**/public static void main(string args)
@autowired
kafkaproducer kafkaproducer;
@override
public void run(string... args) throws exception }}
結果:
配置引數補充:
#acks = 0 如果設定為零,則生產者將不會等待來自伺服器的任何確認,該記錄將立即新增到套接字緩衝區並視為已傳送。在這種情況下,無法保證伺服器已收到記錄,並且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設定為-1。
#acks = 1 這意味著leader會將記錄寫入其本地日誌,但無需等待所有副本伺服器的完全確認即可做出回應,在這種情況下,如果leader在確認記錄後立即失敗,但在將資料複製到所有的副本伺服器之前,則記錄將會丟失。
#acks = all 這意味著leader將等待完整的同步副本集以確認記錄,這保證了只要至少乙個同步副本伺服器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當於acks = -1的設定。
#可以設定的值為:all, -1, 0, 1
spring.kafka.producer.acks=1
#該值大於零時,表示啟用重試失敗的傳送次數
spring.kafka.producer.retries=3
CMake簡易入門
首發於fxm5547的部落格 cmake minimum required version 2.6 project itest c 標準 set cmake cxx standard 11 指定參與編譯的原始檔 add executable itest src main.cpp src cal ca...
MySQL簡易入門
mysql 是乙個網際網路繞不過去的坎,總覺得很簡單,一切似乎都圍繞著 curd,但是不能脫離這個核心,本次的部落格其 mysql 的一些基本概念作為主題,力求用自己的語言,將其中的概念說清楚。今天是在銀川的第二天,第一次坐完飛機,現在在見家長的過程中,偷得片刻悠閒,寫點東西作為總結 mysql 可...
Python簡易入門
字串可用單引號 雙引號和三引號。轉義字元 可以轉義很多字元,比如 n表示換行,t表示製表符,b表示退格符,字元 本身也需要轉義,因此,表示的字元就是 取子字串有兩種方法,一種是用 索引,一種是用切片運演算法 檢視變數型別 type 型別轉換 列表 list 元組 tuple 集合 set 字典 di...