如下圖所示,業務層與netty客戶端之間用remotingcommand
進行互動,即業務層呼叫netty傳送訊息時,會將訊息封裝在remotingcommand
物件裡面,而netty接收到外部訊息的時候會給業務層返回remotingcommand
的物件例項。
netty與外部世界通過位元組流進行傳輸。netty在傳送訊息的時候,對remotingcommand進行編碼(物件–>位元組流);在接收到外部訊息的時候會對位元組流進行解碼(位元組流–>物件)。
業務層與netty之間互動方式的偽**:
--
--經過業務層構建處理的訊息體header/body--
--header =..
.;body =..
.;----準備remotingcommand--
--remotingcommand =
newremotingcommand()
;remotingcommand.
setcustomerheader
(header)
;remotingcommand.
setbody
(body);--
--呼叫netty傳送remotingcommand--
--netty.
send
(remotingcommand)
;
netty會用nettyencoder
對物件進行編碼。該類的原始碼如下,它繼承了netty中的messagetobyteencoder
。覆寫了encode()
方法。它的泛型變數為remotingcommand
表示它只處理傳遞過來的remotingcommand
的物件。
它裡面的編碼邏輯,首先對訊息頭進行編碼,然後編碼訊息體。訊息體其實已經是位元組流了,直接寫入bytebuf中即可。而需要對訊息頭進行編碼。對訊息頭編碼時呼叫的是remotingcommand
的encodeheader()
方法。
public
class
nettyencoder
extends
messagetobyteencoder
}catch
(exception e)
remotingutil.
closechannel
(ctx.
channel()
);}}
}
如下是nettydecoder
的源**,它繼承了lengthfieldbasedframedecoder
,表明它解決tcp粘包和拆包的方式為長度基礎方式。
裡面的邏輯很簡單,從網路通道讀取到資料,然後呼叫remotingcommand
的靜態解碼方法decode
進行解碼,返回乙個remotingcommand
物件。
public
class
nettydecoder
extends
lengthfieldbasedframedecoder
@override
public object decode
(channelhandlercontext ctx, bytebuf in)
throws exception
bytebuffer bytebuffer = frame.
niobuffer()
;// 直接呼叫remotingcommand進行解碼
return remotingcommand.
decode
(bytebuffer);}
catch
(exception e)
finally
}return null;
}}
remotingcommand的成員屬性。其中customeheader表示訊息頭,body表示訊息體,它是不進行序列化。
public
class
remotingcommand
最終結果就是將remotingcommand物件進行序列化,而customeheader的屬性以及值會被放到remotincommand的extfields
之中,之後隨remotingcommand一起序列化成位元組碼。
首先確定訊息體的長度。
public bytebuffer encodeheader()
public bytebuffer encodeheader
(final
int bodylength)
首先將customeheader中的屬性放到remotingcommand的屬性extfields
之中,是乙個map
。
然後根據設定序列的方式採用不同的編碼方式,其實就是序列化remotingcommand物件本身。主要是rocketmq
自定義的協議,或者採用json
的方式。
private
byte
headerencode()
else
}
最終結果是就是將位元組碼還原為乙個remotingcommand
物件。並設定customeheader
和body
的值
public
static remotingcommand decode
(final
byte
array)
public
static remotingcommand decode
(final bytebuffer bytebuffer)
cmd.body = bodydata;
return cmd;
}
private
static remotingcommand headerdecode
(byte
headerdata, serializetype type)
return null;
}
在傳遞訊息的時候會建立乙個協議頭customeheader,不同場景下會使用不同的協議頭。
協議頭都實現了乙個公共的介面commandcustomheader
。它的實現者有很多。
我理解的RocketMQ 消費者負載均衡的實現
抽象類中有乙個重要的方法dorebalance 這個方法是對外使用的。即使用的方式就是建立乙個負載均衡實現類的例項,然後呼叫它的dorebalance 方法即可進行。幹的核心事情 主要流程說明 public void dorebalance final boolean isorder this tr...
兩層網路 三層網路的理解
對於搞it的同行而言,大部分人都不會直接和網路打交道,因此除非從事網路開發,否則對網路內部機制也不會太關心,但是明白網路資料是怎麼走的,這對每個it工程師應該是很重要的基礎知識。網路資料報如何在網路上遊蕩,長久以來也困擾了我很長時間,現在把這部分內容總結分享一下。說起網路,大家不約而同會想起大學課本...
兩層網路,三層網路的理解
對於搞it的同行而言,大部分人都不會直接和網路打交道,因此除非從事網路開發,否則對網路內部機制也不會太關心,但是明白網路資料是怎麼走的,這對每個it工程師應該是很重要的基礎知識。網路資料報如何在網路上遊蕩,長久以來也困擾了我很長時間,現在把這部分內容總結分享一下。說起網路,大家不約而同會想起大學課本...