分布式任務 訊息佇列框架 go queue

2021-10-22 19:28:40 字數 2907 閱讀 3260

為什麼寫這個庫

應用場景有哪些

如何使用

總結

在開始自研go-queue之前,針對以下我們調研目前的開源佇列方案:

beanstalkd有一些特殊好用功能:支援任務priority、延時(delay)、超時重發(time-to-run)和預留(buried),能夠很好的支援分布式的後台任務和定時任務處理。如下是beanstalkd基本部分:

很幸運的是官方提供了 go client:

但是這對不熟悉beanstalkd操作的 go 開發者而言,需要學習成本。

類似基於kafka訊息佇列作為儲存的方案,儲存單元是訊息,如果要實現延時執行,可以想到的方案是以延時執行的時間作為topic,這樣在大型的訊息系統中,充斥大量一次性的topicdq_1616324404788, dq_1616324417622),當時間分散,會容易造成磁碟隨機寫的情況。

而且在 go 生態中,

同時考慮以下因素:

所以我們自己基於以上兩個基礎元件開發了go-queue

基於beanstalkd開發了dq,支援定時和延時操作。同時加入redis保證消費唯一性。

基於kafka開發了kq,簡化生產者和消費者的開發api,同時在寫入kafka使用批量寫,節省io。

整體設計如下:

首先在消費場景來說,乙個是針對任務佇列,乙個是訊息佇列。而兩者最大的區別:

所以在背後的基礎設施選型上,也是基於這種消費場景。

而從其中dq的 api 中也可以看出:

// 延遲任務執行

- dq.delay(msg, delaytime);

// 定時任務執行

- dq.at(msg, attime);

而在我們內部:

分別介紹dqkq的使用方式:

// [producer]

producer := dq.newproducer(dq.beanstalk,

,})

for i := 1000; i < 1005; i++

}

// [consumer]

consumer := dq.newconsumer(dq.dqconf,,},

redis: redis.redisconf,

})consumer.consume(func(body byte) )

和普通的 生產者-消費者 模型類似,開發者也只需要關注以下:

開發者只需要關注自己的任務型別「延時/定時」

消費端的消費邏輯

producer.go

// message structure

type message struct

pusher := kq.newpusher(string, "kq")

ticker := time.newticker(time.millisecond)

for round := 0; round < 3; round++

body, err := json.marshal(m)

if err != nil

fmt.println(string(body))

// push to kafka broker

if err := pusher.push(string(body)); err != nil }}

config.yaml

name: kq

brokers:

- 127.0.0.1:19092

- 127.0.0.1:19092

- 127.0.0.1:19092

group: adhoc

topic: kq

offset: first

consumers: 1

consumer.go

var c kq.kqconf

conf.mustload("config.yaml", &c)

// withhandle: 具體的處理msg的logic

// 這也是開發者需要根據自己的業務定製化

q := kq.mustnewqueue(c, kq.withhandle(func(k, v string) error ))

defer q.stop()

q.start()

dq不同的是:開發者不需要關注任務型別(在這裡也沒有任務的概念,傳遞的都是message data)。

其他操作和dq類似,只是將業務處理函式當成配置直接傳入消費者中。

在我們目前的場景中,kq大量使用在我們的非同步訊息服務;而延時任務,我們除了dq,還可以使用記憶體版的 timingwheel「go-zero生態元件」。

歡迎使用 go-zero 並star支援我們!

Kafka分布式訊息佇列框架

既有的訊息佇列框架或者對訊息傳送的可靠性提供了較高的保證,由此帶來較大的負擔,不能滿足海量高吞吐率的要求 或者完全面向實時訊息處理系統,對於批量離線處理的場合無法提供足夠的快取和永續性要求。如何實現 kafka的集群有多個broker伺服器組成,每個型別的訊息被定義為topic,同一topic內部的...

分布式訊息佇列

以下是訊息佇列以下的大綱,本文主要介紹訊息佇列概述,訊息佇列應用場景和訊息中介軟體示例 電商,日誌系統 訊息佇列概述 訊息佇列應用場景 訊息中介軟體示例 jms訊息服務 見第二篇 大型 架構系列 分布式訊息佇列 二 常用訊息佇列 見第二篇 大型 架構系列 分布式訊息佇列 二 參考 推薦 資料 見第二...

分布式訊息佇列

訊息佇列中介軟體是分布式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題。實現高效能,高可用,可伸縮和最終一致性架構。是大型分布式系統不可缺少的中介軟體。目前在生產環境,使用較多的訊息佇列有activemq,rabbitmq,zeromq,kafka,metamq,rocketmq等。...