訊息佇列亦稱報文佇列,也叫做信箱。意思是說,這種通訊機制傳遞的資料具有某種結構,而不是簡單的位元組流。訊息佇列的工作機制如下所示:
為在程序與核心之間傳遞訊息,無論傳送程序還是接收程序,都需要在程序空間中用訊息緩衝區來暫存訊息。該訊息緩衝區的結構定義如下:
struct msgbuf ;
從這個緩衝區的定義可以看到,訊息通訊的最大特點在於,傳送程序可以在域mtype中定義訊息的型別,這樣就為接收程序提供了乙個方便,即接收程序可以根據mtype來判斷佇列中的乙個訊息是否為它所等待的訊息,從而使接收程序可以有選擇地進行接收。域mtext為存放訊息正文的陣列,傳送程序可以根據訊息的大小定義該陣列的長度。
為便於核心對訊息的維護和管理,以及要將大型訊息分頁存放,所以記憶體中用於訊息通訊的資料結構要比程序訊息緩衝區的結構稍微複雜一些。核心空間訊息結構分為首頁結構和一般頁結構。其首頁結構msg_msg的定義如下:
struct msg_msg ;
當程序傳送訊息時,負責傳送訊息的系統呼叫會把程序空間的msgbuf中的訊息正文複製到核心空間訊息首頁結構msg_msg的後面。一般頁結構msg_msgseg的定義如下:
struct msg_msgseg ;
乙個大型訊息的結構如下所示:
每個訊息佇列都有乙個msg_queue結構型別的佇列頭。msg_queue結構的定義如下:
struct msg_queue ;
訊息佇列的結構圖如下所示:
結構msg_queue中還分別記錄了接收和傳送程序的等待佇列。每個正在等待接收程序的描述結構如下:
struct msg_receiver ;
每個正在等待傳送程序的描述結構如下:
struct msg_sender
;
與共享記憶體的管理方式一樣,linux把所有訊息佇列都組織在乙個陣列中。在檔案linux/ipc/util.h如下:
struct ipc_id_ary
;
同時,在檔案linux/ipc/msg.c中也定義了乙個全域性變數msg_ids來管理訊息佇列:
static struct ipc_ids msg_ids;
這個變數的資料型別為ipc_ids結構。在檔案linux/ipc/util.h中宣告的ipc_ids結構**如下:
struct ipc_ids ;
訊息佇列的整體結構如下所示:
標頭檔案:
#include #include #include
訊息佇列的核心持續性要求每個訊息佇列都在系統範圍內對應唯一的鍵值,所以,要獲得乙個訊息佇列的描述符,只需提供該訊息佇列的鍵值即可。
訊息佇列描述字是由在系統範圍內唯一的鍵值生成的,而鍵值可以看作對應系統內的一條路徑。
程序可以通過呼叫函式msgget()來建立乙個訊息佇列。msgget()對應的系統呼叫如下:
int msgget(key_t key, int msg***);
asmlinkage long sys_msgget(key_t key, int msg***);
其中,引數key是使用者給定的鍵值。如果該值為0,系統會為程序建立乙個程序自用的訊息佇列,即供其自發自收;否則建立或開啟乙個訊息佇列。引數msg***是該函式的功能標誌。
訊息讀寫操作非常簡單,對於開發人員來說,每個訊息都類似於如下的資料結構,即需要使用者宣告乙個資料結構:
struct msgbuf ;
其中,mtype成員代表訊息型別,從訊息佇列中讀取訊息的乙個重要依據就是訊息的型別;mtext是訊息內容,當然長度不一定為1。
對於傳送訊息來說,首先預置乙個msgbuf緩衝區並寫入訊息型別和內容,呼叫相應的傳送函式即可;對於讀取訊息來說,首先分配這樣乙個msgbuf緩衝區,然後把訊息讀入該緩衝區即可。
int msgsnd(int msqid, struct msgbuf * msgp, int msgsz, int msg***);
向訊息佇列傳送一條訊息。其中,msqid為已開啟的訊息佇列id;msgp為存放訊息的結構;msgsz為訊息資料的長度;msg***為傳送標誌。有意義的msg***標誌為ipc_nowait,指明在訊息佇列沒有足夠空間容納要傳送的訊息時,msgsnd是否等待。
int msgrcv(int msqid, struct msgbuf * msgp, int msgsz, long msgtyp, int msg***);
從msqid代表的訊息佇列中讀取乙個訊息,並把訊息存放在msgp指向的msgbuf結構中。在成功地讀取了一條訊息後,佇列中的這條訊息將被刪除。
操作成功時返回0,失敗返回-1。
例子:父程序向訊息佇列傳送訊息,子程序從訊息佇列接收訊息。
#include #include #include #include #include #include int main()
send_buf, receive_buf;
if ((msqid = msgget(ipc_privare, 0700)) < 0) else
printf("msgget建立訊息佇列成功");
if ((pid = fork()) < 0) else if (pid > 0) else
printf("訊息傳送成功");
sleep(2);
exit(0);
}else else
printf("訊息讀取成功");
if ((msgctl(msqid, ipc_rwid, null)) < 0)
else
printf("訊息刪除成功");
Linux Linux程序間通訊之訊息佇列
1 訊息佇列概念引入 訊息佇列提供了乙個從乙個程序向另外乙個程序傳送一塊資料的方法每個資料塊都被認為是有乙個型別,接收者程序接收的資料塊可以有不同的型別值訊息佇列也有管道一樣的不足,就是每個訊息的最大長度是有上限的 msgmax 每個訊息佇列的總的位元組數是有上限的 msgmnb 系統上訊息佇列的總...
訊息佇列 訊息佇列
輪詢排程 一次性分發所有訊息,保證訊息平均分配,不管消費者是否能正常消費 訊息應答 保證消費端能確實消費,不丟失 公平 乙個乙個分發所有訊息,在保證分發到的執行緒確認回覆後,才分發下個訊息給下個空閒的消費者,訊息持久化 保證佇列中的訊息不丟失,包括3要素 交換器 訊息佇列 訊息都必須宣告持久化 發布...
訊息佇列 訊息佇列 kafka
kafka是乙個分布式的基於發布 訂閱模式的訊息佇列,主要用於大資料實時處理領域。要理解kafka首先要有分布式的概念,要有訊息佇列的概念。分布式系統最大的優勢就是解耦和削峰,這種情況下,a系統生成了乙個訊息,b系統非同步獲取,那麼就需要乙個存放訊息的訊息佇列 mq 相比較傳統的訊息佇列,訊息被消費...