问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Redis List消息队列的三种消费线程模型详解

创作时间:
作者:
@小白创作中心

Redis List消息队列的三种消费线程模型详解

引用
1
来源
1.
https://www.cnblogs.com/makemylife/p/18407450

Redis列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。在生产环境中,很多公司都将Redis列表应用于轻量级消息队列。本文将详细介绍如何使用List命令实现消息队列的功能,并深入剖析三种不同的消费者线程模型。

1. 核心流程

生产者使用 LPUSH key element[element...] 命令将消息插入到队列的头部,如果key不存在则会创建一个空的队列再插入消息。

例如,生产者向队列 queue 先后插入了「Java」、「勇哥」、「Go」:

> LPUSH queue Java 勇哥 Go
(integer) 3

消费者使用 RPOP key 命令依次读取队列的消息,先进先出:

> RPOP queue
"Java"
> RPOP queue
"勇哥"
> RPOP queue
"Go"

接下来,我们通过 spring-data-redis API 演示生产消费流程:

  • 生产者:
redisTemplate.opsForList().leftPush("queue" , "Java");
redisTemplate.opsForList().leftPush("queue" , "勇哥");
redisTemplate.opsForList().leftPush("queue" , "Go");
  • 消费者:

消费者启动一个独立的线程从队列中读取消息:

上图的伪代码中:

while(true) {
    String message = redisTemplate.opsForList().rightPop("queue");
    if (message != null) {
        // 处理消息
    } else {
        // 休眠一会
        Thread.sleep(100);
    }
}

这里要加休眠,主要是为了减少空读的频率,避免CPU无意义的消耗。

有什么更优化的方式吗?有,那就是使用Redis阻塞读取List的命令。

Redis提供了 BLPOPBRPOP 阻塞读取的命令,消费者在读取队列没有数据时自动阻塞,直到有新的消息写入队列,才会继续读取新消息执行业务逻辑:

> BRPOP queue 0

参数0表示阻塞等待时间无限制。

这种消费者线程模型非常容易理解,同时也非常适合顺序消费的模式。但是,如果在消费消息时服务器宕机或者断电,可能会丢失一条消息。

接下来,我们探讨三种更高效的消费模型:

  • 拉取线程 + 消费线程池(非阻塞模式)
  • 拉取线程 + 消费线程池(阻塞模式)
  • 拉取线程 + Disruptor(阻塞模式)

2. 拉取线程 + 消费线程池(非阻塞模式)

为了提升消费速度,我们可以将拉取和消费拆分成两种动作,分别通过不同的线程池来处理。拉取线程池负责拉取消息,消费线程池负责消费消息。

伪代码类似:

这种方式可以通过多线程执行大幅度提升消费速度,但是存在以下风险:

  • 如果消费速度很慢,生产者速度很高,那么就会在线程池内容易产生消息堆积,可能会导致OOM。
  • 如果消费者服务器宕机或者断电,那么会丢失大量的消息。

3. 拉取线程 + 消费线程池(阻塞模式)

为了优化上述模式,我们需要确保当队列已满时,提交消息到线程池会阻塞。

查看线程池执行的源码:

可以看到,第30行调用的是workQueue的非阻塞的offer方法。如果队列已满,新提交的任务并不会被block住,反而会调用后续的reject流程。

解决方案:

  • 使用信号量限制同时进入线程池等待队列的任务数。
  • 使用线程池的拒绝机制,把新加入的任务put到等待队列里,这样也可以阻塞住生产者。

4. 拉取线程 + Disruptor

Disruptor是一种高性能的异步框架,其核心是环形缓冲区RingBuffer。

环形缓冲区的设计优点:

  • 数组长度2^n,通过位运算加快定位速度。
  • 无锁设计,每个生产者或消费者线程会先申请可以操作的元素在数组中的位置。

将消费线程池替换成Disruptor有两个明显优点:

  • 无锁队列,写入读取性能非常好。
  • 当拉取线程提交消息到Disruptor时,若RingBuffer已满,则拉取线程会阻塞,避免无限拉取和OOM问题。

伪代码:

  1. 定义Disruptor:

  1. 拉取线程将消息发送到Disruptor Ringbuffer:

  1. 消费消息:

整体的消费者线程模型如下图:

5. 平滑停服 + 定时任务补偿

无论使用哪种模型,如果服务器突然宕机或断电,都可能丢失消息。推荐两种解决方案:

  1. 平滑停服

在Unix/Linux系统中,可以使用kill命令发送信号给运行中的进程。常见的信号有:

  • SIGTERM(15):请求进程终止,可以被捕捉和处理,用于优雅地停止进程。
  • SIGKILL(9):强制终止进程,不能被捕捉或忽略。
  • SIGQUIT(3):进程退出并生成核心转储(core dump)。

使用Java的 Runtime.getRuntime().addShutdownHook 方法注册一个关闭钩子:

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.out.println("Shutdown hook triggered. Performing cleanup...");
    // 在这里执行清理工作,如关闭资源、保存状态等
}));
  1. 定时任务补偿

使用List做消息队列,不可避免地会有消息丢失,因此需要定时任务做补偿,每隔一段时间去业务表里查询业务状态机,若状态机不符合条件,则触发补偿策略。

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号