问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Redis:基于PubSub(发布/订阅)、Stream流实现消息队列

创作时间:
作者:
@小白创作中心

Redis:基于PubSub(发布/订阅)、Stream流实现消息队列

引用
CSDN
1.
https://blog.csdn.net/weixin_54158370/article/details/144363592

Redis是一种高性能的键值存储系统,广泛应用于缓存、消息队列等场景。本文将详细介绍Redis中基于List、PubSub和Stream实现的消息队列机制,帮助开发者更好地理解和使用Redis的消息队列功能。

Redis - PubSub、Stream流

  • Redis - PubSub、Stream流
  • 1.基于List的消息队列
  • 2.基于PubSub的消息队列
  • 3.基于Stream的消息队列
  • 1.Redis Streams简介
  • 2.Redis Streams基本命令
  • 1.XADD 添加消息到末尾
  • 2.XLEN 获取消息长度
  • 3.XREAD 读取消息 (单消费模式)
  • 4.XGROUP 消费组操作
  • 5.XREADGROUP GROUP 从消费组读取消息
  • 6.XACK 消息确认
  • 7.XPENDING 查看pend数据

1.基于List的消息队列

  • 由于redis的list数据结构为双向链表,则可以通过lpush和rpop来模拟队列效果
  • 由于队列没有消息时候,需要阻塞获取队列数据,而
    lpop和rpop在空队列获取数据时会返回null
    ,所以需要使用
    brpop和blpop来进行阻塞获取
#向data1的list存两个数据
lpush data1  aaa bbb
#右监听data1  等待20秒
brpop data1 20

缺点:

  • 无法避免消息丢失,
  • 只支持单消费者,无法广播

2.基于PubSub的消息队列

基于发布订阅形式,可以广播,生产者向channel(信道)发送消息,可以由多个消息者去订阅,订阅的消费者都可以收到消息

 SUBSCRIBE channel [channel ...]  #订阅一个或多个信道
 PUBLISH channel message  #向一个信道发送消息
 PSUBSCRIBE pattern [pattern ...]  #通过通配符匹配订阅的信道  匹配规则  ?代表一个字符   []代表中括号内的可选字符  *代表任意字符
SUBSCRIBE  log 
PUBLISH  log zhangsan   

缺点:

  • 不支持持久化
  • 消息有上限,超出会导致消息丢失

3.基于Stream的消息队列

1.Redis Streams简介

官方文档:https://redis.io/docs/latest/commands/xadd/

Redis Stream是redis在5.x版本引入的新特性,Redis流是一种数据结构,它类似于一个只可追加的日志,但也实现了多种操作,以克服典型只可追加日志的一些限制。这些操作包括O(1)时间的随机访问和复杂的消费策略,如消费者组。你可以使用流来记录并同时实时分发事件。Redis流的使用案例包括:

  • 事件溯源(例如,跟踪用户操作、点击等)
  • 传感器监测(例如,现场设备的读数)
  • 通知(例如,将每个用户的通知记录存储在单独的流中)

Redis为每个流条目生成一个唯一的ID。你可以使用这些ID在后续检索与其关联的条目,或者读取并处理流中的所有后续条目。请注意,由于这些ID与时间相关,这里显示的ID可能会有所不同,与你自己的Redis实例中看到的ID也会有所不同。

Redis流支持多种修剪策略(以防止流无限制地增长)和多种消费策略(参见XREAD、XREADGROUP和XRANGE)。

2.Redis Streams基本命令

stream消息队列相关命令:

  • XADD - 添加消息到末尾
  • XTRIM - 对流进行修剪,限制长度
  • XDEL - 删除消息
  • XLEN - 获取流包含的元素数量,即消息长度
  • XRANGE - 获取消息列表,会自动过滤已经删除的消息
  • XREVRANGE - 反向获取消息列表,ID 从大到小
  • XREAD - 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

  • XGROUP CREATE - 创建消费者组
  • XGROUP CREATECONSUMER 给指定的消费者组添加消费者
  • XREADGROUP GROUP - 读取消费者组中的消息
  • XACK - 将消息标记为"已处理"
  • XGROUP SETID - 为消费者组设置新的最后递送消息ID
  • XGROUP DELCONSUMER - 删除消费者
  • XGROUP DESTROY - 删除消费者组
  • XPENDING - 显示待处理消息的相关信息
  • XCLAIM - 转移消息的归属权
  • XINFO - 查看流和消费者组的相关信息;
  • XINFO GROUPS - 打印消费者组的信息;
  • XINFO STREAM - 打印流信息

1.XADD 添加消息到末尾

1.基本语法

XADD
是唯一可以向流中添加数据的Redis 命令,但 还有其他命令,例如
XDEL

XTRIM
,能够 从流中删除数据。

XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]
  • key:
    队列名
  • [NOMKSTREAM]:
    队列不存在是否自动创建,默认自动创建
  • [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] :
    设置消息队列的最大消息数量
  • <* | id>:
    消息唯一id,*代表消息由redis自动生成,格式为时间戳-递增序列,可以手动指定
  • field value:
    字段和值(键值对),可以一次添加多个
XADD users * name zhangsan age 18  #向user发送一条name为zhangsan,age为18的消息,返回消息id

2.指定stream的id参数

1526919030474-55

ID标识流中的给定消息数据。

如果指定的ID参数是*字符,XADD命令将自动生成一个唯一的ID。然而,尽管仅在极少数情况下有用,但可以指定一个格式良好的ID,以便新条目将与指定的ID完全相同。

XADD mystream 1526919030474-55 message "Hello,"

当自动生成ID时,第一部分是Redis实例生成ID的Unix时间(毫秒)。第二部分只是一个序列号,用于区分在同一毫秒内生成的ID。

XADD mystream 1526919030474-* message " World!"

还可以指定一个不完整的ID,只自动生成序列号部分(
注意:6.0版本不支持报错

stream的数据是有序的,所以消息的id始终的递增的,如果手动指定一个小于上一条数据的id则会出错

2.XLEN 获取消息长度

XLEN users  #返回消息个数

3.XREAD 读取消息 (单消费模式)

基础语法

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • [COUNT count] :
    每次读取的最大数量
  • STREAMS key:
    从那个队列读取消息,key为读取的队列名
  • id [id ...]:
    起始id,代表从那个id的消息开始读取;
    0
    代表从
    第一个

    $
    代表从
    最新
    的消息读取

测试

 xread count 1 streams users 0  #读取users中最开始的一条数据

**
消息读取后不会删除,所有消费者都可以重复获取
**

 xread count 1 streams users $  #读取最新消息 ,返回为nil空
 xread count 1 block 0 streams users $    #永久阻塞读取

但是阻塞方式监听到消息后会关闭,需要重新监听
此时在开发中我们可以使用死循环来无限读取最新消息进行监听

**
但是: 当指定起始id为$时,代表读取最新消息,如果处理消息过程中,又有超过一条以上的消息到达,则下次也只能获取一条最新的消息会导致其他数据漏读
**

4.XGROUP 消费组操作

  • 消费者组:
    将多个消费者划分到同一个消费组,监听同一个队列
  • 分流消费:
    队列中的消费将会分流给消费者组中的消费者,不会重复消费,加快消息消费速度
  • 消息标识:
    消费者组会维护一个标识,记录最后一个被处理(非最新)的消息,即使redis挂机重启,也可以按照标识恢复读取,确保消息消费
  • 消息确认机制:
    消费者获取消息后,消息处于pending状态,
    并存入一个pend-list
    ,当处理完成时通过
    XACK
    来确认消息,标记为已处理,才pend-list移除

XGROUP CREATE 创建消费者组

XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]
  • key:
    队列名称
  • group:
    消费者组名称
  • <id | $>:
    起始id标识, 0代表第一个, $代表最新消息
  • [MKSTREAM]:
    队列不存在时自动创建队列,如果不存在且不指定会报错
  • [ENTRIESREAD entries-read]
    : redis7.0后的参数,
    创建消费者组g1
XGROUP CREATE users g1 0

XGROUP CREATECONSUMER 给指定的消费者组添加消费者

XGROUP CREATECONSUMER key group consumer

XGROUP DESTROY 删除指定的消费者组

XGROUP DESTROY key group

XGROUP DELCONSUMER 删除消费者组中指定的消费者

XGROUP DELCONSUMER key group consumer

5.XREADGROUP GROUP 从消费组读取消息

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
  • group:
    消费者组名称
  • consumer:
    消费者名称,如果不存在会自动创建
  • [COUNT count] :
    每次读取的最大数量
  • STREAMS key:
    从那个队列读取消息,key为读取的队列名
  • [NOACK]
    无需消息确认(类似自动确认)。
  • id [id ...]:
    起始id

注意:id取值:

">" :
从下一个未消费的消息开始,非最新消息,确保都消费
其他数字:
根据指定id从pend-list中获取已消费但未确认消息,例如0,从pend-list第一个消息开始
所以当正常处理时的id都采用">" 进行消费,如果出现异常可以指定0,每次都读取第一个pend-list的消息,即每次都是读取最新的未处理数据,将异常数据处理掉

测试

XREADGROUP GROUP  g1 c1 COUNT 1 BLOCK 2000 STREAMS users >

6.XACK 消息确认

XACK key group id [id ...]

测试

XACK users g1 1733738565351-0 1733738570018-0 1733738567511-0 1733738587327-0

7.XPENDING 查看pend数据

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
  • key:
    队列名称
  • group:
    消费者组名称
  • [IDLE min-idle-time]:
    :查看过去空闲时间的以上的消息,比如给5000,则查询空闲时间5000ms以上的消息
  • start end
    消息起始范围 “- +”代表所有
  • count:
    获取数量

测试

XPENDING users g1 - + 10

参考来源:https://www.bilibili.com/video/BV1cr4y1671t/?spm_id_from=333.788.videopod.episodes&vd_source=97a7d9497f7eb9e537f6b50df8831e27&p=75

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号