但在用c#實現的過程中,連線已經正常了,qos也設定為2了,就是收不到離線的訊息。後來發現,問題不是出現在協議上,而是在資料的接收上。在接收資料時,用socket.receive來接收,在連線的時候,會返回資料,而接收的buff開的較大,則會在接收到服務端對connect回應的資料報外,也會把離線的資料也一起收起來。這時就要按協議來作解包處理了。如果沒有作解包處理,則對收到的connect的conack命令處理,那後面的離線資料就會丟失了。
namespace mqttsdk
/// /// 標識。佔第乙個位元組的0-3bit
/// 包型別為publish時,對於mqtt 3.1.1,bit 0:retain3,bit 1:qos2,bit 2:qos2,bit 3:dup1。
/// 包型別為pubrel、subscribe、unsubscribe時,bit 1:1,bit 0、bit 2、bit 3均為0。
/// 其他包型別,bit 0、bit 1、bit 2、bit 3均為0。
///
public int flags
/// /// 剩餘長度。可變頭部和載荷的位元組數之和。
///
public int remaininglength
public byte firstbyte
/// /// 剩餘長度對應的位元組
///
public listremaininglengthbytes
#endregion
#region 可變頭和載荷的資料
/// /// 可變頭和載荷的資料
///
public listbytesofvariableheaderandpayload
#endregion
#endregion
#region read
/// /// 從socket讀取資料
///
///
public imqttpacket read(socket socket)
var byteone = new byte[1];
var len = socket.receive(byteone);
if (len <= 0)
bytesofvariableheaderandpayload = new list();
remaininglengthbytes = new list();
//讀取第1個位元組
firstbyte = byteone[0];
flags = firstbyte & 0b00001111;
packettype = (packettypeenum)(firstbyte >> 4);
//讀取第2個位元組
byteone[0] = 0;
len = socket.receive(byteone);
if (len <= 0)
var second_byte = byteone[0];
//===計算【剩餘長度】===
//【剩餘長度】取7bit來計算,最高位的bit7作為標識,
// bit7為1則說明下乙個位元組也是剩餘長度的一部分,
// bit7為0則讀取剩餘長度完畢。
int remaining_length = second_byte & 0b01111111;
var flag_bit = (second_byte & 0b10000000);
int multiplier = 1;
while (true)
len = socket.receive(byteone);
if (len <= 0)
var next_byte = byteone[0];
flag_bit = next_byte & 0b10000000;
remaining_length += (next_byte & 0b01111111) * multiplier;
remaininglengthbytes.add(next_byte);
}remaininglength = remaining_length;
//讀取可變頭和載荷的資料
var blocksize = 4096;
var buff = new byte[blocksize];
int readsize = blocksize;
while (remaining_length > 0)
else
len = socket.receive(buff, 0, readsize, socketflags.none);
if (len <= 0)
remaining_length -= len;
var tempbuff = new byte[len];
array.copy(buff, tempbuff, len);
bytesofvariableheaderandpayload.addrange(tempbuff);
}var packet = createpacket();
return packet;
}#endregion
#region createpacket
private imqttpacket createpacket()
case packettypeenum.suback:
case packettypeenum.puback:
case packettypeenum.pubrec:
case packettypeenum.pubrel:
case packettypeenum.pubcomp:
case packettypeenum.publish:
case packettypeenum.unsuback:
case packettypeenum.pingresp:
}return packet;
}#endregion
#region gettotalbytes
/// /// 獲取所有位元組
///
///
public byte gettotalbytes()
if (!listhelper.isempty(bytesofvariableheaderandpayload))
var buff = list.toarray();
return buff;
}#endregion}}
其中mqtt***packet是對每個命令包的實現,可以參照協議格式自行封裝。
namespace mqttsdk
}
測試:先用publish端發布訊息。
測試:再開始subcribe端接收訊息(離線訊息)
無法收到redis訂閱訊息
現網程式執行一段時間後,經常發現收不到redis訂閱訊息。輸入client list查詢redis連線資訊,輸出如下資訊 id 2375018 addr 120.15.207.135 9159 fd 663 name subarea age 3324 idle 563 flags n db 0 su...
android訊息推送 mqtt協議
對與訊息推送是什麼個概念,在此就不贅述啦。google自帶的c2md服務,可以幫助我們實現該功能,可以該伺服器在國外,所以鑑於網速等各種條件限制,我們也沒法實現。為解決該問題,在讀了大量的部落格等質料之後,終於見到啦陽光。第三個是由ibm提供的mqtt協議的實現,就相當於乙個 開啟1883埠,在se...
MQTT斷線重連訂閱無法接收
mqtt客戶端是用的 paho 採用以下配置 connopts new mqttconnectoptions connopts.setcleansession true connopts.setconnectiontimeout 10 connopts.setkeepaliveinterval 90...