SpringBoot结合Canal 实现数据同步
创作时间:
作者:
@小白创作中心
SpringBoot结合Canal 实现数据同步
引用
CSDN
1.
https://blog.csdn.net/fangxiang2008/article/details/138617243
Canal是阿里巴巴开源的数据同步工具,用于数据库的实时增量数据订阅和消费。它可以针对MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的异构数据同步等情况进行实时增量数据同步。本文将详细介绍Canal的原理以及如何在Spring Boot项目中结合RabbitMQ实现数据同步。
1、Canal介绍
Canal 指的是阿里巴巴开源的数据同步工具,用于数据库的实时增量数据订阅和消费。它可以针对 MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的异构数据同步等情况进行实时增量数据同步。
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x,Canal通过伪装成mysql从服务向主服务拉取数据。所以首先我们要了解mysql 主从数据同步。
2、Mysql 主从数据同步
- 从库(slave)会生成两个线程,I/O线程(IOthread),SQL线程(SQLthread)。
- 当slave的I/O线程连接到master后,会去请求master的二进制日志(binlog), 此时master会通过logdump(将主库的二进制日志文件内容传输给从库的过程) 给从库传输binlog。
- 然后slave将拿到的binlog日志依次写入Relaylog(中继日志)的最末端,同时将读取到的Master 的bin-log的文件名和位置记录到master- info文件中,作用为了让slave知道它需要从哪个位置和哪 个日志文件开始同步数据,以保证数据的一致性,并且能够及时获取到master的新的更新操作, 开始数据同步过程。slave不仅在启动时读取 master-info 文件,而且会定期更新该文件中的记 录,以确保记录都是最新的。
- 最后SQL线程会读取Relaylog,并解析为具体操作(比如DDL这种),来实现主从库的操作一致。
3、Canal 原理
- Canal Server与MySQL建立连接后,会通过模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议获取数据库的 binlog(二进制日志)文件。
- Canal Server解析binlog文件,通过网络将解析后的事件传输给消息中间件(Kafka,RabbitMQ、Rocket等),实现数据的实时同步。
4、实现过程
4.1 下载并配置Canal
下载Canal,地址为https://github.com/alibaba/canal/releases,然后解压
修改Canal的配置文件(conf/canal.properties)
# 指定模式
canal.serverMode = rabbitMQ
# 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2
canal.destinations = example
# rabbitmq 服务端 ip
rabbitmq.host = 你的ip(注意不要加端口号哦)
# rabbitmq 虚拟主机
rabbitmq.virtual.host = /
# rabbitmq 交换机
rabbitmq.exchange = canal.exchange (这是本例子用的交换机)
# rabbitmq 用户名
rabbitmq.username = 你的用户名
# rabbitmq 密码
rabbitmq.password = 你的密码
rabbitmq.deliveryMode =
修改实例配置文件 conf/example/instance.properties。
#配置 slaveId,自定义,不等于 mysql 的 server Id 即可
canal.instance.mysql.slaveId=10
# 数据库地址:配置自己的ip和端口
canal.instance.master.address=你的IP:端口号
# 数据库用户名和密码
canal.instance.dbUsername=用户名
canal.instance.dbPassword=密码
# 指定库和表
canal.instance.filter.regex=.*\..* # 这里的 .* 表示 canal.instance.master.address 下面的所有数据库
# mq config
# rabbitmq 的 routing key
canal.mq.topic=canal.routing.key(这是本例子用的key)
然后重启 canal 服务。
4.2 搭建RabbitMq
搭建RabbitMq并且 配置Exchange、Routing key。
4.3 Spring Boot 实现代码
在Spring Boot项目中实现数据同步,需要完成以下步骤:
- 引入依赖
<!--引入rabbitmq集成的依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 编辑配置文件
spring:
#rabbitmq
rabbitmq:
# host: 192.168.31.189
host: 192.168.1.12
port: 5672
virtual-host: /
username: admin
password: admin
listener:
simple: #手动确认消息 ack
acknowledge-mode: manual
retry:
enabled: true
max-attempts: 3
max-interval: 1000ms
server:
port: 8888
- 定义队列的消息接受的数据对象
@Data
public class CanalMessage<T> {
private String type;
private String table;
private List<T> data;
private String database;
private Long es;
private Integer id;
private Boolean isDdl;
private List<T> old;
private List<String> pkNames;
private String sql;
private Long ts;
}
- 定义rabbitmq的配置
@Configuration
@Slf4j
public class RabbitConfig {
/**
* 消息序列化配置
*/
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
// SimpleRabbitListenerContainerFactory 是 RabbitMQ 提供的一个实现了 RabbitListenerContainerFactory 接口的简单消息监听器容器工厂。
// 它的作用是创建和配置 RabbitMQ 消息监听器容器,用于监听和处理消息。
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//ConnectionFactory 是 RabbitMQ 提供的一个接口,用于创建 RabbitMQ 的连接
factory.setConnectionFactory(connectionFactory);
//使用了 Jackson2JsonMessageConverter 将消息转换为 JSON 格式进行序列化和反序列化
factory.setMessageConverter( new Jackson2JsonMessageConverter());
return factory;
}
}
- 消费的代码
@Component
@RabbitListener(queues = "data")
public class MqNotifyReceive {
@RabbitHandler
public void receive(CanalMessage CanalMessage) {
//进行同步业务处理。。。
}
}
热门推荐
乾隆皇帝的八字命理分析:历史与神秘学的交汇
办公软件怎么免费试用
大模型在推荐系统中的应用:从特征工程到pipeline控制
可转债转股卖出策略详解:从入门到精通
柑橘黄化破解:科学施肥与土壤改良
施成,死磕新能源股5年多,从山顶到山脚的“价值毁灭者”
PS文件损坏打不开了怎么办?三种实用修复方法
🏞️湖南省旅游攻略:探索湖南的自然与文化🏞️
内脏与五行对应图:如何理解中医理论中的脏腑与五行关系
10分钟快速阅读法:如何高效获取论文核心信息?
分析完上海教育大区的数据,我们能够发现什么?
2024年高考成绩:齐齐哈尔实验中学vs 绥化一中
怎样制作挂面 家常挂面制作方法分享
“年冲”还是“月冲”?这样选,公积金还贷更划算!
宜家书柜得晾多久 宜家书柜需晾多久?10个小时到几天
男方陪产假津贴申请指南:2025年版
尼克斯队 2024-25 赛季的主要交易目标,不是唐斯
如何让“话题”转变为一个值得研究的“问题”?
长痘痘了?来,我们聊聊……
体内有癌,睡觉先知?睡觉时有这5种情况,或是癌症预警
职场上的人际冲突该怎么处理?
拓跋鲜卑南迁:中华民族发展史上的浓重一笔
新老曹操传横向对比:画质升级、策略多元,这些变化能否打动你?
2025是蛇年,龙蛇交替遇立春,岁气空禄又逢穷,5人要“躲岁”
技术文档翻译的细节把控与精准表达
内容营销 | 如何打造引人入胜的B2B客户案例,让更多人愿意看?
精选内容集|冷门精品动漫推荐探秘那些不容错过的珍贵之作
从 “完美人设” 到争议缠身,Rekkles真的有罪?
比特币破10万又暴跌,56万人爆仓!加密货币市场遭遇重挫
0除以任何数都得0?这个说法是错误的