前面博文嘗試使用了srping cloudbus, 裡面引入了spring-cloud-starter-bus-kafka和spring-cloud-starter-bus-amqp,實時上它們分別依賴了spring-cloud-starter-stream-kafka和spring-cloud-starter-stream –rabbit,真正實現與這些訊息**進行互動操作的是spring cloud stream。一定程度上我們將spring cloud bus理解為使用了spring cloud stream構建的上層應用。
各種各樣的訊息佇列的產生和更新,使mq元件學習成本越來越高,string cloud stream為一些**商的訊息中介軟體產品提供了個性化的自動化配置,引入發布訂閱、消費組、以及分割槽這3個概念,有效的簡化了上層研發人員對mq使用的複雜度,讓開發人員更多的精力投入到核心業務的處理。
spring cloud stream裡定義了了3種通道:
sink:只有乙個輸入的通道;
public
inte***ce sink
source:只有乙個輸出的通道;
public
inte***ce source
processor:乙個雙向的通道。
public
inte***ce
processor
extends
source, sink
示例**一:
spring:
rabbitmq:
host: 192.168
.226
.133
port: 5672
username: guest
password: guest
server:
port: 8001
核心**:
@enablebinding(processor.class)
public
class
private
@streamlistener(processor.input)
@sendto(processor.output)
public string receivemethod(device device)
}
核心的三個註解:
@enablebinding(processor.class):該註解用來指定乙個或多個定義了@input或@output註解的介面,這裡構建乙個processor通道。
@streamlistener(processor.input):定義在方法中,被修飾的方法註冊為訊息中介軟體上資料流的事件***,註解中屬性值對應了監聽的訊息通道名,這裡在input中提取訊息
@sendto(processor.output)在output中返回訊息。
receivemethod是核心方法,入參直接是device型別,因為streamlistener註解自帶協議轉換的能力,程式不需要新增任何配置。因為我們要返回內容給output通道,所以該方法要有返回值。
spring.cloud
.stream
.bindings
.input
.destination=output
spring.cloud
.stream
.bindings
.output
.destination=input
spring.cloud
.stream
.bindings
.output
spring.rabbitmq
.host=192.168
.226
.133
spring.rabbitmq
.port=5672
spring.rabbitmq
.username=guest
spring.rabbitmq
.password=guest
server.port=8002
@enablebinding(processor.class)
public
class
private
private
int id;
@bean
@inboundchanneladapter(value=processor.output, poller=@poller(fixeddelay="5000"))
public messagesourcesendtimemessage()
@streamlistener(processor.input)
public
void
receivemethod(object message)
}
spring.cloud.stream.bindings.input.destination=output
spring.cloud.stream.bindings.output.destination=input
研究到這裡可以發現,什麼sink、source、processor只不過就是spring cloud stream內建了3種已經準備好的訊息通道而已,通道的關鍵還是在於跟mq投遞和消費訊息的通道名稱。為此我們自己定義一種只有監聽的通道(也就是sink)來試試。
示例**二:
**位址(
自定義sink:
public
inte***ce myselfsink
注意我們的通道名是myinput
通道監聽:
@enablebinding(myselfsink.class)
public
class
myselfsinkreceiver
}
為了配合測試我們新增乙個訊息投遞的生產方式:
@restcontroller
public
class
sendercontroller
}
只要通過get請求/mysend,controller就會給myinput通道投遞資訊,streamlinstener就可以監聽到並消費這個資訊。
真正生產中我們盡量使用spring自帶的sink、source、processor,一般不會自己再去開發通道類,但是又要支援很多channel怎麼辦,採用的方式就是配置指定目的通道名的辦法。
spring.cloud
.stream
.bindings
.input
.destination=my-channel-a
spring.cloud
.stream
.bindings
.output
.destination=my-channel-b
spring.cloud
.stream
.bindings
.input
.destination=my-channel-b
spring.cloud
.stream
.bindings
.output
.destination=my-channel-a
這樣開發**只需要關心自己通道的型別選用sink、source、processor裡的一種,至於具體如何跟mq對映的,交給上面這個配置。 Spring Cloud Stream使用細節
上篇文章我們看了spring cloud stream的基本使用,小夥伴們對spring cloud stream應該也有了乙個基本的了解,但是上篇文章中的訊息我們是從rabbitmq的web管理頁面發來的,如果我們想要從 中傳送訊息呢?本文我們就來看看spring cloud stream的一些使...
spring cloud stream基本使用
spring cloud stream 通過定義繫結器作為中間層,完美地實現了應用程式與訊息中介軟體細節之間的隔離。通過向應用程式暴露統一的channel通道,使得應用程式不需要再考慮各種不同的訊息中介軟體的實現 spring cloud stream 中的訊息通訊方式遵循了發布 訂閱模式,當一條訊...
SpringCloud stream 訊息分割槽
1.stream的分割槽是當訊息的提供者傳送了相同的訊息的時候,如果被集群的中的某個節點消費了那麼如果提供者在此傳送相同的訊息的時候 一致會被同乙個的消費者消費掉 分割槽的配置 提供者端需要配置的資訊 新增rabbitmq資訊 spring.rabbitmq.host 192.168.177.140...