Spark SQL的Join实现方法有哪些

45次阅读
没有评论

共计 4057 个字符,预计需要花费 11 分钟才能阅读完成。

本篇内容主要讲解“Spark SQL 的 Join 实现方法有哪些”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让丸趣 TV 小编来带大家学习“Spark SQL 的 Join 实现方法有哪些”吧!

SparkSQL 总体流程介绍

在阐述 Join 实现之前,我们首先简单介绍 SparkSQL 的总体流程,一般地,我们有两种方式使用 SparkSQL,一种是直接写 sql 语句,这个需要有元数据库支持,例如 Hive 等,另一种是通过 Dataset/DataFrame 编写 Spark 应用程序。如下图所示,sql 语句被语法解析 (SQL AST) 成查询计划,或者我们通过 Dataset/DataFrame 提供的 APIs 组织成查询计划,查询计划分为两大类:逻辑计划和物理计划,这个阶段通常叫做逻辑计划,经过语法分析 (Analyzer)、一系列查询优化(Optimizer) 后得到优化后的逻辑计划,最后被映射成物理计划,转换成 RDD 执行。

Join 基本要素

如下图所示,Join 大致包括三个要素:Join 方式、Join 条件以及过滤条件。其中过滤条件也可以通过 AND 语句放在 Join 条件中。

Spark 支持所有类型的 Join,包括:

 inner join

 left outer join

 right outer join

 full outer join

 left semi join

 left anti join

下面分别阐述这几种 Join 的实现。

Join 基本实现流程

总体上来说,Join 的基本实现流程如下图所示,Spark 将参与 Join 的两张表抽象为流式遍历表 (streamIter) 和查找表(buildIter),通常 streamIter 为大表,buildIter 为小表,我们不用担心哪个表为 streamIter,哪个表为 buildIter,这个 spark 会根据 join 语句自动帮我们完成。

在实际计算时,spark 会基于 streamIter 来遍历,每次取出 streamIter 中的一条记录 rowA,根据 Join 条件计算 keyA,然后根据该 keyA 去 buildIter 中查找所有满足 Join 条件 (keyB==keyA) 的记录 rowBs,并将 rowBs 中每条记录分别与 rowAjoin 得到 join 后的记录,最后根据过滤条件得到最终 join 的记录。

从上述计算过程中不难发现,对于每条来自 streamIter 的记录,都要去 buildIter 中查找匹配的记录,所以 buildIter 一定要是查找性能较优的数据结构。spark 提供了三种 join 实现:sort merge join、broadcast join 以及 hash join。

sort merge join 实现

要让两条记录能 join 到一起,首先需要将具有相同 key 的记录在同一个分区,所以通常来说,需要做一次 shuffle,map 阶段根据 join 条件确定每条记录的 key,基于该 key 做 shuffle write,将可能 join 到一起的记录分到同一个分区中,这样在 shuffle read 阶段就可以将两个表中具有相同 key 的记录拉到同一个分区处理。前面我们也提到,对于 buildIter 一定要是查找性能较优的数据结构,通常我们能想到 hash 表,但是对于一张较大的表来说,不可能将所有记录全部放到 hash 表中,另外也可以对 buildIter 先排序,查找时按顺序查找,查找代价也是可以接受的,我们知道,spark shuffle 阶段天然就支持排序,这个是非常好实现的,下面是 sort merge join 示意图。

在 shuffle read 阶段,分别对 streamIter 和 buildIter 进行 merge sort,在遍历 streamIter 时,对于每条记录,都采用顺序查找的方式从 buildIter 查找对应的记录,由于两个表都是排序的,每次处理完 streamIter 的一条记录后,对于 streamIter 的下一条记录,只需从 buildIter 中上一次查找结束的位置开始查找,所以说每次在 buildIter 中查找不必重头开始,整体上来说,查找性能还是较优的。

broadcast join 实现

为了能具有相同 key 的记录分到同一个分区,我们通常是做 shuffle,那么如果 buildIter 是一个非常小的表,那么其实就没有必要大动干戈做 shuffle 了,直接将 buildIter 广播到每个计算节点,然后将 buildIter 放到 hash 表中,如下图所示。

从上图可以看到,不用做 shuffle,可以直接在一个 map 中完成,通常这种 join 也称之为 map join。那么问题来了,什么时候会用 broadcast join 实现呢?这个不用我们担心,spark sql 自动帮我们完成,当 buildIter 的估计大小不超过参数 spark.sql.autoBroadcastJoinThreshold 设定的值(默认 10M),那么就会自动采用 broadcast join,否则采用 sort merge join。

hash join 实现

除了上面两种 join 实现方式外,spark 还提供了 hash join 实现方式,在 shuffle read 阶段不对记录排序,反正来自两格表的具有相同 key 的记录会在同一个分区,只是在分区内不排序,将来自 buildIter 的记录放到 hash 表中,以便查找,如下图所示。

不难发现,要将来自 buildIter 的记录放到 hash 表中,那么每个分区来自 buildIter 的记录不能太大,否则就存不下,默认情况下 hash join 的实现是关闭状态,如果要使用 hash join,必须满足以下四个条件:

 buildIter 总体估计大小超过 spark.sql.autoBroadcastJoinThreshold 设定的值,即不满足 broadcast join 条件

  开启尝试使用 hash join 的开关,spark.sql.join.preferSortMergeJoin=false

  每个分区的平均大小不超过 spark.sql.autoBroadcastJoinThreshold 设定的值,即 shuffle read 阶段每个分区来自 buildIter 的记录要能放到内存中

 streamIter 的大小是 buildIter 三倍以上

所以说,使用 hash join 的条件其实是很苛刻的,在大多数实际场景中,即使能使用 hash join,但是使用 sort merge join 也不会比 hash join 差很多,所以尽量使用 hash

下面我们分别阐述不同 Join 方式的实现流程。

inner join

inner join 是一定要找到左右表中满足 join 条件的记录,我们在写 sql 语句或者使用 DataFrame 时,可以不用关心哪个是左表,哪个是右表,在 spark sql 查询优化阶段,spark 会自动将大表设为左表,即 streamIter,将小表设为右表,即 buildIter。这样对小表的查找相对更优。其基本实现流程如下图所示,在查找阶段,如果右表不存在满足 join 条件的记录,则跳过。

left outer join

left outer join 是以左表为准,在右表中查找匹配的记录,如果查找失败,则返回一个所有字段都为 null 的记录。我们在写 sql 语句或者使用 DataFrmae 时,一般让大表在左边,小表在右边。其基本实现流程如下图所示。

right outer join

right outer join 是以右表为准,在左表中查找匹配的记录,如果查找失败,则返回一个所有字段都为 null 的记录。所以说,右表是 streamIter,左表是 buildIter,我们在写 sql 语句或者使用 DataFrame 时,一般让大表在右边,小表在左边。其基本实现流程如下图所示。

full outer join

full outer join 相对来说要复杂一点,总体上来看既要做 left outer join,又要做 right outer join,但是又不能简单地先 left outer join,再 right outer join,最后 union 得到最终结果,因为这样最终结果中就存在两份 inner join 的结果了。因为既然完成 left outer join 又要完成 right outer join,所以 full outer join 仅采用 sort merge join 实现,左边和右表既要作为 streamIter,又要作为 buildIter,其基本实现流程如下图所示。

由于左表和右表已经排好序,首先分别顺序取出左表和右表中的一条记录,比较 key,如果 key 相等,则 joinrowA 和 rowB,并将 rowA 和 rowB 分别更新到左表和右表的下一条记录;如果 keyA keyB,则说明右表中没有与左表 rowA 对应的记录,那么 joinrowA 与 nullRow,紧接着,rowA 更新到左表的下一条记录;如果 keyA keyB,则说明左表中没有与右表 rowB 对应的记录,那么 joinnullRow 与 rowB,紧接着,rowB 更新到右表的下一条记录。如此循环遍历直到左表和右表的记录全部处理完。

left semi join

left semi join 是以左表为准,在右表中查找匹配的记录,如果查找成功,则仅返回左边的记录,否则返回 null,其基本实现流程如下图所示。

Spark SQL 的 Join 实现方法有哪些

left anti join

left anti join 与 left semi join 相反,是以左表为准,在右表中查找匹配的记录,如果查找成功,则返回 null,否则仅返回左边的记录,其基本实现流程如下图所示。

Spark SQL 的 Join 实现方法有哪些

总结

Join 是数据库查询中一个非常重要的语法特性,在数据库领域可以说是“得 join 者得天下”,SparkSQL 作为一种分布式数据仓库系统,给我们提供了全面的 join 支持,并在内部实现上无声无息地做了很多优化,了解 join 的实现将有助于我们更深刻的了解我们的应用程序的运行轨迹。

到此,相信大家对“Spark SQL 的 Join 实现方法有哪些”有了更深的了解,不妨来实际操作一番吧!这里是丸趣 TV 网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-07-27发表,共计4057字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)