Redis List消息队列的三种消费线程模型详解
Redis List消息队列的三种消费线程模型详解
Redis列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。在生产环境中,很多公司都将Redis列表应用于轻量级消息队列。本文将详细介绍如何使用List命令实现消息队列的功能,并深入剖析三种不同的消费者线程模型。
1. 核心流程
生产者使用 LPUSH key element[element...]
命令将消息插入到队列的头部,如果key不存在则会创建一个空的队列再插入消息。
例如,生产者向队列 queue
先后插入了「Java」、「勇哥」、「Go」:
> LPUSH queue Java 勇哥 Go
(integer) 3
消费者使用 RPOP key
命令依次读取队列的消息,先进先出:
> RPOP queue
"Java"
> RPOP queue
"勇哥"
> RPOP queue
"Go"
接下来,我们通过 spring-data-redis
API 演示生产消费流程:
- 生产者:
redisTemplate.opsForList().leftPush("queue" , "Java");
redisTemplate.opsForList().leftPush("queue" , "勇哥");
redisTemplate.opsForList().leftPush("queue" , "Go");
- 消费者:
消费者启动一个独立的线程从队列中读取消息:
上图的伪代码中:
while(true) {
String message = redisTemplate.opsForList().rightPop("queue");
if (message != null) {
// 处理消息
} else {
// 休眠一会
Thread.sleep(100);
}
}
这里要加休眠,主要是为了减少空读的频率,避免CPU无意义的消耗。
有什么更优化的方式吗?有,那就是使用Redis阻塞读取List的命令。
Redis提供了 BLPOP
、BRPOP
阻塞读取的命令,消费者在读取队列没有数据时自动阻塞,直到有新的消息写入队列,才会继续读取新消息执行业务逻辑:
> BRPOP queue 0
参数0表示阻塞等待时间无限制。
这种消费者线程模型非常容易理解,同时也非常适合顺序消费的模式。但是,如果在消费消息时服务器宕机或者断电,可能会丢失一条消息。
接下来,我们探讨三种更高效的消费模型:
- 拉取线程 + 消费线程池(非阻塞模式)
- 拉取线程 + 消费线程池(阻塞模式)
- 拉取线程 + Disruptor(阻塞模式)
2. 拉取线程 + 消费线程池(非阻塞模式)
为了提升消费速度,我们可以将拉取和消费拆分成两种动作,分别通过不同的线程池来处理。拉取线程池负责拉取消息,消费线程池负责消费消息。
伪代码类似:
这种方式可以通过多线程执行大幅度提升消费速度,但是存在以下风险:
- 如果消费速度很慢,生产者速度很高,那么就会在线程池内容易产生消息堆积,可能会导致OOM。
- 如果消费者服务器宕机或者断电,那么会丢失大量的消息。
3. 拉取线程 + 消费线程池(阻塞模式)
为了优化上述模式,我们需要确保当队列已满时,提交消息到线程池会阻塞。
查看线程池执行的源码:
可以看到,第30行调用的是workQueue的非阻塞的offer方法。如果队列已满,新提交的任务并不会被block住,反而会调用后续的reject流程。
解决方案:
- 使用信号量限制同时进入线程池等待队列的任务数。
- 使用线程池的拒绝机制,把新加入的任务put到等待队列里,这样也可以阻塞住生产者。
4. 拉取线程 + Disruptor
Disruptor是一种高性能的异步框架,其核心是环形缓冲区RingBuffer。
环形缓冲区的设计优点:
- 数组长度2^n,通过位运算加快定位速度。
- 无锁设计,每个生产者或消费者线程会先申请可以操作的元素在数组中的位置。
将消费线程池替换成Disruptor有两个明显优点:
- 无锁队列,写入读取性能非常好。
- 当拉取线程提交消息到Disruptor时,若RingBuffer已满,则拉取线程会阻塞,避免无限拉取和OOM问题。
伪代码:
- 定义Disruptor:
- 拉取线程将消息发送到Disruptor Ringbuffer:
- 消费消息:
整体的消费者线程模型如下图:
5. 平滑停服 + 定时任务补偿
无论使用哪种模型,如果服务器突然宕机或断电,都可能丢失消息。推荐两种解决方案:
- 平滑停服
在Unix/Linux系统中,可以使用kill命令发送信号给运行中的进程。常见的信号有:
- SIGTERM(15):请求进程终止,可以被捕捉和处理,用于优雅地停止进程。
- SIGKILL(9):强制终止进程,不能被捕捉或忽略。
- SIGQUIT(3):进程退出并生成核心转储(core dump)。
使用Java的 Runtime.getRuntime().addShutdownHook
方法注册一个关闭钩子:
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutdown hook triggered. Performing cleanup...");
// 在这里执行清理工作,如关闭资源、保存状态等
}));
- 定时任务补偿
使用List做消息队列,不可避免地会有消息丢失,因此需要定时任务做补偿,每隔一段时间去业务表里查询业务状态机,若状态机不符合条件,则触发补偿策略。