在這個部分我們要將kafka分布式流**平台接入我們的系統,用作系統通知。
zookeeper啟動命令
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
kafka啟動命令
.\bin\windows\kafka-server-start.bat .\config\server.properties
# kafkapropertoes
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
# 是否自動提交
spring.kafka.consumer.enable-auto-commit=true
# 自動提交的頻率(ms)
spring.kafka.consumer.auto-commit-interval=3000
public
class
event
public event settopic
(string topic)
public
intgetuserid()
public event setuserid
(int userid)
public
intgetentitytype()
public event setentitytype
(int entitytype)
public
intgetentityid()
public event setentityid
(int entityid)
public
intgetentityuserid()
public event setentityuserid
(int entityuserid)
public map
getdata()
public event setdata
(string key,object value)
}
@component
public
class
eventproducer
}
在springboot中我們使用@kafkalistener來實現對訊息佇列的監聽,當收到訊息時就會觸發相應的方法,我們使用consumerrecord類的例項來接收訊息佇列中的訊息。
在我們的業務中,我們首先判斷是否由收到訊息,然後把收到的訊息字串轉換為event物件,然後我們將該訊息經過處理儲存到message資料庫中。
在message資料表中,我們設定訊息的傳送者id為系統id,訊息的接收者id為event的entityuserid,訊息的會話id我們可以設定為事件的主題,而訊息的內容我們可以用乙個map來儲存,需要放入event的userid,entitytype和entityid,如果data中有資料我們也要放入其中。然後把map轉成json格式的字串放入資料庫中。
@component
public
class
eventconsumer
implements
communityconstant
)public
void
handlecommentmessage
(consumerrecord record)
event event = jsonobject.
parseobject
(record.
value()
.tostring()
, event.
class);
if(event == null)
// 傳送站內通知
message message =
newmessage()
; message.
setfromid
(system_user_id)
; message.
settoid
(event.
getentityuserid()
);message.
setconversationid
(event.
gettopic()
);message.
setcreatetime
(new
date()
);map
content =
newhashmap
<
>()
; content.
put(
"userid"
,event.
getuserid()
);content.
put(
"entitytype"
,event.
getentitytype()
);content.
put(
"entityid"
,event.
getentityid()
);if(
!event.
getdata()
.isempty()
)}message.
setcontent
(jsonobject.
tojsonstring
(content));
messageservice.
addmessage
(message);}
}
event event =
newevent()
.settopic
(topic_comment)
.setuserid
(hostholder.
getuser()
.getid()
).setentitytype
(comment.
getentitytype()
).setentityid
(comment.
getentityid()
).setdata
("postid"
,discusspostid);if
(comment.
getentitytype()
== entity_type_post)
else
if(comment.
getentitytype()
== entity_type_comment)
eventproducer.
fireevent
(event)
;
@component
public
class
messageinterceptor
implements
handlerinterceptor
}}
kafka學習總結之kafka核心
1 kafka核心元件 1 replication 副本 partition 分割槽 乙個topic可以有多個副本,副本的數量決定了有多少個broker存放寫入的資料 副本是以partition為單位的,存放副本即是備份若干個partition,但是只有乙個partition被選為leader用於讀...
Kafka技術知識總結之五 Kafka的高可用性
接上篇 kafka技術知識總結之四 kafka 再均衡 kafka 實現高可用性的方式是進行 replication。對於 kafka,如果沒有提供高可用性機制,一旦乙個或多個 broker 宕機,則宕機期間其上所有 partition 都無法繼續提供服務。若該 broker永遠不能再恢復,那麼所有...
分布式訊息系統之Kafka
隨著疫情在國內得到了平穩控制,各個企業也逐漸開始招聘了,而跳槽的好時機,除了金三銀四之外,便是金九銀十啦。準備看機會的朋友們可以抓住這次機會,好好的準備簡歷,覆盤專案經歷,深入惡補每個知識點。今天我們要給大家補的知識點便是分布式訊息系統kafka。在網際網路海量資料 高併發 高可用 低延遲的要求下,...