问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

架构思维:分布式事务一致性_基于 MQ 的可靠消息投递方案

创作时间:
作者:
@小白创作中心

架构思维:分布式事务一致性_基于 MQ 的可靠消息投递方案

引用
CSDN
1.
https://blog.csdn.net/yangshangwei/article/details/146510972

文章目录

  • Pre
  • 引言
  • 案例分析
  • 基于 MQ 的可靠消息投递方案(推荐方案)
  • 基于两阶段提交的解决方案(很少采用)
  • 总结

Pre

分布式协同 - 分布式事务一二事儿
分布式协同 - 分布式事务_2PC & 3PC解决方案
分布式协同 - 分布式事务_TCC解决方案
深入理解分布式技术 - TCC 事务模型及实战
深入理解分布式技术 - 漫谈分布式事务及解决方案
分布式 - AT、TCC、SAGA、XA四种分布式事务模型的工作原理和应用场景
分布式存储 - 那些关于分布式DB的一二事儿
深入理解分布式技术 - 分布式事务总结回顾

引言

架构思维:如何设计一个支持海量数据存储的高扩展性架构中我们了解了分布式技术下的数据分片、存储、复制与一致性的原理性问题,接下来我们继续来看一下一致性的另一个话题:事务一致性。

在分布式场景中,原本一个系统被拆分成多个子系统,要想完成一次写入操作,需要同时协调多个系统,这就带来了分布式事务的问题

分布式事务是指:一次大的操作由多个小操作组成,这些小的操作分布在不同的服务器上,分布式事务需要保证这些小操作要么全部成功,要么全部失败

那怎么设计才能实现系统之间的事务一致性呢?

案例分析

举个例子: 商品系统、促销系统、订单系统 , 当用户下单时,订单系统生成订单,商品系统扣减库存,促销系统扣减优惠券,只有当三个系统的事务都提交之后,才认为此次下单成功,否则失败。

这是一个很典型的分布式事务问题,解决方案也很多,有

  • 两阶段提交协议(Two-Phase Commit,2PC)
  • 3PC
  • TCC
  • 基于消息队列

在实际工作中,很少采用前几种方案,基本都是基于 MQ 的可靠消息投递的方式来实现.

基于 MQ 的可靠消息投递方案(推荐方案)

基于 MQ 的可靠消息队列投递方案是目前互联网最为常用的方式,在应对高并发场景下的分布式事务问题时,种方案通过放弃强一致性,而选择最终一致性,来提高系统的可用性。

还是拿下单场景举例,当订单系统调用优惠券系统时,将扣减优惠券的事件放入消息队列中,最终给优惠券系统来执行,然后只要保证事件消息能够在优惠券系统内被执行就可以了,因为消息已经持久化在消息中间件中,即使消息中间件发生了宕机,我们将它重启后也不会出现消息丢失的问题。

基于 MQ 的可靠消息投递的方案不仅可以解决由于业务流程的同步执行而造成的阻塞问题,还可以实现业务解耦合流量削峰。

这种方案中的可选型的 MQ 也比较多,比如基于 RabbitMQ 或者 RocketMQ,但并不是引入了消息队列中间件就万事大吉了,通常情况下,还需要关注一下如下几个问题。

MQ 自动应答机制导致的消息丢失

订阅消息事件的优惠券服务在接收订单服务投递的消息后,消息中间件(如 RabbitMQ)默认是开启消息自动应答机制,当优惠券系统消费了消息,消息中间件就会删除这个持久化的消息。

但在优惠券系统执行的过程中,很可能因为执行异常导致流程中断,那这时候消息中间件中就没有这个数据了,进而会导致消息丢失。因此要采取编程的方式手动发送应答,也就是当优惠券系统执行业务成功之后,消息中间件才能删除这条持久化消息。

高并发场景下的消息积压导致消息丢失

那瞬时流量剧增,很多没能及时消费的消息积压在 MQ 队列中,这个问题如何解决呢?

分布式部署环境基于网络进行通信,而在网络通信的过程中,上下游可能因为各种原因而导致消息丢失。比如优惠券系统由于流量过大而触发限流,不能保证事件消息能够被及时地消费,这个消息就会被消息队列不断地重试,最后可能由于超过了最大重试次数而被丢弃到死信队列中。

但实际上,需要人工干预处理移入死信队列的消息,于是在这种场景下,事件消息大概率会被丢弃。而这个问题源于订单系统作为事件的生产者进行消息投递后,无法感知它下游(即优惠券系统)的所有操作,那么优惠券系统作为事件的消费者,是消费成功还是消费失败,订单系统并不知道。

顺着这个思路,如果让订单知道消费执行结果的响应,即使出现了消息丢失的情况,订单系统也还是可以通过定时任务扫描的方式,将未完成的消息重新投递来进行消息补偿。这是基于消息队列实现分布式事务的关键,是一种双向消息确认的机制

那么如何落地实现呢?可以先让订单系统把要发送的消息持久化到本地数据库里,然后将这条消息记录的状态设置为待发送,紧接着订单系统再投递消息到消息队列,优惠券系统消费成功后,也会向消息队列发送一个通知消息。当订单系统接收到这条通知消息后,再把本地持久化的这条消息的状态设置为完成。

这样做后,即使最终 MQ 出现了消息丢失,也可以通过定时任务从订单系统的本地数据库中扫描出一段时间内未完成的消息进行重新投递,最终保证订单系统和优惠券系统的最终事务一致性

设计方案

一、核心流程设计
  1. 订单系统主事务
  2. 分支事务处理与确认
二、关键实现细节
  1. 防消息丢失设计
    环节 保障措施
    生产者端 - 消息持久化到本地表(order_message)后再投递MQ- 消息表与订单表同库事务保证原子性
    MQ端 - 开启持久化(persistence)- 设置镜像队列(HA)- 启用生产者确认机制(publisher confirms)
    消费者端 - 关闭自动ACK,业务成功后手动提交偏移量- 失败时重试(最大重试次数=5)后进入死信队列

  2. 幂等性保障

-- 商品服务扣减库存幂等检查
UPDATE inventory 
SET stock = stock - 1 
WHERE item_id = 1001 
  AND stock >= 1 
  AND tx_id = 'TX1' 
  AND NOT EXISTS (
    SELECT 1 FROM tx_log WHERE tx_id = 'TX1'
  );
INSERT INTO tx_log(tx_id) VALUES ('TX1');
  1. 定时补偿任务
// 扫描未完成消息(每30秒执行)
@Scheduled(fixedRate = 30000)
public void compensateMessages() {
    List<OrderMessage> messages = messageDao.selectUnconfirmed(30);
    messages.forEach(msg -> {
        if (needResend(msg)) { // 判断是否需要重发
            mqProducer.resend(msg);
        }
    });
}
// 判断逻辑
private boolean needResend(OrderMessage msg) {
    // 规则1: 超过5分钟未确认
    // 规则2: ACK未全部到达(如只收到库存ACK未收到优惠券ACK)
    // 规则3: 消息发送失败记录
    return System.currentTimeMillis() - msg.getCreateTime() > 300000 
           || !checkAllAckReceived(msg.getTxId());
}
三、高并发优化策略
  1. 消息分区与并行消费
# RocketMQ分区设置
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# 设置16个分区(根据业务量调整)
defaultTopicQueueNums = 16
  1. 批量消息处理
// 消费者批量拉取消息(每次100条)
consumer.setConsumeMessageBatchMaxSize(100);
// 商品服务批量扣减库存SQL
UPDATE inventory 
SET stock = stock - 1 
WHERE item_id IN (1001,1002,...) 
  AND tx_id NOT IN (SELECT tx_id FROM tx_log);
  1. 流量削峰设计
  • 延迟队列作用:非核心业务(如短信通知)延后处理,优先保障库存/优惠券扣减
四、异常场景处理
  1. 分支事务失败处理
  2. 最终一致性兜底
-- 每日对账脚本
SELECT 
    o.order_id,
    (i.stock_before - i.stock_after) AS inventory_diff,
    (c.coupon_before - c.coupon_after) AS coupon_diff
FROM orders o
LEFT JOIN inventory_tx i ON o.tx_id = i.tx_id
LEFT JOIN coupon_tx c ON o.tx_id = c.tx_id
WHERE o.status = 'SUCCESS'
  AND (i.stock_diff != 1 OR c.coupon_diff != 1);

五、方案对比

方案 一致性 吞吐量 实现复杂度 适用场景
本地消息表+MQ 最终一致 高(10w+) 中等 电商订单、支付等高频场景
TCC 强一致 中(5w+) 高 金融交易等强一致性要求场景
2PC 强一致 低(1w-) 低 内部系统(低并发)

总结

通过本地消息表 + 双向ACK确认 + 定时补偿的组合方案,在保证高并发处理能力的同时实现分布式事务的最终一致性。该方案核心优势在于:

  1. 业务解耦:订单系统不直接依赖商品/促销服务可用性
  2. 弹性扩展:通过MQ分区与消费者水平扩展应对流量峰值
  3. 故障容忍:消息持久化与补偿机制确保极端情况下的数据修复能力

实际落地时需重点关注:

  • 消息表性能优化:采用分库分表(如按tx_id哈希分片)
  • 消费者幂等性:必须通过唯一事务ID实现
  • 监控告警:对消息积压、ACK超时等指标设置阈值告警

基于两阶段提交的解决方案(很少采用)

2PC 是分布式事务教父级协议,它是数据库领域解决分布式事务最典型的协议。它的处理过程分为准备和提交两个阶段,每个阶段都由协调者(Coordinator)和参与者(Participant)共同完成:

  • 协调者就是事务管理器;
  • 参与者就是具体操作执行的资源管理器。

XA 是由 X/Open 组织提出的分布式事务的规范,规范主要定义了事务管理器(Transaction Manager)和资源管理器(Resource Manager)之间的接口,事务管理器负责全局事务的协调者,资源管理器负责管理实际资源(如 MySQL、Oracle 等数据库)。而Java 平台上事务规范 JTA(Java Transaction API)就是对 XA 分布式事务规范标准的实现。例如在 Spring 中就通过 JtaTransactionManager 来配置分布式事务,然后通过管理多个 ResourceManager 来管理多个数据源,进而操作多个数据库之间的事务。

2PC 运行原理

订单数据、商品数据,以及促销数据被分别存储在多个数据库实例中,用户在执行下单的时候,交易主流程的业务逻辑则集中部署在一个应用服务器集群上,然后通过 Spring 容器访问底层的数据库实例,而容器中的 JTA 事务管理器在这里就作为事务管理器,Resource 资源管理器就作为底层的数据库实例的资源管理器。

假设订单数据,商品数据和促销数据分别保存在数据库 D1,数据库 D2 和数据库 D3 上

准备阶段

准备阶段,事务管理器首先通知所有资源管理器开启事务,询问是否做好提交事务的准备。如资源管理器此时会将 undo 日志和 redo 日志计入事务日志中,并做出应答,当协调者接收到反馈 Yes 后,则准备阶段结束。

提交阶段

当收到所有数据库实例的 Yes 后,事务管理器会发出提交指令。每个数据库接受指令进行本地操作,正式提交更新数据,然后向协调者返回 Ack 消息,事务结束。

中断阶段

果任何一个参与者向协调者反馈了 No 响应,例如用户 B 在数据库 D3 上面的余额在执行其他扣款操作,导致数据库 D3 的数据无法锁定,则只能向事务管理器返回失败。此时,协调者向所有参与者发出 Rollback 请求,参与者接收 Rollback 请求后,会利用其在准备阶段中记录的 undo 日志来进行回滚操作,并且在完成事务回滚之后向协调者发送 Ack 消息,完成事务回滚操作。

以上就是基于 2PC 实现分布式事务的原理。

存在的问题

我们并不会基于 2PC 来实现分布式事务一致性,虽然 2PC 可以借助数据库的本地事务操作,实现起来较为简单,不用侵入业务逻辑,但是它也存在着很多问题。

2PC 在准备阶段会要求每个资源管理器进行资源锁定,如 MySQL 的行锁。否则如果在提交阶段提交之前数据发生改变,就会出现数据不一致的情况。

还是上面的例子,如果商品库存数据为 1,也就是数据库 D1 为 1,在准备阶段询问是否可以扣减库存,商品数据返回可以,此时如果不锁定数据,在提交阶段之前另外一个请求去扣减了数据库 D1 的数据,这时候,在提交阶段再去扣减库存时,数据库 D1 的数据就会超售变成了负 1。

但正因为要加锁,会导致两阶段提交存在一系列问题,最严重的就是死锁问题,一旦发生故障,数据库就会阻塞,尤其在提交阶段,如果发生故障,数据都还处于资源锁定状态,将无法完成后续的事务提交操作。

其次是性能问题,数据库(如 MySQL )在执行过程中会对操作的数据行执行数据行锁,如果此时其他的事务刚好也要操作被锁定的数据行,那它们就只能阻塞等待,使分布式事务出现高延迟和性能低下。

再有就是数据不一致性,在提交阶段,当事务管理器向参与者发送提交事务请求之后,如果此时出现了网络异常,只有部分数据库接收到请求,那么会导致未接收到请求的数据库无法提交事务,整个系统出现数据不一致性。

总结

基于 MQ 的可靠消息投递的可落地性,要抓住“双向确认”的核心原则,只要能实现生产端和消费端的双向确认,这个方案就是可落地了,又因为基于 MQ 来实现,所以天生具有业务解耦合流量削峰的优势。

基于 2PC 的实现方案很少有实际的场景,但还是要掌握它的实现原理和存在的问题。

最后,在实际工作中,并不是所有的业务对事务一致性的要求都那么高。因为更高的要求意味着更多的成本,这也是很多架构复杂度来源之一,所以要尽可能地站在业务实际场景的立足点来回答分布式事务问题。

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号