Zookeeper場景實踐 (8) 分布式佇列

2021-06-26 20:13:56 字數 4320 閱讀 8687

按照zookeeper典型應用場景一覽

裡的說法,分布式佇列有兩種,一種是常規的先進先出佇列,另一種是要等到佇列成員聚齊之後的才統一按序執行。

第二種佇列可以先建立乙個/queue,賦值為n,表達佇列的大小。然後每個佇列成員加入時,就判斷是否達到佇列要求的大小,如果是可以進行下一步動作,否則繼續等待佇列成員的加入。比較典型的情況是,當乙個大的任務可能需要很多的子任務完成才能開始進行。

比如彙總賬單的時候,就必須先將使用者的消費資料,積分資料等都統計完成後才能開始。彙總賬單的程式建立乙個佇列/queue,賦值為2,然後分別統計消費資料和積分資料的程式當完成任務時就往/queue下建立乙個臨時節點。而彙總賬單程式監測到/queue的子節點個數為2時,就可以開始執行任務了。

實際上,我們也可以先建立乙個數目為2的子節點。當乙個子任務完成的時候,就刪除乙個子節點,當所有子節點都被刪除的時候,主任務就可以開始執行了。這個過程可以形象的理解為拆除屏障。因此這種佇列還有乙個專門的詞語描述,叫做屏障(barrier)。

講了那麼多的關於屏障的認識,但是並不打算就去實現它,並且zookeeper的官方文件也有相關的知識。這次的主要目標是常規的fifo佇列。我將實現佇列的兩個主要操作:push和pop。

1).int push(zhandle_t *zkhandle,const char *path,char *element)

2).int pop(zhandle_t *zkhandle,const char *path,char *element_buffer,int *buffer_len)

簡單來說,假設佇列的路徑為/queue,push就是就是建立乙個臨時有序的/queue/queue-節點。pop就是取出/queue/下序列號最小的節點。

我們知道在c++中stl裡有乙個queue的類,實現了push,pop等操作,然而它是非執行緒安全的,即多個執行緒同時push/pop的時候可能會出現錯誤。而由於zookeeper保證了建立節點和刪除節點的一致性,因此可以說利用zookeeper實現的佇列是程序安全的。

來看push和pop的具體實現。push的實現很簡單,就是在下建立乙個有序的/queue-子節點.

int push(zhandle_t *zkhandle,const char *path,char *element)

; char path_buffer[512] = ;

int bufferlen = sizeof(path_buffer);

sprintf(child_path,"%s/queue-",path);

int ret = zoo_create(zkhandle,child_path,element,strlen(element),

&zoo_open_acl_unsafe,zoo_sequence,

path_buffer,bufferlen);

if(ret != zok)else

return ret;

}

pop的功能則是取出下序號最小的子節點,如果沒有子節點,則返回-1.

int pop(zhandle_t *zkhandle,const char *path,char *element,int *len)

else if (children.count == 0)else

}if(min != null);

sprintf(child_path,"%s/%s",path,min);

ret = zoo_get(zkhandle,child_path,0,element,len,null);

if(ret != zok)else}}

}for(i = 0; i < children.count; ++i)

return ret;

}

最後,再來看看模擬佇列操作的程式。和其他程式類似,它的選項有

如:向佇列/queue中壓人乙個元素,元素的值為

"hello":

>myqueue -s 172.17.0.36:2181 -p /queue -m push -v hello

將佇列/queue彈出乙個元素

>myqueue -s 172.17.0.36:2181 -p /queue -m pop

最後附上完整的源**:

#include#include#include#include"zookeeper.h"  

#include"zookeeper_log.h"

char g_host[512]= "172.17.0.36:2181";

char g_path[512]= "/queue";

char g_value[512]="msg";

enum mode g_mode;

void print_usage();

void get_option(int argc,const char* argv);

/**********unitl*********************/

void print_usage()

void get_option(int argc,const char* argv)

else

break;

case 's':

strncpy(g_host,optarg,sizeof(g_host));

break;

case 'p':

strncpy(g_path,optarg,sizeof(g_path));

break;

case 'v':

strncpy(g_value,optarg,sizeof(g_value));

break;

default:

break;}}

} int push(zhandle_t *zkhandle,const char *path,char *element)

; char path_buffer[512] = ;

int bufferlen = sizeof(path_buffer);

sprintf(child_path,"%s/queue-",path);

int ret = zoo_create(zkhandle,child_path,element,strlen(element),

&zoo_open_acl_unsafe,zoo_sequence,

path_buffer,bufferlen);

if(ret != zok)else

return ret;

}int pop(zhandle_t *zkhandle,const char *path,char *element,int *len)

else if (children.count == 0)else

}if(min != null);

sprintf(child_path,"%s/%s",path,min);

ret = zoo_get(zkhandle,child_path,0,element,len,null);

if(ret != zok)else}}

}for(i = 0; i < children.count; ++i)

return ret;

}int front(zhandle_t *zkhandle,char *path,char *element,int *len)

else if(children.count == 0)else

}if(min != null);

sprintf(child_path,"%s/%s",path,min);

ret = zoo_get(zkhandle,child_path,0,element,len,null);

if(ret != zok)}}

for(i = 0; i < children.count; ++i)

return ret;

}int main(int argc, const char *argv)

int ret = zoo_exists(zkhandle,g_path,0,null);

if(ret != zok)else

}if(g_mode == push_mode)elseelse if( ret == -1)

}zookeeper_close(zkhandle);

return 0;

}

Zookeeper應用場景

zookeeper是乙個高可用的分布式資料管理與系統協調框架。基於對paxos演算法的實現,使該框架保證了分布式環境中資料的強一致性,也正是基於這樣的特性,使得zookeeper能夠應用於很多場景。網上對zk的使用場景也有不少介紹,本文將結合作者身邊的專案例子,系統的對zk的使用場景進行歸類介紹。值...

Zookeeper應用場景

分布式佇列 fifo 先進先出 barrier 同步佇列 共享鎖集群管理 leader選舉 命名服務 分布式應用配置項的管理等 fifo設計思路 1.在 queue fifo的目錄下建立 sequential 型別的子目錄 x i 這樣就能保證所有成員加入佇列時都是有編號的。2.出佇列時通過 get...

zookeeper 使用場景

比如像dubbo就是用的zookeeper的命名服務,利用的是樹形的目錄結構可以統一對配置進行動態的調整,利用的是節點變更的監聽可以在多個機器中選出leader,利用的是臨時順序編號目錄節點 ephemeral sequential 在集群重需要某個操作保持一致性和時序性的時候,利用建立節點來做獨佔...