首頁(yè)技術(shù)文章正文

Redis實(shí)現(xiàn)消息隊(duì)列的三種方式

更新時(shí)間:2022-08-17 來(lái)源:黑馬程序員 瀏覽量:

消息隊(duì)列(Message Queue),字面意思就是存放消息的隊(duì)列。最簡(jiǎn)單的消息隊(duì)列模型包括3個(gè)角色:

消息隊(duì)列:存儲(chǔ)消息。

生產(chǎn)者:發(fā)送消息到消息隊(duì)列,在秒殺任務(wù)中負(fù)責(zé)判斷秒殺時(shí)間和庫(kù)存,校驗(yàn)消費(fèi)者權(quán)限是否是一人一單,發(fā)送優(yōu)惠券id和用戶id到消息隊(duì)列中。

消費(fèi)者:從消息隊(duì)列獲取消息并處理消息,接受到訂單消息之后,完成下單。

Redis提供了三種不同的方式來(lái)實(shí)現(xiàn)消息隊(duì)列:

list結(jié)構(gòu):基于List結(jié)構(gòu)模擬消息隊(duì)列。

PubSub:基本的點(diǎn)對(duì)點(diǎn)消息模型。

Stream:比較完善的消息隊(duì)列模型。

消息隊(duì)列

基于List結(jié)構(gòu)模擬消息隊(duì)列

消息隊(duì)列(Message Queue),字面意思就是存放消息的隊(duì)列。而Redis的list數(shù)據(jù)結(jié)構(gòu)是一個(gè)雙向鏈表,很容易模擬出隊(duì)列效果。

隊(duì)列是入口和出口不在一邊,因此我們可以利用:LPUSH 結(jié)合 RPOP、或者 RPUSH 結(jié)合 LPOP來(lái)實(shí)現(xiàn)。

不過(guò)要注意的是,當(dāng)隊(duì)列中沒(méi)有消息時(shí)RPOP或LPOP操作會(huì)返回null,并不像JVM的阻塞隊(duì)列那樣會(huì)阻塞并等待消息。因此這里應(yīng)該使用BRPOP或者BLOPO來(lái)實(shí)現(xiàn)阻塞效果。

List結(jié)構(gòu)

List的消息隊(duì)列可利用Redis存儲(chǔ),不受限于JVM內(nèi)存上限,是基于基于Redis的持久化機(jī)制,數(shù)據(jù)安全性有保證,同時(shí)也可以滿足消息隊(duì)列的有序性。但只支持但消費(fèi)者,無(wú)法避免消息丟失是它最大的問(wèn)題。

基于PubSub的消息隊(duì)列

PubSub(發(fā)布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費(fèi)者可以訂閱一個(gè)或多個(gè)channel,生產(chǎn)者向?qū)?yīng)channel發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。對(duì)應(yīng)channel發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。

SUBSCRIBE channel [channel] :訂閱一個(gè)或多個(gè)頻道。

PUBLISH channel msg :向一個(gè)頻道發(fā)送消息。

PSUBSCRIBE pattern[pattern] :訂閱與pattern格式匹配的所有頻道。

基于PubSub的消息隊(duì)列

基于PubSub的消息隊(duì)列采用發(fā)布訂閱模型,支持多生產(chǎn)、多消費(fèi),但也有不支持?jǐn)?shù)據(jù)持久化、無(wú)法避免消息丟失,消息堆積有上限的缺點(diǎn),超出時(shí)數(shù)據(jù)會(huì)丟失。

例如:

基于Stream的消息隊(duì)列-XREAD

Stream讀取消息的方式之一:XREAD

Stream讀取消息的方式

例如:

XREAD阻塞方式,讀取最新的消息:

在業(yè)務(wù)開(kāi)發(fā)中,我們可以循環(huán)的調(diào)用XREAD阻塞方式來(lái)查詢最新消息,從而實(shí)現(xiàn)持續(xù)監(jiān)聽(tīng)隊(duì)列的效果,偽代碼如下:

注意:當(dāng)我們指定起始ID為$時(shí),代表讀取最新的消息,如果我們處理一條消息的過(guò)程中,又有超過(guò)1條以上的消息到達(dá)隊(duì)列,則下次獲取時(shí)也只能獲取到最新的一條,會(huì)出現(xiàn)漏讀消息的問(wèn)題。
STREAM類型消息隊(duì)列的XREAD命令,消息可回溯,一個(gè)消息可以被多個(gè)消費(fèi)者讀取,同時(shí)可能阻塞讀取,有消息漏讀的風(fēng)險(xiǎn)。

基于Stream的消息隊(duì)列-消費(fèi)者組

消費(fèi)者組(Consumer Group):將多個(gè)消費(fèi)者劃分到一個(gè)組中,監(jiān)聽(tīng)同一個(gè)隊(duì)列。具備下列特點(diǎn):

1.消息分流:隊(duì)列中的消息會(huì)分流給組內(nèi)的不同消費(fèi)者,而不是重復(fù)消費(fèi),從而加快消息處理的速度。

2.消息標(biāo)示:消費(fèi)者組會(huì)維護(hù)一個(gè)標(biāo)示,記錄最后一個(gè)被處理的消息,哪怕消費(fèi)者宕機(jī)重啟,還會(huì)從標(biāo)示之后讀取消息。確保每一個(gè)消息都會(huì)被消費(fèi)。

3.消息確認(rèn):消費(fèi)者獲取消息后,消息處于pending狀態(tài),并存入一個(gè)pending-list。當(dāng)處理完成后需要通過(guò)XACK來(lái)確認(rèn)消息,標(biāo)記消息為已處理,才會(huì)從pending-list移除。

創(chuàng)建消費(fèi)者組:

XGROUP CREATE  key groupName ID [MKSTREAM]

key:隊(duì)列名稱

groupName:消費(fèi)者組名稱

ID:起始ID標(biāo)示,$代表隊(duì)列中最后一個(gè)消息,0則代表隊(duì)列中第一個(gè)消息

MKSTREAM:隊(duì)列不存在時(shí)自動(dòng)創(chuàng)建隊(duì)列。

其它常見(jiàn)命令:

# 刪除指定的消費(fèi)者組
XGROUP DESTORY key groupName

# 給指定的消費(fèi)者組添加消費(fèi)者
XGROUP CREATECONSUMER key groupname consumername

# 刪除消費(fèi)者組中的指定消費(fèi)者
XGROUP DELCONSUMER key groupname consumername

從消費(fèi)者組讀取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

group:消費(fèi)組名稱

consumer:消費(fèi)者名稱,如果消費(fèi)者不存在,會(huì)自動(dòng)創(chuàng)建一個(gè)消費(fèi)者

count:本次查詢的最大數(shù)量

BLOCK milliseconds:當(dāng)沒(méi)有消息時(shí)最長(zhǎng)等待時(shí)間

NOACK:無(wú)需手動(dòng)ACK,獲取到消息后自動(dòng)確認(rèn)

STREAMS key:指定隊(duì)列名稱

ID:獲取消息的起始ID:">":從下一個(gè)未消費(fèi)的消息開(kāi)始

其它:根據(jù)指定id從pending-list中獲取已消費(fèi)但未確認(rèn)的消息,例如0,是從pending-list中的第一個(gè)消息開(kāi)始。

消費(fèi)者監(jiān)聽(tīng)消息的基本思路:

STREAM類型消息隊(duì)列的XREADGROUP命令特點(diǎn):消息可回溯,一個(gè)消息可以只能被一個(gè)消費(fèi)者讀取??梢宰枞x取.沒(méi)有消息漏讀的風(fēng)險(xiǎn),有消息確認(rèn)機(jī)制,保證消息至少被消費(fèi)一次。

分享到:
在線咨詢 我要報(bào)名
和我們?cè)诰€交談!