目錄
定時任務的需求所謂是數不勝數,其中實現方式也是百花齊放,用得最多的大概率為springboot中的 @scheduled(cron = 「0 0 1 1 * ?」) 註解,或者是定時任務xxl-job框架,這兩者我接觸的比較多,除此之外還有,quartz 、elastic-job、但這兩個在分布式領域而言,和xxl-jobb比較,xxl-job更為受歡迎。無論是這些框架或者是springboot自帶的定時任務元件,基本上都能滿足固定定時任務的需求。而我們今天討論的是動態定時任務的實現。
動態定時任務的需求其實在現實生活中隨處可見,如花費到期多久之後傳送資訊提醒使用者?時間間隔是多少。又或者客戶下單之後多久提醒商家發貨,提醒的頻率又是多少…。這樣的需求還有很多。今天我們針對此類需求進行**。
對於此類需求相比於傳統的定時任務無非多了可控性, 其可控性包括了定時任務開始和結束時間的可控性,週期性可控性,只要解決了這兩個問題,實際上此類的需求也就迎刃而解了。
前面提供的方案只做文字探索性描述。
2.1、 採用重寫springboot 的定時框架,從資料庫中讀取cron表示式來實現可控性週期。
其本質是通過如下執行緒進行動態定時任務的建立,從而實現對應的週期可控性。
threadpooltaskscheduler threadpooltaskscheduler = new threadpooltaskscheduler();
其具體的細節不再說,其存在的痛點包括了
1 . 需要另外邏輯去實現可控性開始時間和結束時間
2. 此任務開啟的入參是corn表示式,需要另外的邏輯將其進行轉化,太過於猥瑣
2.2、採用時間執行緒池
時間執行緒池我忘記叫什麼,他是可以指定開始時間,週期時間的,相對而言,比第一種方案來得更為直觀,其我考慮到的痛點如下。其實上面那種方案也是有這個痛點的。
2.3、採用延時操作
簡單言之,實際上只要實現了延時操作 便是實現了動態的開始時間以及週期性執行,可以利用其遞迴的概念實現所謂的動態週期。
redis 佇列來實現延時
redis的體量本身定位就不高,在資料量(任務量)過大時,對redis的壓力也很大,redis不一定扛得住。但其實通過redis來實現延時訊息這樣的成功案列還是有很多的。在這裡就不細說了。
rabbitmq實現延時訊息。
通過mq實現延時訊息是本文的重點,在標題三會細說。
通過建立死信佇列來實現延時任務,然後再通過遞迴思想實現對應的邏輯,就可以實現對應的動態延時任務,但是這個會存在以下下幾個痛點。
佇列順序消費
通過死信,我們確實可以動態的控制訊息的消費時間,但是訊息在佇列裡面,如果佇列裡面存在多個資訊任務,前乙個未到消費時間,後乙個已經到了消費時間,這就好導致了,即使後面任務資訊消費時間到了,卻沒法被消費的問題。解決方法,對佇列進行排序邏輯,但如果這樣做的話,就有點猥瑣了。
開銷過大。
對於通過死信來實現延時訊息,網上有挺多優秀的部落格介紹,在此就不做說明了。
使用延時外掛程式需要mq在3.6以上(實際上我在嘗試**的時候並未發現git上有對應3.6的外掛程式,所以還是選擇較高的版本比較好)。
這裡不做過多說明,重點在於編碼的實現,主要步驟如下。
去官網**對應版本的外掛程式,位址為**位址
外掛程式名字為rabbitmq_delayed_message_exchange
將外掛程式放到mq外掛程式目錄下,然後cmd命令解壓網(網上有命令),然後重啟mq服務。大概就這樣的乙個過程。
建立佇列
這裡只弄了對應的核心**,大致就是建立延時交換機,延時佇列,以及繫結器,對應的key,value如下
public static final string delay_exchange = "delay.exchange";
public static final string delay_route_key = "delay.rou程式設計客棧tekey";
public static final string delay_queue = "delay.queue";
/*** 延時交換機
* @return 延時交換機
*/@bean
public customexchange delayexchange()
/*** mq已經安裝了延時外掛程式使用,否則得使用延時外掛程式
* @return 延時傳送佇列。
*/@bean
public queue delayqueue()
/*** 延時繫結區
* @return 延時繫結區
*/@bean
public binding delaybind()
生產者這裡寫得比較隨意,也直接使用了lombok,還直接用了 @service ,有點草率,主要為了讓讀者看得清晰。還用了hutool工具類的jsonutil。
可以清晰的看到主方法裡面需要傳乙個integer型別的入參,這個時間我將其轉換成了秒,其mq實際入參為毫秒,所以讀者不要被誤導。入參time通俗的講就是這個訊息多久之後被消費。不需要在乎順序。
package com.linkyoyo.bill.mq.impl;
import cn.hutool.core.util.objectutil;
import cn.hutool.json.jsonobject;
import cn.hutool.json.jsonutil;
import com.linkyoyo.bill.bo.workorderdelaysenmailactionbo;
import com.linkyoyo.bill.config.rabbitmqconfig;
import com.linkyoyo.bill.mq.delaysenderservice;
import lombok.allargsconstructor;
import lombok.requiredargsconstructor;
import lombok.extern.slf4j.slf4j;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.scheduling.annotation.async;
import org.springframework.stereotype.service;
/** * 延時傳送
* @author 鄒 [[email protected]]
* @date 2022/1/4 20:33
*/@slf4j
@requiredargsconstructor
@allargsconstructor
@service
public class delaysenderserviceimpl implements delaysenderservice
rabbittemplate.convertandsend(rabbitmqconfig.delay_exchange, rabbitmqconfig.delay_route_key, message, msg -> );
log.info("延時傳送成功:延時週期時間{}毫秒,訊息內容為{}", time * 1000, message);
}@override
public void sendmessagebydelay(workorderdelaysenmailactionbo actionbo)
this.sendmessagebydelay(jsonutil.parseobj(actionbo), aftersecond);}}
消費者消費者的demo不太好寫,只是做了乙個簡單的偽**。 以定時任務發郵箱為例
1- 消費者執行緒開始,先執行發郵箱任務
2- 傳送完郵箱之後判斷是否還需要發郵箱,如果需要,就再通過生產者傳送延時郵箱 此時可以指定下一次消費的時間,以此流程走下去便是一套動態任務的流程實現。可以參考後續的流程圖。
這樣就實現乙個簡易的定時任務傳送郵箱的邏輯
private final delaysenderservice delaysenderservice;
rtequshemp@rabbithandler
@rabbitlistener(queues = rabbitmqconfig.delay_queue)
public void delayconsumer(message message)
log.info("資訊為:{}", message.getbody());
}大致流程就這麼多了,以下是整套步驟流閉環程圖
Python通過RabbitMQ實現RPC
usr bin env python coding utf 8 import pika import uuid import time class fibonaccirpcclient object def init self 生成socket self.connection pika.blocki...
RabbitMQ如何實現延遲佇列?
延遲佇列儲存的物件肯定是對應的延遲訊息,所謂 延遲訊息 是指當訊息被傳送以後,並不想讓消費者立即拿到訊息,而是等待指定時間後,消費者才拿到這個訊息進行消費。場景一 在訂單系統中,乙個使用者下單之後通常有30分鐘的時間進行支付,如果30分鐘之內沒有支付成功,那麼這個訂單將進行一場處理。這是就可以使用延...
通過docker安裝rabbitmq
docker ubuntu安裝docker 直接使用官網安裝指令碼自動安裝,安裝命令如下 curl fssl bash s docker mirror aliyun 也可以使用手動安裝,具體安裝方法如下 docker安裝完成之後,可以了解一些簡單的命令 使用docker安裝rabbitmq 1 拉取...