為什麼寫這個庫在開始自研應用場景有哪些
如何使用
總結
go-queue
之前,針對以下我們調研目前的開源佇列方案:
beanstalkd
有一些特殊好用功能:支援任務priority、延時(delay)、超時重發(time-to-run)和預留(buried),能夠很好的支援分布式的後台任務和定時任務處理。如下是beanstalkd
基本部分:
很幸運的是官方提供了 go client:
但是這對不熟悉beanstalkd
操作的 go 開發者而言,需要學習成本。
類似基於kafka
訊息佇列作為儲存的方案,儲存單元是訊息,如果要實現延時執行,可以想到的方案是以延時執行的時間作為topic
,這樣在大型的訊息系統中,充斥大量一次性的topic
(dq_1616324404788, dq_1616324417622
),當時間分散,會容易造成磁碟隨機寫的情況。
而且在 go 生態中,
同時考慮以下因素:
所以我們自己基於以上兩個基礎元件開發了go-queue
:
基於beanstalkd
開發了dq
,支援定時和延時操作。同時加入redis
保證消費唯一性。
基於kafka
開發了kq
,簡化生產者和消費者的開發api,同時在寫入kafka使用批量寫,節省io。
整體設計如下:
首先在消費場景來說,乙個是針對任務佇列,乙個是訊息佇列。而兩者最大的區別:
所以在背後的基礎設施選型上,也是基於這種消費場景。
而從其中dq
的 api 中也可以看出:
// 延遲任務執行
- dq.delay(msg, delaytime);
// 定時任務執行
- dq.at(msg, attime);
而在我們內部:
分別介紹dq
和kq
的使用方式:
// [producer]
producer := dq.newproducer(dq.beanstalk,
,})
for i := 1000; i < 1005; i++
}
// [consumer]
consumer := dq.newconsumer(dq.dqconf,,},
redis: redis.redisconf,
})consumer.consume(func(body byte) )
和普通的 生產者-消費者 模型類似,開發者也只需要關注以下:
開發者只需要關注自己的任務型別「延時/定時」
消費端的消費邏輯
producer.go
:
// message structure
type message struct
pusher := kq.newpusher(string, "kq")
ticker := time.newticker(time.millisecond)
for round := 0; round < 3; round++
body, err := json.marshal(m)
if err != nil
fmt.println(string(body))
// push to kafka broker
if err := pusher.push(string(body)); err != nil }}
config.yaml
:
name: kq
brokers:
- 127.0.0.1:19092
- 127.0.0.1:19092
- 127.0.0.1:19092
group: adhoc
topic: kq
offset: first
consumers: 1
consumer.go
:
var c kq.kqconf
conf.mustload("config.yaml", &c)
// withhandle: 具體的處理msg的logic
// 這也是開發者需要根據自己的業務定製化
q := kq.mustnewqueue(c, kq.withhandle(func(k, v string) error ))
defer q.stop()
q.start()
和dq
不同的是:開發者不需要關注任務型別(在這裡也沒有任務的概念,傳遞的都是message data
)。
其他操作和dq
類似,只是將業務處理函式當成配置直接傳入消費者中。
在我們目前的場景中,kq
大量使用在我們的非同步訊息服務;而延時任務,我們除了dq
,還可以使用記憶體版的 timingwheel「go-zero
生態元件」。
歡迎使用 go-zero 並star支援我們!
Kafka分布式訊息佇列框架
既有的訊息佇列框架或者對訊息傳送的可靠性提供了較高的保證,由此帶來較大的負擔,不能滿足海量高吞吐率的要求 或者完全面向實時訊息處理系統,對於批量離線處理的場合無法提供足夠的快取和永續性要求。如何實現 kafka的集群有多個broker伺服器組成,每個型別的訊息被定義為topic,同一topic內部的...
分布式訊息佇列
以下是訊息佇列以下的大綱,本文主要介紹訊息佇列概述,訊息佇列應用場景和訊息中介軟體示例 電商,日誌系統 訊息佇列概述 訊息佇列應用場景 訊息中介軟體示例 jms訊息服務 見第二篇 大型 架構系列 分布式訊息佇列 二 常用訊息佇列 見第二篇 大型 架構系列 分布式訊息佇列 二 參考 推薦 資料 見第二...
分布式訊息佇列
訊息佇列中介軟體是分布式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題。實現高效能,高可用,可伸縮和最終一致性架構。是大型分布式系統不可缺少的中介軟體。目前在生產環境,使用較多的訊息佇列有activemq,rabbitmq,zeromq,kafka,metamq,rocketmq等。...