问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Redis Stream:Redis 5.0 引入的消息传递新模式

创作时间:
作者:
@小白创作中心

Redis Stream:Redis 5.0 引入的消息传递新模式

引用
1
来源
1.
https://www.cnblogs.com/makemylife/p/18135130

Redis Stream是Redis 5.0版本引入的新特性,它提供了一种简单但功能强大的消息传递模式。本文将详细介绍Redis Stream的基本用法,并展示如何在Spring Boot项目中应用Redis Stream。

1. 基础知识

Redis Stream的结构如下图所示,它是一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容。

  • 每个Redis Stream都有唯一的名称,对应唯一的Redis Key。
  • 同一个Stream可以挂载多个消费组(Consumer Group),消费组不能自动创建,需要使用XGROUP CREATE命令创建。
  • 每个消费组都有一个游标(last_delivered_id),任意一个消费者读取了消息都会使游标向前移动,标识当前消费组消费到哪条消息了。
  • 消费组可以挂载多个消费者(Consumer),每个Consumer并行地读取消息,任意一个消费者读取了消息都会使游标向前移动。
  • 消费者内部有一个属性pending_ids,记录了当前消费者读取但没有回复ACK的消息ID列表。

2. 核心命令

2.1 XADD:向Stream末尾添加消息

使用XADD向队列添加消息,如果指定的队列不存在,则创建一个队列。基础语法格式:

XADD key ID field value [field value ...]
  • key:队列名称,如果不存在就创建
  • ID:消息ID,可以使用*表示由Redis生成,也可以自定义,但要保证递增性
  • field value:记录

示例:

127.0.0.1:6379> XADD mystream * name1 value1 name2 value2
"1712473185388-0"
127.0.0.1:6379> XLEN mystream
(integer) 1
127.0.0.1:6379> XADD mystream * name2 value2 name3 value3
"1712473231761-0"

消息ID的格式为毫秒级时间戳+序号,例如:1712473185388-5,表示当前消息在毫秒时间戳1712473185388产生,并且该毫秒内产生到了第5条消息。

在添加队列消息时,也可以指定队列的长度:

127.0.0.1:6379> XADD mystream MAXLEN 100 * name value1 age 30
"1713082205042-0"

2.2 XRANGE:获取消息列表

使用XRANGE获取消息列表,会自动过滤已经删除的消息。语法格式:

XRANGE key start end [COUNT count]
  • key:队列名
  • start:开始值,-表示最小值
  • end:结束值,+表示最大值
  • count:数量

示例:

127.0.0.1:6379> XRANGE mystream - + COUNT 2
1) 1) "1712473185388-0"
   2) 1) "name1"
      2) "value1"
      3) "name2"
      4) "value2"
2) 1) "1712473231761-0"
   2) 1) "name2"
      2) "value2"
      3) "name3"
      4) "value3"

2.3 XREAD:以阻塞/非阻塞方式获取消息列表

使用XREAD以阻塞或非阻塞方式获取消息列表,语法格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • count:数量
  • milliseconds:可选,阻塞毫秒数,没有设置就是非阻塞模式
  • key:队列名
  • id:消息ID

示例:

127.0.0.1:6379> XREAD streams mystream 0-0
1) 1) "mystream"
   2) 1) 1) "1712473185388-0"
         2) 1) "name1"
            2) "value1"
            3) "name2"
            4) "value2"
      2) 1) "1712473231761-0"
         2) 1) "name2"
            2) "value2"
            3) "name3"
            4) "value3"

XREAD读消息时分为阻塞和非阻塞模式,使用BLOCK选项可以表示阻塞模式,需要设置阻塞时长。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。

127.0.0.1:6379> XREAD block 1000 streams mystream $
(nil)
(1.07s)

2.4 XGROUP CREATE:创建消费者组

使用XGROUP CREATE创建消费者组,分两种情况:

  • 从头开始消费:
XGROUP CREATE mystream consumer-group-name 0-0
  • 从尾部开始消费:
XGROUP CREATE mystream consumer-group-name $

示例:

127.0.0.1:6379> XGROUP CREATE mystream mygroup 0-0
OK

2.5 XREADGROUP GROUP:读取消费组中的消息

使用XREADGROUP GROUP读取消费组中的消息,语法格式:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名
  • consumer:消费者名
  • count:读取数量
  • milliseconds:阻塞毫秒数
  • key:队列名
  • ID:消息ID

示例:

127.0.0.1:6379> XREADGROUP group mygroup consumerA count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1712473185388-0"
         2) 1) "name1"
            2) "value1"
            3) "name2"
            4) "value2"

2.6 XACK:消息消费确认

接收到消息之后,需要手动确认(ack),语法格式:

xack key group-key ID [ID ...]

示例:

127.0.0.1:6379> XACK mystream mygroup 1713089061658-0
(integer) 1

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行ack确认消息已经被消费完成。可以使用xpending命令查看消费者未确认的消息ID:

127.0.0.1:6379> xpending mystream mygroup
1) (integer) 1
2) "1713091227595-0"
3) "1713091227595-0"
4) 1) 1) "consumerA"
      2) "1"

2.7 XTRIM:限制Stream长度

使用XTRIM对流进行修剪,限制长度,语法格式:

127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1712535017402-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 4
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1712498239430-0"
   2) 1) "name"
      2) "zhangyogn"
2) 1) "1712535017402-0"
   2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"

3. Spring Boot Redis Stream实战

3.1 添加Spring Boot Redis依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

3.2 YAML文件配置

3.3 RedisTemplate配置

3.4 定义Stream监听器

3.5 定义StreamContainer并启动

3.6 发送消息

执行完成之后,消费者就可以打印如下日志:

演示代码地址:https://github.com/makemyownlife/courage-cache-demo

4. Redis Stream作为消息队列是否完美?

Redis Stream用于消息队列的最大进步在于实现了发布订阅模型。发布订阅模型具有以下特点:

  • 消费独立:相比队列模型的匿名消费方式,发布订阅模型中消费方都会具备的身份,一般叫做订阅组(订阅关系),不同订阅组之间相互独立不会相互影响。
  • 一对多通信:基于独立身份的设计,同一个主题内的消息可以被多个订阅组处理,每个订阅组都可以拿到全量消息。因此发布订阅模型可以实现一对多通信。

细品Redis Stream的设计,我们发现它和Kafka非常相似,比如说消费者组、消费进度偏移量等。我们曾经诟病Redis List数据结构用做队列时,因为消费时没有Ack机制,应用异常挂掉导致消息偶发丢失的情况,Redis Stream已经完美地解决了。因为消费者内部有一个属性pending_ids,记录了当前消费者读取但没有回复ACK的消息ID列表。当消费者重新上线,这些消息可以重新被消费。

但Redis Stream用做消息队列完美吗?这个真没有!

  1. Redis本身定位是内存数据库,它的设计之初都是为缓存准备的,并不具备消息堆积的能力。而专业消息队列一个非常重要的功能是数据中转枢纽,Redis的定位很难满足,所以使用起来要非常小心。
  2. Redis的高可用方案可能丢失消息(AOF持久化和主从复制都是异步),而专业消息队列可以针对不同的场景选择不同的高可用策略。

所以,Redis非常适合轻量级消息队列解决方案,轻量级意味着:数据量可控+业务模型简单。

参考文章

笔者开源项目推荐

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号