RxJava實現事件匯流排 RxBus 及詳解

2021-07-24 03:13:06 字數 2673 閱讀 1648

先把大家都認同的實現**貼出來

public

class

rxbus

private

static

class

bussingleholder

//單例模式

public

static rxbus getinstance()

/*** 傳送訊息 即呼叫所有觀察者的onnext()方法

*@param obj

*/public

void

send(object obj)

public

observablegetobservable(classeventtype)

}

使用:

//傳送方

rxbus.getinstance().send("發了條訊息");

//接收方

subscription subscription = rxbus.getinstance().getobservable(string.class)

.subscribe(new subscriber()

@override

public

void

onerror(throwable e)

@override

public

void

onnext(string s)

});//解綁

subscription.unsubscribe();

可以看到,**非常少,乍一瀏覽完,你可能會有些疑惑,比如subject 這個類,比如詳細的傳送和接收流程……先不急著看這個,一步一步來,我們去掉一些枝葉,把主幹提取出來。

subject subject = publishsubject.create();

subject.subscribe(new action1()

});subject.onnext("send something");

看到了吧,實際上最主幹的乙個東西就是subject 的乙個例項,把它設定成全域性變數過後,可發可收。看看這個類;

public

abstract

class

subject

extends

observable

implements

observer

它繼承了observable也實現了observer介面,所以它既是發布者,也是訂閱者並且是乙個抽象類,這裡用它的子類publishsubject(為啥用它先暫時不管,下面再說),再來看它的傳送的方法:

// final subjectsubscriptionmanagerstate = new subjectsubscriptionmanager();

//state 是乙個訂閱者的管理類

@override

public

void

onnext(t v)

}

所以,傳送的方法其實就是呼叫所有觀察者的onnext()方法開始執行。這點知道觀察者模式的同學應該都了解。那麼,怎麼新增訂閱者呢,subject.subscribe()方法嘛!它自己同時充當兩種角色,既傳送,又接收嘛。所以這時候來按正確的順序捋一捋這個流程:

1.建立全域性的可傳送可接收訊息的subject例項

2.通過subject.subscribe()給自己新增訂閱者(subscriber/observaber)

3.通過subject.onnext()呼叫所有訂閱者的執行方法,即所謂的」傳送訊息」。

至此,主幹流程梳理完畢。是否清晰一些了?

我們再回到原來的**中,傳送的方法不用再解釋了。這裡還有乙個getobservable方法,呼叫subject.oftype()方法,原始碼如下:

public

final

observableoftype(final classklass)

oftype()其實就是根據傳入物件的型別進行過濾和轉換操作之後返回只發射指定型別observable的方法。簡單說,獲取發射指定型別資料的發布者。獲取了發布者就可以新增指定資料型別的訂閱者。這樣方便在傳送資料的時候根據資料型別來區分哪些訂閱者來接收訊息,分類傳送,避免混亂。

然後還有乙個publishsubject類,只會把在訂閱發生的時間點之後來自原始observable的資料發射給觀察者。如果在訂閱之前就傳送,沒有接收者接收,沒有意義。在事件匯流排的處理上,它算是比較適合的乙個subject子類吧。

再有,關於serializedsubject,subject是執行緒非安全的,要避免這個問題,所以給轉換為serializedsubject。

到這裡,主幹和枝葉都講完了。最後一起再來梳理一把:

1.建立乙個執行緒安全的全域性靜態的publishsubject,既充當發布者、又充當訂閱者。

2.通過oftype()方法獲取指定了資料型別的發布者角色(observable),通過它的subscribe()方法新增訂閱者(同樣指定了資料型別)。

3.通過onnext()方法,呼叫所有所有符合資料型別的訂閱者的onnext()方法。完成」資料的傳送」;

所有的流程大致就是這樣了。有沒有更清楚一些?

歡迎拍磚,歡迎指正哦!謝謝!

手寫rxjava事件變換

首先還是看怎麼使用 observable.just map new function map new function subscribeon schedulers.io observeon androidschedulers.mainthread subscribe new consumer 看原...

如何使用RabbitMQ實現事件匯流排

首先,事件源與事件處理的對映字典。private static dictionary eventhandlers new dictionary 然後,初始化rabbitmq,建立到伺服器的連線,建立乙個通道等 public rabbitmqeventbus iconnectionfactory co...

ListenerBus 事件匯流排

listenerbus可以接收事件並將事件送到對應的事件 listenerbus原始碼中第一行,建立了乙個執行緒安全的arraylist copyonwritearraylist,之後新增 者刪除事件等操作都在這個執行緒安全的arraylist中執行 private spark val listen...