目前任務定時傳送採用的是hutool的定時任務實現的,追粉任務採用的是redis的zset實現的(xxljob定時每秒掃瞄redis)。
面臨的問題:
使用hutool的定時任務,內部實現沒有研究,出問題不好排查把控,所以需要改造一下實現方案。
查了下需要自己實現定時任務網上方案還是挺多的,下面介紹三種我們系統方便實現的方案。
1.使用redis key過期事件實現。
2.使用rocketmq的延遲訊息實現定時任務。
3.使用redis的zset實現。
下面就分別來分析下三種方案的實現。
配置類註冊監聽容器
@bean
redismessagelistenercontainer container
(redisconnectionfactory connectionfactory)
監聽邏輯處理自己的業務
/**
* 監聽redis key過期
* * @author longwh
* @version 1.0, 2020/11/9 21:10
*/@component
public
class
rediskeyexpirationlistener
extends
keyexpirationeventmessagelistener
/** * 針對redis資料失效事件,進行資料處理
* @param message
* @param pattern
*/@override
public
void
onmessage
(message message, byte[
] pattern)
}
其實如果原來業務就沒有使用過mq專門為定時任務配置乙個mq感覺有點奢侈,因為我們的專案中原本就整合了rocketmq,所以這裡再使用乙個延遲訊息代價也不是不可以接受了
使用mq傳送延遲訊息
。
/**
* 延遲訊息傳送
* @author longwh 2020/11/9 - 20:34
**/@data
@slf4j
@component
//自動注入topic等配置
@configurationproperties
(prefix =
"alimq"
)public
class
delaymqproducer
extends
mqnewproducer
implements
initializingbean
/***
* 訊息傳送
* @param message
*/public
void
sendorder
(string message)
}
公司內部封裝的mq傳送
。
@override
public
void
send
(enumsystemtag mqproviderysystag, mqcontent content)
sendresult sendresult = producer.
send
(msg)
; log.
info
("send.topic:[{}],msgid:[{}]"
,sendresult.
gettopic()
,sendresult.
getmessageid()
);}
使用者關注查詢追粉訊息,往zset中插入資料
。
public
void
executefanstask
wechatofficialaccount account = iwechatofficialaccountservice.;if
(objectutil.
isempty
(account)
|| account.
getdisabled()
||!account.
getfansenable()
)//校驗該使用者是否與任務條件匹配
wechatuser curuser = wechatuserservice.
getwechatuserbyconditions
(openid,itaskfansconditionsservice.
getbytaskid
(account.
getfanstaskid()
));if
(objectutil.
isnotempty
(curuser))}
}}
然後使用定時任務定時消費滿足條件的資料
消費zset中的資料
。
/**
* 執行追粉任務中資料
* * @author longwh
* @version 1.0, 2020/8/25 09:19
*/@jobhandler
("fansmessageexec"
)@component
@slf4j
public
class
fansmessagejobhandle
extends
ijobhandler
}catch
(exception e)
", jsonutil.
tojsonstr
(o));}
}else
", jsonutil.
tojsonstr
(o));}
}});
}return
success;}
}
實現方案
重複消費控制
分布式方案
業務吞吐量
是否支援撤回
重啟服務期間訊息是否丟失
業務契合
redis key過期
單機不需要(因為重複設定該key不會產生兩條資料,redis已經幫我們控制)
因為redis key過期事件是redis內部的發布訂閱資料結構實現的,且該訊息為廣播訊息,在分布式下其實需要做訊息冪等判斷
乙個單例項的redis最多能支援2^32個鍵,但實際受限於redis容量
直接刪除key相當於撤回任務
丟失由於是廣播訊息,需要自行控制重複消費,且服務重啟時候訊息會丟失,所以業務契合度不高
rocketmq延時訊息
需要,需要在消費段做冪等,防止重複消費
rocketmq集群模式可支援一條訊息在一台機器上消費
應對百萬級無壓力
rocketmq不支援撤回訊息,需要消費端自己做邏輯判斷
不會丟失
需要控制重複消費,但是吞吐量完全不用擔心
redis zset
不需要,zset結構就是去重的,且消費時候remove判斷了是否成功
支援,集群的機器使用同乙個redis
元素個數最多為2^32個,實際受限於redis例項的容量大小
直接刪除key即為撤回訊息
不會丟失
自動去重,目前業務量完全適用,但隨著業務增長,redis可能成為瓶頸
本人個人傾向於使用redis的zset實現,實現簡單,且工作量也是最少的,等到redis成為業務瓶頸的時候再切換為rocketmq不遲。
Golang定時任務簡單實現
go get github.com robfig cron開啟乙個定時 根據cron表示式進行時間排程,cron可以精確到秒,大部分表示式格式也是從秒開始。c cron.new 預設從分開始,所以加上cron.withseconds 保證定時按照表示式字面意思執行。func main c.start...
Golang定時任務簡單實現
go get github.com robfig cron開啟乙個定時 根據cron表示式進行時間排程,cron可以精確到秒,大部分表示式格式也是從秒開始。c cron.new 預設從分開始,所以加上cron.withseconds 保證定時按照表示式字面意思執行。func main c.start...
SpringBoot簡單的定時任務
1.在啟動類貼上 enablescheduling propertysource import class enablescheduling 定時標籤 public class 2.在需要定時的方法上貼上 scheduled cron 0 0 2 每天凌晨兩點執行 scheduled cron 0 ...