SendMessageProcessor原始碼分析

2021-10-04 22:20:19 字數 3798 閱讀 4149

同步處理請求和非同步處理請求。

同步就是呼叫非同步方法返回future.get

@override

public remotingcommand processrequest

(channelhandlercontext ctx,

remotingcommand request)

throws remotingcommandexception

catch

(interruptedexception

| executionexception e)

return response;

}@override

public

void

asyncprocessrequest

(channelhandlercontext ctx, remotingcommand request, remotingresponsecallback responsecallback)

throws exception

asyncprocessrequest處理consumer_send_msg_back和其他訊息

我們重點看下普通資訊的處理

public completablefuture

asyncprocessrequest

(channelhandlercontext ctx,

remotingcommand request)

throws remotingcommandexception

mqtracecontext =

buildmsgcontext

(ctx, requestheader)

;this

.executesendmessagehookbefore

(ctx, request, mqtracecontext)

;// 批量訊息

if(requestheader.

isbatch()

)else

}}

asyncsendmessage校驗broker的狀態,校驗訊息的合法性

如果沒有topic配置符合建立條件則建立 並向nameserver上報負責的topic資訊

remotingcommand轉換成了messageextbrokerinner

呼叫messagestore介面非同步新增訊息。 defaultmessagestore原始碼分析

private completablefuture

asyncsendmessage

(channelhandlercontext ctx, remotingcommand request,

sendmessagecontext mqtracecontext,

sendmessagerequestheader requestheader)

final

byte

body = request.

getbody()

;int queueidint = requestheader.

getqueueid()

; topicconfig topicconfig =

this

.brokercontroller.

gettopicconfigmanager()

.selecttopicconfig

(requestheader.

gettopic()

);if(queueidint <0)

messageextbrokerinner msginner =

newmessageextbrokerinner()

; msginner.

settopic

(requestheader.

gettopic()

);msginner.

setqueueid

(queueidint);if

(!handleretryanddlq

(requestheader, response, request, msginner, topicconfig)

) msginner.

setbody

(body)

; msginner.

setflag

(requestheader.

getflag()

);messageaccessor.

setproperties

(msginner, messagedecoder.

string2messageproperties

(requestheader.

getproperties()

)); msginner.

setpropertiesstring

(requestheader.

getproperties()

);msginner.

setborntimestamp

(requestheader.

getborntimestamp()

);msginner.

setbornhost

(ctx.

channel()

.remoteaddress()

);msginner.

setstorehost

(this

.getstorehost()

);msginner.

setreconsumetimes

(requestheader.

getreconsumetimes()

== null ?

0: requestheader.

getreconsumetimes()

);completablefuture

putmessageresult = null;

maporigprops = messagedecoder.

string2messageproperties

(requestheader.

getproperties()

);string transflag = origprops.

get(messageconst.property_transaction_prepared);if

(transflag != null && boolean.

parseboolean

(transflag)

) putmessageresult =

this

.brokercontroller.

gettransactionalmessageservice()

.asyncpreparemessage

(msginner);}

else

return

handleputmessageresultfuture

(putmessageresult, response, request, msginner, responseheader, mqtracecontext, ctx, queueidint)

;}

Cartographer原始碼篇 原始碼分析 1

在安裝編譯cartographer 1.0.0的時候,我們可以看到 主要包括cartorgarpher ros cartographer ceres sover三個部分。其中,ceres solver用於非線性優化,求解最小二乘問題 cartographer ros為ros平台的封裝,獲取感測器資料...

AbstractListView原始碼分析3

normal list that does not indicate choices public static final int choice mode none 0 the list allows up to one choice public static final int choice ...

Android AsyncTask原始碼分析

android中只能在主線程中進行ui操作,如果是其它子執行緒,需要借助非同步訊息處理機制handler。除此之外,還有個非常方便的asynctask類,這個類內部封裝了handler和執行緒池。本文先簡要介紹asynctask的用法,然後分析具體實現。asynctask是乙個抽象類,我們需要建立子...