在消費者組模式下,當乙個訊息被消費者取出,為了解決組內訊息讀取但處理期間消費者崩潰帶來的訊息丟失問題,stream 設計了 pending 列表,用於記錄讀(xreadgroup)取但並未處理完畢(未ack)的訊息。
下面的討論基於幾點:
面向的場景為多個無差別消費者(每個消費者名子相同,功能相同)在同一group下消費任務。
我們要保證的是,每個任務至多只做一次。
**實現是在使用redis stream實現佇列服務一文的封裝基礎上實現的。
如果你的處理邏輯是:
gettask()
deltask()
yourprocessfuc();
即不太關注任務的丟失,此時無需做什麼特別處理。但一定記得deltask(),不然pending佇列會越積越多,占用大量儲存空間。
/*
* 將pending佇列中超時的資料重新放回佇列
* * $idletime: 超時時間, 毫秒
* $perpage:每次從pending佇列中取的任務數, 之所以分頁是為防止佇列太長,一下取出記憶體不夠
** 注意:只能有乙個程序執行pendingrestore
** 優點: consumer不需要做任何改動
* 缺點:
* 先del再add, 成本上不划算,
* 如果del和add中間斷掉任務就丟了
* 無法保留任務被重複投遞的次數,不過如果你的任務只想重做一次,或者不關注此資料那就無所謂了。
* * return: restore的數量
* */
public function pendingrestore($idletime = 5000, $perpage = 20)
}$restorenum += $thisnum;
if ($thisnum < $perpage)
}return $restorenum;
}/* 從pending佇列中取任務
*/protected function getpending($count = 1, $start='-', $end='+', $consumer = null)
return $this->_mredis->xpending($this->_mstream, $this->_mgroup, $start, $end, $count, $consumer);}/*
* 取[$start, $end]範圍內的資料, 注意是閉區間
** $count:條數,null時表示取全部
* */
protected function getrange($start = '-', $end = '+', $count = null)else
}
將超時任務放入另乙個名子的消費者pending佇列中,然後從新的消費者歷史資料中取出資料並處理。
/*
* 另一種恢復超時任務的方法
* 思路:將超時任務放入newconsumer的pending中,後續可以從newconsume的歷史中取出資料並處理
** 優點:
* 恢復資料沒有重複讀,刪,插,效率高
* 任務投遞次數會保留在新的pending中
** 缺點:
* consumer需要做改動,至少要改變consumer的名子
* 只能用單程序從歷史資料中讀資料,然後處理。**
* $idletime: 超時時間, 毫秒
* $newconsumer: 之後處理pending任務的消費者名稱
* $perpage: 每次取pending任務的條數
** return: 滿足條件且成功claim的條數
* */
public function pendingclaim($idletime = 5000, $newconsumer=null, $perpage = 20)
$info = $this->getpendinginfo();
$startid = $info[1];
$endid = $info[2];
$claimnum = 0;
/** 使用startid, endid遍歷pending列表
* 因為getpending取的是[startid, endid]
* 所以邊界處的id可能被重複取出,但不影響結果的正確性
* perpage越大/符合xclaim條件的id越多,重複的可能性越小
* */
while($startid != $endid)
//xclaim會根據條件自動過濾任務
$res = $this->_mredis->xclaim($this->_mstream, $this->_mgroup, $newconsumer, $idletime, $ids, ['justid']);
$thisnum = count($res);
$claimnum += $thisnum;
//id是按時間排列,小id未超時,則後面不會超時
//在所有id都有相同的投遞次數的基礎上
if ($thisnum < $perpage)
}return $claimnum;
}
使用pendingclaim後,可以使用乙個單獨程序,通過下面方式獲取超時任務並處理。
$config = [
'server' => '10.10.10.1:6379',
'stream' => 'balltube',
'consumer' => 'pendingprocessor',//pendingclaim中的newconsumer
];$q = new redisqueue($this->_config);
$block = 1000;
$num = 1;
while(1)
$id = key($d);
$data = $d[$id];
$q->deltask($id);
//處理任務邏輯
yourtaskprocessfunc($data);
}
c 畫筆Pen畫虛線
using system using system.collections.generic using system.componentmodel using system.data using system.drawing using system.text using system.window...
c 畫筆Pen畫虛線
using system using system.collections.generic using system.componentmodel using system.data using system.drawing using system.text using system.window...
c 畫筆Pen繪製曲線
using system using system.collections.generic using system.componentmodel using system.data using system.drawing using system.text using system.window...