reactor 3 程式設計筆記二

2021-10-19 11:50:07 字數 3864 閱讀 8769

【參考資料】

1 reactor 核心類

1.1 publisher

public

inte***ce

publisher

publisher 代表訊息的傳送方

subscribe 傳入乙個subscriber,建立乙個生產和消費的連線,即subscription

publisher可以有多個subscriber

flux\mono 都是繼承至publisher

subscriber 表示訊息的接收者

當 publisher 和 subscriber 建立連線 onsubscribe 觸發

當 subscription的request方法呼叫,onnext根據請求訊息數量被一次或多次呼叫

當 消費出錯或完成時 onerror 和 oncomplete

以 basesubscriber 為例:

在 basesubscriber 背壓的例子中,hookonnext 方法在其 onnext 呼叫

subscription 代表 publisher 和 subscriber 建立的乙個連線

在onsubscribe時會賦值乙個subscription

這個subscription會被儲存在subscriber中

1、包含 publisher 和 subscriber ,屬於一種處理過程

directprocessor

directprocessor = directprocessor.

create()

;flux

flux = directprocessor

.filter

(e -

> e %2==

0);flux.

subscribe

(system.err:

:println)

;flux.

subscribe

(system.err:

:println)

;intstream.

range(1

,10).

foreach

(e -

>

);

輸出:2 2 4 4 6 6 8 8

注意這裡有兩個subscribe

2 主要 processor 使用

2.1 基本分類 分類

描述直接的(direct)

directprocessor 和 unicastprocessor,直接使用sink來獲取資料

同步的(synchronous)

emitterprocessor 和 replayprocessor,除sink外可以定義其他publisher

非同步的(asynchronous)

topicprocessor及workqueueprocessor,除sink外可以定義其他publisher

2.2 unicastprocessor

支援背壓

這裡重新回到 1.4的 directprocessor,如果flux.subscribe在 directprocessor.onnext(e) 之後則無法

取到資料。

只能有乙個subscriber

unicastprocessor

unicastprocessor

= unicastprocessor.

create()

;fluxsink

sink = unicastprocessor.

sink

(fluxsink.overflowstrategy.buffer)

;flux

flux = unicastprocessor

.filter

(e -

> e %2==

0).doonerror

(system.err:

:println)

;intstream.

range(1

,20).

foreach

(e -

>);

thread.

sleep

(1000);

flux.

subscribe

(system.err:

:println)

;unicastprocessor.

oncomplete()

;

此時正常輸出:2 4 6 8 … 18

2.3 emitterprocessor

1、向多個訂閱者傳送資料,可對每乙個訂閱者進行背壓

emitterprocessor

emitterprocessor

= emitterprocessor.

create(4

);flux

flux1 = emitterprocessor.

filter

(e -

> e %2==

0);flux

flux2 = emitterprocessor.

filter

(e -

> e %3==

0);system.err.

println

("start subscribe ");

flux1.

subscribe

(system.err:

:println)

;intstream.

rangeclosed(1

,10).

foreach

(e -

>

catch

(interruptedexception e1)})

;flux2.

subscribe

(system.err:

:println)

;//flux2 沒有消費1~10,只能消費後面的3

emitterprocessor.

onnext(3

);emitterprocessor.

oncomplete()

;

輸出:

Reactor 3 參考文件 如何選擇操作符

如果乙個操作符是專屬於flux或mono的,那麼會給它註明字首。公共的操作符沒有字首。如果乙個具體的用例涉及多個操作符的組合,這裡以方法呼叫的方式展現,會以乙個點 開頭,並將引數置於圓括號內,比如 methodcall parameter 原文出處 參考 1 建立乙個新序列,它.發出乙個t,且還是由...

《OpenCV3程式設計入門》筆記二

採用hsv和hls把顏色分解成色調 飽和度和亮度 明度。這是描述顏色更自然,比如可以通過拋棄最後乙個元素,使演算法對輸入影象的光照條件不敏感。另一種使演算法對輸入影象的光照條件不敏感的方法是採用動態範圍較大的相機。錯誤位置 2.方法二 在c c 中通過建構函式進行初始化 int sz 3 mat l...

程式設計筆記二

題目 輸入某二叉樹的前序遍歷和中序遍歷的結果,請重建出該二叉樹。假設輸入的前序遍歷和中序遍歷的結果中都不含重複的數字。例如輸入前序遍歷序列和中序遍歷序列,則重建二叉樹並返回。時間限制 c c 1秒,其他語言2秒 空間限制 c c 32m,其他語言64m class solution 題目 用兩個棧來...