问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Kafka如何实现数据的幂等性操作

创作时间:
作者:
@小白创作中心

Kafka如何实现数据的幂等性操作

引用
CSDN
1.
https://blog.csdn.net/J080624/article/details/141930345

在大数据处理和消息队列系统中,数据的幂等性是一个重要的概念。它确保了即使在系统出现故障或网络不稳定的情况下,数据也不会重复或乱序。本文将详细介绍Kafka中如何实现数据的幂等性操作,以及其背后的原理和局限性。

Kafka幂等性操作的原理

为了确保数据不会重复,Kafka引入了幂等性操作。所谓的幂等性,就是Producer同样的一条数据,无论向Kafka发送多少次,Kafka都只会存储一条。这里所说的"同样的一条数据",指的是不断重试的数据,而不是内容一致的数据。

默认情况下,幂等性是不起作用的。如果想要使用幂等性操作,需要在生产者对象的配置中开启相关配置:

配置项
配置值
说明
enable.idempotence
true
开启幂等性
max.in.flight.requests.per.connection
小于等于5
每个连接的在途请求数,不能大于5,取值范围为[1,5]
acks
all(-1)
确认应答,固定值,不能修改
retries
>0
重试次数,推荐使用Int最大值

Kafka实现幂等性的流程

1. 数据增加唯一性标识

开启幂等性后,为了保证数据不会重复,就需要给每一个请求批次的数据增加唯一性标识。Kafka中,这个标识采用的是连续的序列号数字sequencenum。但是不同的生产者Producer可能序列号是一样的,仅仅靠seqnum还无法唯一标记数据,所以还需要同时对生产者进行区分。Kafka采用申请生产者ID(producerid)的方式对生产者进行区分。在发送数据前,就需要提前申请producerid以及序列号sequencenum

2. 记录生产者的生产状态

Broker中会给每一个分区记录生产者的生产状态:

  • 采用队列的方式缓存最近的5个批次数据。队列中的数据按照seqnum进行升序排列。这里的数字5是经过压力测试,均衡空间效率和时间效率所得到的值,所以为固定值,无法配置且不能修改。

3. 判重

判断Broker当前新的请求批次数据在缓存的5个旧的批次中是否存在相同的,如果有相同的,那么说明有重复,当前批次数据不做任何处理。

4. 判断序列号是否连续

如果Broker当前的请求批次数据在缓存中没有相同的,那么判断当前新的请求批次的序列号是否为缓存的最后一个批次的序列号加1:

  • 如果是,说明是连续的,顺序没乱,那么继续。
  • 如果不是,那么说明数据已经乱了,发生异常。

5. 重试

Broker根据异常返回响应,通知Producer进行重试。Producer重试前,需要在缓冲区中将数据重新排序,保证正确的顺序后再进行重试即可。

6. 更新数据

如果请求批次不重复,且有序,那么更新缓冲区中的批次数据。将当前的批次放置再队列的结尾,将队列的第一个移除,保证队列中缓冲的数据最多5个。

7. 缺陷

从上面的流程可以看出,Kafka的幂等性是通过消耗时间和性能的方式提升了数据传输的有序和去重,在一些对数据敏感的业务中是十分重要的。但是这种幂等性还是有缺陷:

  • 幂等性的producer仅做到单分区上的幂等性,即单分区消息有序不重复,多分区无法保证幂等性。
  • 只能保持生产者单个会话的幂等性,无法实现跨会话的幂等性,也就是说如果一个producer挂掉再重启,那么重启前和重启后的producer对象会被当成两个独立的生产者,从而获取两个不同的独立的生产者ID,导致broker端无法获取之前的状态信息,所以无法实现跨会话的幂等。要想解决这个问题,可以采用后续的事务功能。

跨会话的幂等性

对于幂等性的缺陷,Kafka可以采用事务的方式解决跨会话的幂等性。基本的原理就是通过事务功能管理生产者ID,保证事务开启后,生产者对象总能获取一致的生产者ID。

为了实现事务,Kafka引入了事务协调器(TransactionCoodinator)负责事务的处理,所有的事务逻辑包括分派PID等都是由TransactionCoodinator负责实施的。TransactionCoodinator 会将事务状态持久化到该主题中。

事务基本的实现思路就是通过配置的事务ID,将生产者ID进行绑定,然后存储在Kafka专门管理事务的内部主题__transaction_state中,而内部主题的操作是由事务协调器(TransactionCoodinator)对象完成的,这个协调器对象有点类似于咱们数据发送时的那个副本Leader。

其实这种设计是很巧妙的,因为kafka将事务ID和生产者ID看成了消息数据,然后将数据发送到一个内部主题中。这样,使用事务处理的流程和咱们自己发送数据的流程是很像的。

接下来,我们就把这两个流程简单做一个对比。

① 普通数据发生流程

② 事务数据发送流程

通过两张图可以看到,基本的事务操作和数据操作是很像的。不过要注意,我们这里只是简单对比了数据发送的过程,其实它们的区别还在于数据发送后的提交过程。普通的数据操作,只要数据写入了日志,那么对于消费者来讲。数据就可以读取到了,但是事务操作中,如果数据写入了日志,但是没有提交的话,其实数据默认情况下也是不能被消费者看到的。只有提交后才能看见数据。

更为详细的可以参考下图:

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号