问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

通过Canal监控MySql的Binlog解决数据库和Redis的数据一致性问题

创作时间:
作者:
@小白创作中心

通过Canal监控MySql的Binlog解决数据库和Redis的数据一致性问题

引用
CSDN
1.
https://blog.csdn.net/qq_58488139/article/details/145727538

在分布式系统中,数据库和缓存的数据一致性问题是一个常见的挑战。本文将介绍一种通过Canal监控MySQL Binlog来解决这一问题的技术方案。

一、延迟双删策略

📌 延迟双删方案流程

1️⃣ 第一次删除 Redis 缓存(防止读取旧缓存)

2️⃣ 更新 MySQL 数据(确保数据存储正确)

3️⃣ 等待一定时间(如 500ms)(确保数据库事务提交)

4️⃣ 再次删除 Redis 缓存(防止缓存回滚)

📌 为什么要延迟删除(双删)?

  • 确保 MySQL 事务真正提交,再进行缓存删除,避免并发更新问题
  • 避免缓存回滚问题(即线程 B 在事务提交前写入了旧数据)

📌 问题:延迟双删时延迟时间难以确定

  • 不同业务的数据库事务执行时间不同,如果延迟时间太短,可能 MySQL 事务还没提交,缓存仍然被回滚;
  • 如果延迟时间太长,会影响系统的实时性,导致缓存失效时间过长,影响性能。

二、延迟双删+MQ策略

由于延迟双删方案延迟时间难以确定,所以使用MQ消息队列来监听MySQL的事务提交情况,当数据库事务真正提交后,再删除缓存。

📌 延迟双删+MQ方案流程

  1. 删除 Redis 缓存(防止其他线程读取旧数据)

  2. 更新 MySQL 数据(写入新值)

  3. 发送异步消息到 MQ(如 Kafka、RabbitMQ、RocketMQ)

  4. 消费者(Listener)监听 MQ,等待 MySQL 事务完成(MQ 的作用就是确保数据库事务真正提交后,再次删除缓存

  5. 消费者收到 MQ 消息后,删除 Redis 缓存

  6. 当用户下次查询时,发现 Redis 为空,从 MySQL 读取新数据并写回缓存

📌 问题:MQ 消息处理有延迟

  • MQ 消息不是实时处理的,可能会有几 ms 到几十 ms的延迟
  • 在高并发下,缓存可能短时间内仍然存在

三、通过Canal监控MySql的Binlog解决数据库和Redis的数据一致性问题

3.1 开启并配置Binlog

找到 MySQL 配置文件,修改成如下配置:

binlog_format=ROW #基于行的复制。binlog 中记录的是每行数据的变化。
binlog-do-db=aicloud # 需要同步的数据库名
binlog-do-db=aicloud2 # 实现分库分表后aicloud2也需要同步

3.2 配置Canal

修改canal.properties:

canal.serverMode=kafka
canal.mq.servers=127.0.0.1:9099

打开 Canal 目录下 conf\example 中的 instance.properties 配置文件进行修改:

canal.instance.mysql.slaveId=100 #从节点id
canal.instance.master.address=127.0.0.1:3306 #连接的 MySQL 地址
canal.mq.topic=ai-cloud-canal-to-kafka # 发送到 Kafka "ai-cloud-canal-to-kafka"主题下

3.3 监听binlog

Canal会作为从节点,读取binlog日志

3.4 修改缓存

删除缓存中产生变化的行数据,下次查询该数据时就会缓存未命中,从而查询数据库以更更新缓存。

@Resource
private KafkaTemplate kafkaTemplate;
@Resource
private RedisTemplate<String,Object> redisTemplate;
@Resource
private ObjectMapper objectMapper;
private static final String CANAL_TOPIC = "ai-cloud-canal-to-kafka";
@KafkaListener(topics = CANAL_TOPIC)
public void canalListen(String data, Acknowledgment ack) throws JsonProcessingException {
    HashMap<String,Object> map = objectMapper.readValue(data, HashMap.class);
    if (!map.isEmpty() && map.get("database").toString().equals("aicloud") && map.get("table").toString().equals("answer")){
        // 更新Redis缓存
        ArrayList<LinkedHashMap<String,Object>> list = (ArrayList<LinkedHashMap<String,Object>>) map.get("data");
        String cacheKey = "";
        for (LinkedHashMap<String,Object> answer : list){
            cacheKey = AppVariable.getListCacheKey(Long.parseLong(answer.get("uid").toString()),Integer.parseInt(answer.get("model").toString()) , Integer.parseInt(answer.get("type").toString()));
            redisTemplate.opsForValue().set(cacheKey,null);
        }
    }
    ack.acknowledge(); //手动确认应答
}
© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号