Flink - Checkpoint使用详解1(工作机制、端到端一致性实现原理)
Flink - Checkpoint使用详解1(工作机制、端到端一致性实现原理)
Flink是一个分布式的流处理引擎,而流处理的其中一个特点就是7X24。为了保障Flink作业的持续运行。Flink的内部会将应用状态(state)存储到本地内存或者嵌入式的kv数据库(RocksDB)中,由于采用的是分布式架构,Flink需要对本地生成的状态进行持久化存储,以避免因应用或者节点机器故障等原因导致数据的丢失,Flink是通过checkpoint(检查点)的方式将状态写入到远程的持久化存储,从而就可以实现不同语义的结果保障。
一、基本介绍
1,什么是 Checkpoint?
(1)Checkpoint是Flink实现容错机制的核心功能,它能够根据配置周期性地基于流中各个算子任务的State来生成快照,从而将这些State数据定期持久化存储下来(比如HDFS)。
(2)当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。
2,检查点的生成逻辑
(1)我们以下图说明检查点的生成:
左边的输入流是用户行为数据,包括购买(buy)和加入购物车(cart)两种,每种行为数据都有一个偏移量,统计每种行为的个数。
中间的[cart,3]表示目前消费者消费到的那条数据及对应的偏移量,这个信息会存储在基于内存的状态中。
右边的[count(buy), 1]、[count(cart), 3],这些是实时汇总的结果,这些数据也会存储在基于内存的状态中。
Flink触发执行Checkpoint之后会把内存中存储的状态数据写入到下面的持久化存储中。
(2)checkpoint的执行流程如下:
①:当消费到[cart,3]这条数据时,正好达到了checkpoint的执行时机,此时JobManager中的checkpointcoordinator会触发checkpoint开始执行。 此时状态中存储的消费偏移量是4
②:checkpoint真正开始执行的时候,他会先把状态中维护的消费偏移量写入到持久化存储中。
③:写入结束后,DataSource组件会把状态的存储路径信息反馈给JobManager中的checkpointcoordinator。
④、⑤、⑥、⑦:接着后面算子中的状态数据:[count(buy), 1]、[count(cart), 3]也会进行同样的步骤
⑧:等所有的算子都完成了状态数据的持久化存储,也就是说checkpointcoordinator收集到了所有任务反馈给他的状态存储路径,这个时候就认为这一次的checkpoint真正执行成功了,最后他会向持久化存储中再备份一个checkpointmetadata元数据文件,那么本次整个checkpoint流程就完成了。如果中间有任何一个阶段不成功,那么本次checkpoint就宣告失败。,如果中间有一个不成功,那么本次checkpoin就宣告失败。
当达到下一次checkpoint执行时机的时候,会继续重复前面的执行流程。
3,检查点的恢复逻辑
(1)继续接着前面的业务流程,前面我们在消费完第4条数据的时候触发了一次checkpoint。checkpoint执行结束后,紧接着消费者开始消费第5条数据,当把第5条数据buy消费出来之后,在计算的时候由于资源问题导致出现了故障,此时任务异常结束了。
(2)任务结束后,Flink尝试重启任务,并恢复数据到之前的状态。在最开始重启任务的时候,任务中基于内存的状态都是空的。
(3)当任务重新启动之后,会根据指定的快照数据进行恢复,此时上一次在快照时保存的偏移量3、[count(buy), 13]、[count(cart), 3]这些数据对应的都恢复到了正确的位置。
(4)恢复成功之后,任务会基于之前的偏移量3继续往后面消费,所以又把[buy,4]这条数据消费出来了。此时算子中计算的结果,count(buy)就变成了2。这就是正常的数据处理流程了。
4,检查点分界线(Checkpoint Barrier)
(1)Flink分布式快照的一个核心元素是CheckpointBarrier。这些barrier会被注入到数据流中,并作为数据流的一部分与记录一起流动。
当Flink作业设置了检查点时,Flink会在数据流中插入这些特殊记录,以确保在特定点上所有算子的状态都被一致地保存。
barrier永远不会超过记录,它们严格地按顺序流动。
barrier将数据流中的记录分隔为进入当前快照的记录集和进入下一个快照的记录集,相当于将连续的数据流切分为多个有限序列,对应多个Checkpoint周期。
每个barrier都携带着包含了在它前面的记录的快照的ID。
barrier不会中断数据流,因此非常轻巧。
来自不同快照的多个barrier可以同时在数据流中,这意味着多种快照可能并发发生。
整个过程是由Flink的执行引擎在运行时负责处理的,通过协调不同操作符之间的信号和状态来实现数据流中的checkpointbarrier插入。
(2)如下图所示,CheckpointBarrier被插入到数据流中,它将数据流切分成段。Flink的Checkpoint逻辑是,一段新数据流入导致状态发生了变化,Flink的算子接收到ChecpointBarrier后,对状态进行快照。每个CheckpointBarrier有一个ID,表示该段数据属于哪次Checkpoint。如图所示,当ID为n的CheckpointBarrier到达每个算子后,表示要对n-1和n之间状态的更新做快照。CheckpointBarrier有点像EventTime中的Watermark,它被插入到数据流中,但并不影响数据流原有的处理顺序。
5,Exactly-Once(精确一次)的实现原理
(1)Flink提供了精确一次的处理语义,精确一次的处理语义可以理解为:数据可能会重复计算,但是结果状态只有一个。Flink通过Checkpoint机制实现了精确一次的处理语义:
Flink在触发Checkpoint时会向Source端插入checkpointbarrier,checkpointbarriers是从source端插入的,并且会向下游算子进行传递。
checkpointbarriers携带一个checkpointID,用于标识属于哪一个checkpoint,checkpointbarriers将流逻辑是哪个分为了两部分。
对于双流的情况,通过barrier对齐的方式实现精确一次的处理语义。
(2)下面对checkpoint过程进行分解:
- 图1包括两个流,每个任务都会消费一条用户行为数据(包括购买(buy)和加购(cart)),数字代表该数据的偏移量,countbuy任务统计购买行为的个数,councart统计加购行为的个数。
图2,触发checkpoint,JobManager会向每个数据源发送一个新的checkpoint编号,以此来启动检查点生成流程。
图3,当Source任务收到消息后,会停止发出数据,然后利用状态后端触发生成本地状态检查点,并把该checkpointbarrier以及checkpointid广播至所有传出的数据流分区。状态后端会在checkpoint完成之后通知任务,随后任务会向JobManager发送确认消息。在将checkpointbarrier发出之后,Source任务恢复正常工作。
图4,Source任务发出的checkpointbarrier会发送到与之相连的下游算子任务,当任务收到一个新的checkpointbarrier时,会继续等待其他输入分区的checkpointbarrier到来,这个过程称之为barrier对齐,checkpointbarrier到来之前会把到来的数据线缓存起来。
图5,任务收齐了全部输入分区的checkpointbarrier之后,会通知状态后端开始生成checkpoint,同时会把checkpointbarrier广播至下游算子。
图6,任务在发出checkpointbarrier之后,开始处理因barrier对齐产生的缓存数据,在缓存的数据处理完之后,就会继续处理输入流数据。
图7,最终checkpointbarrier会被传送到sink端,sink任务接收到checkpointbarrier之后,会向其他算子任务一样,将自身的状态写入checkpoint,之后向JobManager发送确认消息。JobManager接收到所有任务返回的确认消息之后,就会将此次检查点标记为完成。
附、Kafka+Flink+Kafka 实现端到端一致性
前面分析了Flink任务如何实现端到端的一致性,下面以Kafka+Flink+Kafka这个工作中常见的应用场景进行分析。
Source端:使用KafkaConsumer,将消费偏移量作为状态保存
系统内部:依赖于Checkpoint机制实现
Sink端:使用KafkaProducer,采用两阶段提交实现事务写入
这样,整个Kafka+Flink+Kafka这个流程就可以实现端到端的一致性。下面我将详细分析一下整个流程的实现。
1,第一步
启动初始化任务。在这里先介绍一下任务整个链条的基本情况。
数据源是kafka,这个任务会从kafka的一个topic里面获取数据。
这个topic有2个partition,每个partition都含有 “A”, “B”, “C”, ”D”, “E”5条消息。
任务第一次启动的时候,针对这2个partition的初始消费偏移量(offset)都是0。
针对这2个partition,对应产生了2个消费者,Source-1和Source-2。
2,第二步
消费者开始从partition0读取数据,数据A被读取出来,此时第一个消费者的offset变成了1。
3,第三步
(1)此时,数据A到达了Map算子中。
(2)接着两个消费者都开始读取他们下一条消息(partition0对应的消费者读取了消息B,partition1对应的消费者读取了消息A)。各自将消费者的offset更新成2和1。
(3)此时,正好达到了触发Checkpoint的时机
4,第四步
(1)在触发Checkpoint的时候,JobManager中的CheckpointCoordinator会在Source数据源中插入Barrier标记。
由于有2份数据流,每个数据流中都会插入一个Barrier标记,此时Barrier标记中的checkpointid可以认为是1。
针对partiton0这个数据流,当消费者将数据B消费出来之后,会在数据B后面插入一个Barrier标记,对应的checkpointid为1。
针对partition1这个数据流,当消费者将数据A消费出来之后,会在数据A后面插入一个Barrier标记,对应的checkpointid也是1。
具体Barrier标记会插入到哪个位置,要看触发checkpoint的时候消费到了哪条数据,CheckpointCoordinator就会将Barrier标记插入到触发checkpoint时消费的那条数据后面。
(2)Source组件被触发了Checkpoint之后,Kafka消费者开始将它的状态生成快照,保存到持久化存储中,状态中维护的主要是分区以及消费偏移量信息。对应的就是<Partition0,2>和<Partition1,1>这种数据,
注意:在这里其实省略了topic的名称,状态中也会保存topic名称的。
(3)当Source组件完成了自己的Checkpoint流程之后,它会向JobManager中的CheckpointCoordinator汇报自己已经完成了。
(4)随着数据的继续流动
此时partition0对应的消费者消费出来的数据B已经到达了Map算子中,它后面的Barrier标记也进入到Map算子中,所以它又将数据C消费出来了,数据C此时即将到达Map算子中,之前收到的数据A已经处理过并且发送到下游了。
partition1对应的消费者消费出来的数据A即将到达Map算子中
注意:此时map算子暂时不会触发checkpoint流程,它需要在接收到所有数据流中的Barrier标记之后才会执行,这样才可以保证数据一致性。
5,第五步
(1)当Map算子收到了所有数据流中的Barrier标记之后,就实现了Barrier对齐,这样就可以触发Map算子的Checkpoint了,它会将自己维护的状态生成快照存储到下面这个持久化存储中。
- 当然了,Map算子中也不一定必须要维护状态,这个要根据具体的业务需求,有的业务需求在map算子中也不需要维护状态。
(2)此时消费者会继续消费数据,partition0对应的消费者将数据D消费出来。partition1对应的消费者将数据B消费出来。
注意:
当Map算子收到partition0对应的那个数据流中的Barrier标记之后,partition0对应的消费者还会继续消费后面的数据,此时数据C已经达到了Map算子中,但是数据C不会被立刻计算,他会被缓存起来。
当所有的Barrier标记到齐之后,触发了Checkpoint,Map算子才会去计算这些缓存起来的数据,这些缓存的数据计算完以后再计算新消费过来的数据。因为Barrier标记后面的数据属于下一个Ckeckpoint。
(3)当Map算子完成了自己的Checkpoint流程之后,它会向JobManager中的CheckpointCoordinator汇报自己已经完成了。
6,第六步
(1)接下来Barrier标记会继续向下游流动。
(2)同时Sink组件,其实就是Kafka生产者(KafkaProducer),它会将自己收到的数据写入目的地Kakfa中。
- 但是注意,此时这些数据属于预提交的事务(这些数据暂时无法被消费,可以认为是临时写入kafka中,目前这些数据对外是不可用的)。
7,第七步
当Sink组件收到Barrier标记时,也会触发Sink组件的checkpoint流程,它会将它里面的状态数据生成快照,保存到持久化存储中。
注意:如果Map算子产生了多个子任务,Sink组件也需要收到所有Map算子子任务的Barrier标记之后才会执行checkpoint流程。
8,第八步
(1)当所有算子任务的快照都执行完成了,此时也就意味着这一次Checkpoint完成了。
(2)JobManager中的CheckpointCoordinator会向所有子任务发送通知,告诉他们这次checkpoint成功完成了。
(3)当Sink组件收到这个通知之后,就会执行二次提交,正式提交之前的事务,之前预提交到Kafka中的数据就可以被正常消费使用了。这就是Flink和Kafka实现端到端一致性的整体流程。