之前的部落格寫過通過inotify 加檔案的形式來實現多程序佇列的文章。這種方式在通常情況下表現不錯,但是這裡存在乙個問題就是當消費者過慢,會產生大量的擊穿核心高速緩衝區io,導致消費者卡在讀取資料的瓶頸上,無法使用負載均衡等手段來提高處理能力。
為了解決上述問題,引入了共享記憶體,眾所周知,這是所有ipc中最快的通訊方式,從根本上解決這個問題。下面通過實現乙個producer 和 consumer 程式,來展示我的設計思路。
由於物理記憶體有限,生產者會使用乙個環形緩衝區來保證熱點資料始終在記憶體中。同時為了保證消費者的接入配置最小化,生產者將配置通過乙個固定大小的結構體對映到記憶體中,消費者首先對映結構體讀取配置資訊,從結構體中的得知緩衝區大小後執行mremap進行重新調整大小,這樣消費者只需要知道共享記憶體的位址(乙個檔名),就可以實現消費。同時採用了訊息計數,來標識消費者是否已經處理所有訊息,觸發等待。當消費者在等待新資料時,喚醒消費者我們選擇了通過向指定檔案寫入乙個位元組的內容觸發inotify,雖然通過訊號量也可以實現,但是使用訊號量會導致生產者要多開乙個執行緒實現管理,引入額外的複雜度。
#include #include #include #include #include #include #include #include #include #include #include #include define_int64(shm_size, 6, "shm_size m");
define_string(inotify_file, "/tmp/writer.txt", "inotify file path");
define_string(shm_file, "/test", "shm file path");
define_string(shm_key, "", "shm key");
class producer
bool init(const std::string &key)
// 開啟共享記憶體
shm_fd_ = shm_open(shm_path_.c_str(), o_creat | o_rdwr | o_trunc, 0777);
if (shm_fd_ < 0)
uint64_t size = shm_size_ + sizeof(shm_data);
if (ftruncate(shm_fd_, size) == -1)
shm_data_ = (shm_data *)mmap(null, size, prot_read | prot_write, map_shared, shm_fd_, 0);
if (shm_data_ == map_failed)
shm_data_->total = 0;
shm_data_->size = shm_size_;
memcpy(shm_data_->inotify_name, inotify_path_.c_str(), inotify_path_.size());
memcpy(shm_data_->key, key.c_str(), key.size());
return true;
}void write(const char *line)
shm_data_->buffer[current_offset_++] = line[i];
}if (current_offset_ >= shm_size_)
shm_data_->buffer[current_offset_++] = '\0';
shm_data_->total++;
write(fd_, "8", 1);
private:
struct shm_data
;shm_data *shm_data_ = nullptr; // 共享記憶體
int fd_;
int shm_fd_;
uint64_t shm_size_ = 0;
uint64_t buffer_size_ = 0;
uint64_t total_read = 0;
uint64_t current_offset_ = 0;
std::string inotify_path_;
std::string shm_path_;
};int main(int argc, char *ar**)
消費者實現就相對簡單一些,讀取配置結構體,執行mremap調整大小, 如果機器效能足夠,可以選擇不等待inotify,類似自旋鎖的方式。這種方式測試發現新訊息能在10us左右被消費者感知,使用inoitfy新訊息感知需要40us左右。
#include #include #include #include #include #include #include #include #include #include #include define_string(shm_file, "/test", "shm file path");
define_bool(shm_nowait, false, "shm no wait mode");
#define event_size (sizeof(struct inotify_event))
#define buf_len (10 * (event_size + filename_max + 1))
class consumer
~tail()
bool init(const std::string& key)
shm_data *shm_info_ = (shm_data *)mmap(null, sizeof(shm_data), prot_read | prot_write, map_shared, shm_fd_, 0);
if (shm_info_ == map_failed)
printf("info size=%ld inotify_name=%s,key=%s\n", shm_info_->size, shm_info_->inotify_name, shm_info_->key);
if (strcasecmp(shm_info_->key, key.c_str()) != 0)
// 開始監聽檔案變化
inotify_fd_ = inotify_init();
if (inotify_fd_ < 0)
shm_size_ = shm_info_->size;
uint64_t real_size = shm_info_->size + sizeof(shm_data);
inotify_add_watch(inotify_fd_, shm_info_->inotify_name, in_modify | in_create | in_delete);
shm_data_ = (shm_data *)mremap(shm_info_, sizeof(shm_data), real_size, mremap_maymove);
if (shm_data_ == map_failed)
return true;
}void loop()
line_[i] = shm_data_->buffer[current_offset_++];
if (line_[i] == '\0')
}total_read++;
printf("current_offset=%d, total=%d, read=%d, %s", current_offset_, shm_data_->total, total_read, line_);
}if (!flags_shm_nowait)}}
private:
struct shm_data
;shm_data *shm_data_ = nullptr; // 共享物件指標
uint64_t shm_size_ = 0; // 共享記憶體大小
uint64_t line_size_ = 0; // 每條資料最大值
uint64_t total_read = 0; // 當前讀取總記錄數
uint64_t current_offset_ = 0; // 當前讀取的偏移量
std::string shm_path_;
int inotify_fd_;
int shm_fd_;
int tag_;
char *line_;
char *inotify_buffer_;
};define_string(shm_key, "", "shm key");
int main(int argc, char *ar**)
consumer.loop();
}
多程序通訊(IPC) 共享記憶體
1 共享記憶體介紹 共享記憶體可以說是最有用的程序間通訊方式,也是最快的ipc形式。兩個不同程序a b共享記憶體的意思是,同一塊物理記憶體被對映到程序a b各自的程序位址空間。程序a可以即時看到程序b對共享記憶體中資料的更新,反之亦然。由於多個程序共享同一塊記憶體區域,必然需要某種同步機制,互斥鎖和...
多程序鎖和共享記憶體
當我們用多程序來讀寫檔案的時候,如果乙個程序是寫檔案,乙個程序是讀檔案,如果兩個檔案同時進行,肯定是不行的,必須是檔案寫結束以後,才可以進行讀操作。或者是多個程序在共享一些資源的時候,同時只能有乙個程序進行訪問,那就要有乙個鎖機制進行控制。需求 乙個程序寫入乙個檔案,乙個程序追加檔案,乙個程序讀檔案...
linuxC多程序通訊 POSIX共享記憶體
簡單的共享記憶體讀寫示例 訊號量和共享記憶體 使用示例 函式功能 改變檔案大小 相關函式 open truncate 表頭檔案 include 函式原型 int ftruncate int fd,off t length 函式說明 ftruncate 會將引數fd指定的檔案大小改為引數length指...