Flink SQL中的Changelog事件处理机制详解
Flink SQL中的Changelog事件处理机制详解
Flink SQL中的Changelog
Changelog介绍
在关系数据库领域,MySQL使用binlog(二进制日志)记录数据库中所有修改操作,包括INSERT、UPDATE和DELETE操作。类似地,Flink SQL中的Changelog主要记录数据变化,以实现增量数据处理。
在MySQL中,binlog可以用于数据备份、恢复、同步和复制。通过读取和解析binlog中的操作记录,可以实现增量数据同步和复制。变更数据捕获(CDC)作为一种常用的数据同步技术,常被用于监控数据库中的数据变化,并将其转换为事件流进行实时处理。CDC工具可用于将关系数据库中的数据变化实时传输到其他系统或数据仓库,以支持实时分析和报告。当前常用的CDC工具包括Debezium和Maxwell。Flink通过FLINK-15331支持了CDC,可以实时地集成外部系统的CDC数据,并实现实时数据同步和分析。
Changelog事件生成和处理
Changelog介绍中提到的binlog和CDC是与Flink集成的外部Changelog数据源,Flink SQL内部也会生成Changelog数据。为了区分事件是否为更新事件,我们将仅包含INSERT类型事件的Changelog称为追加流或非更新流,而同时包含其他类型(例如UPDATE)事件的Changelog称为更新流。Flink中的一些操作(如分组聚合和去重)可以产生更新事件,生成更新事件的操作通常会使用状态,这类操作被称为状态算子。需要注意的是,并非所有状态算子都支持处理更新流。例如,Over窗口聚合和Interval Join暂不支持更新流作为输入。
实时计算引擎VVR 6.0及以上版本的Query操作,对应的运行时算子、是否支持处理更新流消费以及是否产生更新,详情请参见Query操作运行时信息说明。
Changelog的事件类型
FLINK-6047引入了回撤机制,使用INSERT和DELETE两种事件类型(尽管数据源仅支持INSERT事件),实现了流SQL算子的增量更新算法。FLINK-16987以后,Changelog事件类型被重构为四种类型(如下),形成一个完整的Changelog事件类型体系,便于与CDC生态系统连接。
/**
* A kind of row in a Changelog.
*/
@PublicEvolving
public enum RowKind {
/**
* Insertion operation.
*/
INSERT,
/**
* Previous content of an updated row.
*/
UPDATE_BEFORE,
/**
* New content of an updated row.
*/
UPDATE_AFTER,
/**
* Deletion operation.
*/
DELETE
}
Flink不使用包含UPDATE_BEFORE和UPDATE_AFTER的复合UPDATE事件类型的原因主要有两个方面:
拆分的事件无论是何种事件类型(仅RowKind不同)都具有相同的事件结构,这使得序列化更简单。如果使用复合UPDATE事件,那么事件要么是异构的,要么是INSERT或DELETE事件对齐UPDATE事件(例如,INSERT事件仅含有UPDATE_AFTER,DELETE事件仅含有UPDATE_BEFORE)。
在分布式环境下,经常涉及数据shuffle(例如Join、聚合)。即使使用复合UPDATE事件,有时仍需将其拆分为单独的DELETE和INSERT事件进行shuffle,例如下面的示例。
示例
下面是一个复合UPDATE事件必须拆分为DELETE和INSERT事件的场景示例。本文后续也将围绕此SQL作业示例讨论Changelog事件乱序问题并提供相应的解决方案。
-- CDC source tables: s1 & s2
CREATE TEMPORARY TABLE s1 (
id BIGINT,
level BIGINT,
PRIMARY KEY(id) NOT ENFORCED
)WITH (...);
CREATE TEMPORARY TABLE s2 (
id BIGINT,
attr VARCHAR,
PRIMARY KEY(id) NOT ENFORCED
)WITH (...);
-- sink table: t1
CREATE TEMPORARY TABLE t1 (
id BIGINT,
level BIGINT,
attr VARCHAR,
PRIMARY KEY(id) NOT ENFORCED
)WITH (...);
-- join s1 and s2 and insert the result into t1
INSERT INTO t1
SELECT
s1.*, s2.attr
FROM s1 JOIN s2
ON s1.level = s2.id;
假设源表s1中id为1的记录的Changelog在时间t0插入
(id=1, level=10)
,然后在时间t1将该行更新为
(id=1, level=20)
。这对应三个拆分事件:
s1 事件类型
+I(id=1,level=10) INSERT
-U(id=1,level=10) UPDATE_BEFORE
+U(id=1,level=20) UPDATE_AFTER
源表s1的主键是id,但Join操作需要按level列进行shuffle(见子句ON)。
如果Join算子的并发数为2,那么以上三个事件可能会被发送到两个任务中。即使使用复合UPDATE事件,它们也需要在shuffle阶段拆分,来保证数据的并行处理。
Changelog事件乱序问题
乱序原因
假设示例中表s2已有两行数据进入Join算子(+I(id=10,attr=a1),+I(id=20,attr=b1)),Join运算符从表s1新接收到三个Changelog事件。在分布式环境中,实际的Join在两个任务上并行处理,下游算子(示例中为Sink任务)接收的事件序列可能情况如下所示。
情况1 情况2 情况3
+I (id=1,level=10,attr='a1')-U (id=1,level=10,attr='a1')+U (id=1,level=20,attr='b1') +U (id=1,level=20,attr='b1')+I (id=1,level=10,attr='a1')-U (id=1,level=10,attr='a1') +I (id=1,level=10,attr='a1')+U (id=1,level=20,attr='b1')-U (id=1,level=10,attr='a1')
情况1的事件序列与顺序处理中的事件序列相同。情况2和情况3显示了Changelog事件在Flink SQL中到达下游算子时的乱序情况。乱序情况可能会导致不正确的结果。在示例中,结果表声明的主键是id,外部存储进行upsert更新时,在情况2和3中,如果没有其他措施,将从外部存储不正确地删除id=1的行,而期望的结果是
(id=1, level=20, attr='b1')
。
使用SinkUpsertMaterializer解决
在示例中,Join操作生成更新流,其中输出包含INSERT事件(+I)和UPDATE事件(-U和+U),如果不正确处理,乱序可能会导致正确性问题。
唯一键与upsert键
唯一键是指SQL操作后满足唯一约束的列或列组合。在本示例中(s1.id)、(s1.id, s1.level)和(s1.id, s2.id)这三组都是唯一键。
Flink SQL的Changelog参考了binlog机制,但实现方式更加简洁。Flink不再像binlog一样记录每个更新的时间戳,而是通过planner中的全局分析来确定主键接收到的更新历史记录的排序。如果某个键维护了唯一键的排序,则对应的键称为upsert键。对于存在upsert键的情况,下游算子可以正确地按照更新历史记录的顺序接收upsert键的值。如果shuffle操作破坏了唯一键的排序,upsert键将为空,此时下游算子需要使用一些算法(例如计数算法)来实现最终的一致性。
在示例中,表s1中的行根据列level进行shuffle。Join生成多个具有相同s1.id的行,因此Join输出的upsert键为空(即Join后唯一键上不存在排序)。此时,Flink需存储所有输入记录,然后检查比较所有列以区分更新和插入。
此外,结果表的主键为列id。Join输出的upsert键与结果表的主键不匹配,需要进行一些处理将Join输出的行进行正确转换为结果表所需的行。
SinkUpsertMaterializer
根据唯一键与upsert键的内容,当Join输出的是更新流且其upsert键与结果表主键不匹配时,需要一个中间步骤来消除乱序带来的影响,以及基于结果表的主键产生新的主键对应的Changelog事件。Flink在Join算子和下游算子之间引入了SinkUpsertMaterializer算子(FLINK-20374)。
结合乱序原因中的Changelog事件,可以看到Changelog事件乱序遵循着一些规则。例如,对于一个特定的upsert键(或upsert键为空则表示所有列),事件ADD(+I、+U)总是在事件RETRACT(-D、-U)之前发生;即使涉及到数据shuffle,相同upsert键的一对匹配的Changelog事件也总是被相同的任务处理。这些规则也说明了为什么示例仅存在乱序原因中三个Changelog事件的组合。
SinkUpsertMaterializer就是基于上述规则实现的,其工作原理如下图所示。SinkUpsertMaterializer在其状态中维护了一个RowData列表。当SinkUpsertMaterializer被触发,在处理输入行时,它根据推断的upsert键或整个行(如果upsert键为空)检查状态列表中是否存在相同的行。在ADD的情况下添加或更新状态中的行,在RETRACT的情况下从状态中删除行。最后,它根据结果表的主键生成Changelog事件,更多详细信息请参见SinkUpsertMaterializer源代码。
通过SinkUpsertMaterializer,将示例中Join算子输出的Changelog事件处理并转换为结果表主键对应的Changelog事件,结果如下图所示。根据SinkUpsertMaterializer的工作原理,在情况2中,处理
-U(id=1,level=10,attr='a1')
时,会将最后一行从状态中移除,并向下游发送倒数第二行;在情况3中,当处理
+U (id=1,level=20,attr='b1')
时,SinkUpsertMaterializer会将其原样发出,而当处理
-U(id=1,level=10,attr='a1')
时,将从状态中删除行而不发出任何事件。最终,通过SinkUpsertMaterializer算子情况2和3也会得到期望结果
(id=1,level=20,attr='b1')
。