rxjs 中的排程器 ( schedulers ) 是用來控制事件發出的順序和速度的(傳送給觀察者的)。它還可以控制訂閱 ( subscriptions ) 的順序。為了不搞得太理論化,先考慮下這個示例:
const a$ = rx.observable.of(1, 2);
const b$ = rx.observable.of(10);
const c$ = rx.observable.combinelatest(a$, b$, (a, b) => a + b);
c$.subscribe(c =>
console.log(c));複製**
你覺得控制台輸出的結果是什麼呢?大多數人會猜是:
11
12複製**
因為首先a$
中1
會和b$
中的10
配對,然後a$
中的2
和b$
中的10
配對。
事實上,出現在控制台中的是:
12複製**
1 + 10
的組合並沒有發生。原因是 observablesa$
和b$
都是「同步的」,它們會盡可能快地執行。那麼事件發出的順序到底是怎樣的呢?答案是不確定的,它可能是以下任意一種:
在這種順序不確定的情況下,我們應該描述出事件的發出順序是怎樣的。這就是排程器所做的事。預設情況下,rxjs 使用所謂的遞迴排程器。下面是它的工作原理:
c$ 被訂閱
combinelatest 的第乙個輸入流 a$ 被訂閱
a$ 發出值 1
combinelatest 將 1 作為 a$ 的最新值進行儲存
a$ 發出值 2
combinelatest 將 2 作為 a$ 的最新值進行儲存
combinelatest 的第二個輸入流 b$ 被訂閱
b$ 發出值 10
combinelatest 將 10 作為 b$ 的最新值進行儲存
combinelatest 現在同時擁有了 a$ 和 b$ 的值,因此它發出值 2 + 10
發出的順序為1, 2, 10
。最有意思的是在b$
被訂閱前, 將a$
的所有事件都盡快地發出了。rxjs 使用這種排程器作為預設排程器出於兩點原因:
然而,可以通過使用不同的排程器來自定義事件發出的順序及速度。我們在a$
上使用asap
排程器來讓其「慢下來」:
// const a$ = rx.observable.of(1, 2);
const a$ = rx.observable.from([1, 2], rx.scheduler.asap); // 新**
const b$ = rx.observable.of(10);
const c$ = rx.observable.combinelatest(a$, b$, (a, b) => a + b);
c$.subscribe(c =>
console.log(c))複製**
from
操作符的第二個引數是排程器,用來自定義事件的發出。asap
排程器使用 setimmediate 來安排任務盡快執行,但不是同步的。**改變後,控制台會輸出:
11
12複製**
因為內部執行順序如下:
c$ 被訂閱
combinelatest 的第乙個輸入流 a$ 被訂閱
combinelatest 的第二個輸入流 b$ 被訂閱b$ 發出值 10
combinelatest 將 10 作為 b$ 的最新值進行儲存
a$ 發出值 1
combinelatest 將 1 作為 a$ 的最新值進行儲存
combinelatest 現在同時擁有了 a$ 和 b$ 的值,因此它發出值 1 + 10
a$ 發出值 2
combinelatest 將 2 作為 a$ 的最新值進行儲存
combinelatest 發出值 2 + 10
發出的順序為10, 1, 2
。為了得到另外一種發出順序,可以為b$
也自定義排程器:
const a$ = rx.observable.from([1, 2], rx.scheduler.asap);
// const b$ = rx.observable.of(10);
const b$ = rx.observable.from([10], rx.scheduler.asap); // 新**
const c$ = rx.observable.combinelatest(a$, b$, (a, b) => a + b);
c$.subscribe(c =>
console.log(c));複製**
現在發出的順序為1, 10, 2
,因為執行順序如下:
c$ 被訂閱
combinelatest 的第乙個輸入流 a$ 被訂閱
combinelatest 的第二個輸入流 b$ 被訂閱
a$ 發出值 1
combinelatest 將 1 作為 a$ 的最新值進行儲存
b$ 發出值 10
combinelatest 將 10 作為 b$ 的最新值進行儲存
combinelatest 現在同時擁有了 a$ 和 b$ 的值,因此它發出值 1 + 10
a$ 發出值 2
combinelatest 將 2 作為 a$ 的最新值進行儲存
combinelatest 發出值 2 + 10
排程器還可以讓事件的發出變得更快,同時保持發出的順序不變。例如,rxjs 的testscheduler
可以使observable.interval(1000).take(10)
被訂閱時進行同步執行,而不需要花費10秒鐘來完成:
rx.observable.interval(1000, new rx.testscheduler()).take(10)複製**
testscheduler
是在 rxjs 內部使用的 (參見 filter 的測試用例),它使得成百上千個時間相關的測試**飛快地執行,但有一些像 rx sandbox 這樣的工具和積極的討論來豐富此排程器的使用場景,使得在 rxjs 內部之外的地方也可以使用。 Rxjs 學習入門
定義觀察者 var observer error function error complete function 同時與訂閱者進行繫結 observable.subscribe observer 1.1 生產基本資料 var observable rx.observable create func...
rxjs入門4之rxjs模式設計
觀察者模式其實在日常編碼中經常遇到,比如dom的事件監聽,如下 function clickhandler event document.body.addeventlistener click clickhandler 簡而言之 觀察者模式就如同上 有乙個事件源 dom 的click事件 也就是ab...
RxJS入門(4) 深入Sequence
接著 3 這一張分成兩部分來翻譯。html 上面 1 5標籤的說明如下 這是使用leaflet來渲染地圖的div佔位符。這是我們用來載入jsonp內容的工具函式。我們在合理的縮放級別上設定洛杉磯 這裡 比較多 的中心座標來初始化leaflet地圖。我們告訴leaflet為我們的地圖設定預設的主題集。...