RxJava 原始碼解讀分析 observeOn

2021-07-31 12:58:25 字數 1946 閱讀 2080

observable.observeon()方法有點繞,我們一點一點看。

從上圖中,我們可以看出,observeon主要作的工作是:

1,通過指定的scheduler來切換執行緒,用來emit資料,這個資料就是onnext(data)方法的引數。

2,emit出來的資料,先非同步的快取到乙個buffer,實際上是快取到了乙個queue中,注意,每呼叫一下observeon()方法,就會new乙個queue,以此保證事件序列特性。

3,通過指定的scheduler來非同步的消費queue中的資料

再來看看operatorobserveon這個類,對原始的subscriber進行了一下非同步類包裝,如下圖:

從上圖中,可以看出,這個被包裝過的onnext方法,在當前執行緒中做了一件事重的事,將onnext(data)的引數資料,直接放到queue的尾部,最後呼叫了schedule()方法如下圖。

從圖中可以看出,emit出的資料被同步放入到queue中,緊接者,非同步的從queue中poll()出佇列頭資料,再執行使用者定義的onnext(data)方法。

再回過頭來看,lift()方法,如下圖

再來看看onsubscribelift類中有什麼,如下圖。

從上圖中,可以看出,call()方法中呼叫了operatorobserveon.call(),而operatorobserveon.call()是用來產生乙個**的subscriber(指定scheduler來實現非同步操作)。

ok,我們再回頭來,理一下,整個observeon()的呼叫流程。

subscriber.onnext(data)//這個類已經被包裝了,onnext()只是在當前執行緒中,將data pull 到queue佇列尾

scheduler()

recursivescheduler.schedule(observeonsubscriber);

observeonsubscriber.call()//這裡通過指定的執行緒,非同步的從queue中獲取資料

localchild.onnext(data);//localchild就是最原始的使用者定義的subscriber

結合以上說明,來看一下類圖,主要操作集中在operatorsubscribeon和observeonsubscriber中。

RxJava 原始碼解讀分析 from

我們繼續,看看observable.from 是怎麼回事,from原始碼如下圖。再來看看onsubscribefromarray這個類,如下圖 我們發現,onsubscribefromarray就是乙個onsubscribe,要實現了call 介面,call 方法中,只是設定了乙個新的fromarr...

vuex 原始碼分析 vuex原始碼解讀 簡易實現

原始碼解讀 開始之前,先貼個大綱 首先,我們從使用方法入手,一步步來看 store.js import vue from vue import vuex from vuex vue.use vuex export default new vuex.store data 2000 main.js im...

RxJava2原始碼解析

原始碼總結 observabel 通過create方法。將observableonsubscribe物件傳遞給自己。通過subscribe方法。建立 observableemitter發射器物件。發射器裡又封裝了observer。發射器又作為引數傳遞 給observableonsubscribe物件...