问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Apache Flink 双流JOIN详解:JOIN类型、语义及实现原理

创作时间:
作者:
@小白创作中心

Apache Flink 双流JOIN详解:JOIN类型、语义及实现原理

引用
CSDN
1.
https://m.blog.csdn.net/weixin_39827315/article/details/111221921

本文深入探讨了数据库中各种JOIN操作的类型、语义及其在Apache Flink中的实现原理。通过具体的SQL示例和执行过程分析,帮助读者全面理解JOIN操作的核心概念和应用场景。

什么是JOIN

JOIN的本质是从N(N>=1)张表中获取不同的字段,进而得到最完整的记录行。例如,我们需要从学生表(学号,姓名,性别)、课程表(课程号,课程名,学分)和成绩表(学号,课程号,分数)中查询所有学生的姓名、课程名和考试分数。

为什么需要JOIN

JOIN的本质是数据拼接,如果将所有数据列存储在一张大表中,理论上就不需要JOIN。但在实际业务中,由于以下原因,通常需要将数据分布在多张表中:

  • 产生数据的源头可能不是一个系统
  • 即使源头是同一个系统,为了遵循数据库范式(如1NF、2NF、3NF、BCNF),也需要进行表的设计

JOIN的种类

  • CROSS JOIN:交叉连接,计算笛卡儿积
  • INNER JOIN:内连接,返回满足条件的记录
  • OUTER JOIN
  • LEFT:返回左表所有行,右表不存在补NULL
  • RIGHT:返回右表所有行,左边不存在补NULL
  • FULL:返回左表和右表的并集,不存在一边补NULL
  • SELF JOIN:自连接,将表查询时命名不同的别名

JOIN语法

JOIN在SQL89和SQL92中有不同的语法。以INNER JOIN为例:

  • SQL89:表之间用“,”逗号分割,链接条件和过滤条件都在Where子句指定
  • SQL92:将链接条件在ON子句指定,过滤条件在WHERE子句指定,逻辑更为清晰

语义示例说明

我们以学生表(学号,姓名,性别)、课程表(课程号,课程名,学分)和成绩表(学号,课程号,分数)来介绍各种JOIN的语义。

CROSS JOIN

交叉连接会对两个表进行笛卡尔积,也就是LEFT表的每一行和RIGHT表的所有行进行联接,因此生成结果表的行数是两个表行数的乘积。

INNER JOIN

内联接在SQL92中ON表示联接添加,可选的WHERE子句表示过滤条件。例如,查询成绩大于80分的学生学号、学生姓名和成绩。

OUTER JOIN

LEFT OUTER JOIN

左外联接语义是返回左表所有行,右表不存在补NULL。例如,查询没有参加考试的所有学生的成绩单。

RIGHT OUTER JOIN

右外链接语义是返回右表所有行,左边不存在补NULL。

FULL OUTER JOIN

全外链接语义返回左表和右表的并集,不存在一边补NULL。但MySQL数据库不支持FULL OUTER JOIN。

SELF JOIN

自联接是一张表以不同的别名作为左右两个表,可以进行如上的INNER JOIN和OUTER JOIN。

不等值联接

这里说的不等值联接是SQL92语法里面的ON子句里面只有等值联接,比如:

SELECT s.c_no, s.score, no, name
FROM score s RIGHT JOIN student stu ON stu.no != s.c_no;

上面这示例,其实没有什么实际业务价值,在实际的使用场景中,不等值联接往往是结合等值联接,将不等值条件在WHERE子句指定,即, 带有WHERE子句的等值联接。

Flink双流JOIN的支持

Flink目前支持INNER JOIN和LEFT OUTER JOIN(SELF可以转换为普通的INNER和OUTER)。在语义上面Flink严格遵守标准SQL的语义,与上面演示的语义一致。下面我重点介绍Flink中JOIN的实现原理。

双流JOIN与传统数据库表JOIN的区别

传统数据库表的JOIN是静态两张静态表的数据联接,在流上面是动态表,双流JOIN的数据不断流入与传统数据库表的JOIN有如下3个核心区别:

  • 左右两边的数据集合无穷 - 传统数据库左右两个表的数据集合是有限的,双流JOIN的数据会源源不断的流入;
  • JOIN的结果不断产生/更新 - 传统数据库表JOIN是一次执行产生最终结果后退出,双流JOIN会持续不断的产生新的结果。
  • 查询计算的双边驱动 - 双流JOIN由于左右两边的流的速度不一样,会导致左边数据到来的时候右边数据还没有到来,或者右边数据到来的时候左边数据没有到来,所以在实现中要将左右两边的流数据进行保存,以保证JOIN的语义。在Flink中会以State的方式进行数据存储。

数据Shuffle

分布式流计算所有数据会进行Shuffle,怎么才能保障左右两边流的要JOIN的数据会在相同的节点进行处理呢?在双流JOIN的场景,我们会利用JOIN中ON的联接key进行partition,确保两个流相同的联接key会在同一个节点处理。

数据的保存

不论是INNER JOIN还是OUTER JOIN都需要对左右两边的流的数据进行保存,JOIN算子会开辟左右两个State进行数据存储,左右两边的数据到来时候,进行如下操作:

  • LeftEvent到来存储到LState,RightEvent到来的时候存储到RState;
  • LeftEvent会去RightState进行JOIN,并发出所有JOIN之后的Event到下游;
  • RightEvent会去LeftState进行JOIN,并发出所有JOIN之后的Event到下游

简单场景介绍实现原理

INNER JOIN 实现

JOIN有很多复杂的场景,我们先以最简单的场景进行实现原理的介绍,比如:最直接的两个进行INNER JOIN,比如查询产品库存和订单数量,库存变化事件流和订单事件流进行INNER JOIN,JION条件是产品ID,具体如下:

双流JOIN两边事件都会存储到State里面,如上,事件流按照标号先后流入到join节点,我们假设右边流比较快,先流入了3个事件,3个事件会存储到state中,但因为左边还没有数据,所有右边前3个事件流入时候,没有join结果流出,当左边第一个事件序号为4的流入时候,先存储左边state,再与右边已经流入的3个事件进行join,join的结果如图 三行结果会流入到下游节点sink。当第5号事件流入时候,也会和左边第4号事件进行join,流出一条jion结果到下游节点。这里关于INNER JOIN的语义和大家强调两点:

  • INNER JOIN只有符合JOIN条件时候才会有JOIN结果流出到下游,比如右边最先来的1,2,3个事件,流入时候没有任何输出,因为左边还没有可以JOIN的事件;
  • INNER JOIN两边的数据不论如何乱序,都能够保证和传统数据库语义一致,因为我们保存了左右两个流的所有事件到state中。

LEFT OUTER JOIN 实现

LEFT OUTER JOIN 可以简写 LEFT JOIN,语义上和INNER JOIN的区别是不论右流是否有JOIN的事件,左流的事件都需要流入下游节点,但右流没有可以JION的事件时候,右边的事件补NULL。同样我们以最简单的场景说明LEFT JOIN的实现,比如查询产品库存和订单数量,库存变化事件流和订单事件流进行LEFT JOIN,JION条件是产品ID,具体如下:

下图也是表达LEFT JOIN的语义,只是展现方式不同:

上图大主要关注点是当左边先流入1,2事件时候,右边没有可以join的事件时候会向下游发送左边事件并补NULL向下游发出,当右边第一个相同的Join key到来的时候会将左边先来的事件发出的带有NULL的事件撤回(对应上面command的-记录,+代表正向记录,-代表撤回记录)。这里强调三点:

  • 左流的事件当右边没有JOIN的事件时候,将右边事件列补NULL后流向下游;
  • 当右边事件流入发现左边已经有可以JOIN的key的时候,并且是第一个可以JOIN上的右边事件(比如上面的3事件是第一个可以和左边JOIN key P001进行JOIN的事件)需要撤回左边下发的NULL记录,并下发JOIN完整(带有右边事件列)的事件到下游。后续来的4,5,6,8等待后续P001的事件是不会产生撤回记录的。
  • 在Flink系统内部事件类型分为正向事件标记为“+”和撤回事件标记为“-”。

RIGHT OUTER JOIN 和 FULL OUTER JOIN

RIGHT JOIN内部实现与LEFT JOIN类似, FULL JOIN和LEFT JOIN的区别是左右两边都会产生补NULL和撤回的操作。对于State的使用都是相似的,这里不再重复说明了。

复杂场景介绍State结构

上面我们介绍了双流JOIN会使用State记录左右两边流的事件,同时我们示例数据的场景也是比较简单,比如流上没有更新事件(没有撤回事件),同时流上没有重复行事件。那么我们尝试思考下面的事件流在双流JOIN时候是怎么处理的?

上图示例是连续产生了2笔销售数量一样的订单,同时在产生一笔销售数量为5的订单之后,有将该订单取消了(或者退货了),这样在事件流上面就会是上图的示意,这种情况Flink内部如何支撑呢?

根据JOIN的语义以INNER JOIN为例,右边有两条相同的订单流入,我们就应该想下游输出两条JOIN结果,当有撤回的事件流入时候,我们也需要将已经下发下游的JOIN事件撤回,如下:

上面的场景以及LEFT JOIN部分介绍的撤回情况,需要Flink内部需要处理几个核心点:

  • 记录重复记录(完整记录重复记录或者记录相同记录的个数)
  • 记录正向记录和撤回记录(完整记录正向和撤回记录或者记录个数)
  • 记录那一条事件是第一个可以与左边事件进行JOIN的事件

双流JOIN的State数据结构

在Flink内部对不同的场景有特殊的数据结构优化,本篇我们只针对上面说的情况(通用设计)介绍一下双流JOIN的State的数据结构和用途。

数据结构

  • Map
  • 第一级MAP的key是Join key,比如示例中的P001, value是流上面的所有完整事件
  • 第二级MAP的key是行数据,比如示例中的P001, 2,value是相同事件值的个数

数据结构的利用

  • 记录重复记录 - 利用第二级MAP的value记录重复记录的个数,这样大大减少存储和读取
  • 正向记录和撤回记录 - 利用第二级MAP的value记录,当count=0时候删除改元素
  • 判断右边是否产生撤回记录 - 根据第一级MAP的value的size来判断是否产生撤回,只有size由0变成1的时候(第一条和左可以JOIN的事件)才产生撤回

双流JOIN的应用优化

NULL造成的热点

比如我们有A LEFT JOIN B ON A.aCol = B.bCol LEFT JOIN C ON B.cCol = C.cCol 的业务,JOB的DAG如下:

假设在实际业务中有这这样的特点,大部分时候当A事件流入的时候,B还没有可以JION的数据,但是B来的时候,A已经有可以JOIN的数据了,这特点就会导致,A LEFT JOIN B 会产生大量的 (A, NULL),其中包括B里面的 cCol 列也是NULL,这时候当与C进行LEFT JOIN的时候,首先Flink内部会利用cCol对AB的JOIN产生的事件流进行Shuffle, cCol是NULL进而是下游节点大量的NULL事件流入,造成热点。那么这问题如何解决呢?

我们可以改变JOIN的先后顺序,来保证A LEFT JOIN B 不会产生NULL的热点问题,如下:

JOIN ReOrder

对于JOIN算子的实现我们知道左右两边的事件都会存储到State中,在流入事件时候在从另一边读取所有事件进行JOIN计算,这样的实现逻辑在数据量很大的场景会有一定的state操作瓶颈,我们某些场景可以通过业务角度调整JOIN的顺序,来消除性能呢瓶颈,比如:A JOIN B ON A.acol = B.bcol JOIN C ON B.bcol = C.ccol. 这样的场景,如果 A与B进行JOIN产生数据量很大,但是B与C进行JOIN产生的数据量很小,那么我们可以强制调整JOIN的联接顺序,B JOIN C ON b.bcol = c.ccol JOIN A ON a.acol = b.bcol. 如下示意图:

小结

本篇初步向大家介绍传统数据库的JOIN的类型,语义和可以使用的查询优化,再以实际的例子介绍Flink上面的双流JOIN的实现和State数据结构设计,最后向大家介绍双流JION的使用优化。本篇只介绍了等值JOIN(ON 子句只有等值条件),Flink目前也支持非等值联接条件和等值联接条件相结合使用,本质是相当于添加了WHERE子句。

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号