同步處理請求和非同步處理請求。
同步就是呼叫非同步方法返回future.get
@override
public remotingcommand processrequest
(channelhandlercontext ctx,
remotingcommand request)
throws remotingcommandexception
catch
(interruptedexception
| executionexception e)
return response;
}@override
public
void
asyncprocessrequest
(channelhandlercontext ctx, remotingcommand request, remotingresponsecallback responsecallback)
throws exception
asyncprocessrequest處理consumer_send_msg_back和其他訊息
我們重點看下普通資訊的處理
public completablefuture
asyncprocessrequest
(channelhandlercontext ctx,
remotingcommand request)
throws remotingcommandexception
mqtracecontext =
buildmsgcontext
(ctx, requestheader)
;this
.executesendmessagehookbefore
(ctx, request, mqtracecontext)
;// 批量訊息
if(requestheader.
isbatch()
)else
}}
asyncsendmessage校驗broker的狀態,校驗訊息的合法性
如果沒有topic配置符合建立條件則建立 並向nameserver上報負責的topic資訊
remotingcommand轉換成了messageextbrokerinner
呼叫messagestore介面非同步新增訊息。 defaultmessagestore原始碼分析
private completablefuture
asyncsendmessage
(channelhandlercontext ctx, remotingcommand request,
sendmessagecontext mqtracecontext,
sendmessagerequestheader requestheader)
final
byte
body = request.
getbody()
;int queueidint = requestheader.
getqueueid()
; topicconfig topicconfig =
this
.brokercontroller.
gettopicconfigmanager()
.selecttopicconfig
(requestheader.
gettopic()
);if(queueidint <0)
messageextbrokerinner msginner =
newmessageextbrokerinner()
; msginner.
settopic
(requestheader.
gettopic()
);msginner.
setqueueid
(queueidint);if
(!handleretryanddlq
(requestheader, response, request, msginner, topicconfig)
) msginner.
setbody
(body)
; msginner.
setflag
(requestheader.
getflag()
);messageaccessor.
setproperties
(msginner, messagedecoder.
string2messageproperties
(requestheader.
getproperties()
)); msginner.
setpropertiesstring
(requestheader.
getproperties()
);msginner.
setborntimestamp
(requestheader.
getborntimestamp()
);msginner.
setbornhost
(ctx.
channel()
.remoteaddress()
);msginner.
setstorehost
(this
.getstorehost()
);msginner.
setreconsumetimes
(requestheader.
getreconsumetimes()
== null ?
0: requestheader.
getreconsumetimes()
);completablefuture
putmessageresult = null;
maporigprops = messagedecoder.
string2messageproperties
(requestheader.
getproperties()
);string transflag = origprops.
get(messageconst.property_transaction_prepared);if
(transflag != null && boolean.
parseboolean
(transflag)
) putmessageresult =
this
.brokercontroller.
gettransactionalmessageservice()
.asyncpreparemessage
(msginner);}
else
return
handleputmessageresultfuture
(putmessageresult, response, request, msginner, responseheader, mqtracecontext, ctx, queueidint)
;}
Cartographer原始碼篇 原始碼分析 1
在安裝編譯cartographer 1.0.0的時候,我們可以看到 主要包括cartorgarpher ros cartographer ceres sover三個部分。其中,ceres solver用於非線性優化,求解最小二乘問題 cartographer ros為ros平台的封裝,獲取感測器資料...
AbstractListView原始碼分析3
normal list that does not indicate choices public static final int choice mode none 0 the list allows up to one choice public static final int choice ...
Android AsyncTask原始碼分析
android中只能在主線程中進行ui操作,如果是其它子執行緒,需要借助非同步訊息處理機制handler。除此之外,還有個非常方便的asynctask類,這個類內部封裝了handler和執行緒池。本文先簡要介紹asynctask的用法,然後分析具體實現。asynctask是乙個抽象類,我們需要建立子...