一、概述

1.1. 分区裁剪

分区剪裁是谓词下推的一种特例,它指的是在分区表中下推谓词,谓词是分区目录。分区表分不同的目录存储数,如果过滤谓词中包含分区键,Spark SQL 对分区表做扫描的时候,是完全可以跳过不满足谓词条件的分区目录,这就是分区剪裁。

1.2. 动态分区裁剪 DPP(Dynamic Partition Pruning)

1.2.1. 什么是 DPP?

动态分区裁剪就是基于运行时推断出来的信息来进一步进行分区裁剪。从而减少事实表中数据的扫描量降低 I/O开销,提升执行性能。
DPP(Dynamic Partition Pruning,动态分区剪裁)指的是在大表 Join 小表的场景中,可以充分利用过滤之后的小表,在运行时动态的来大幅削减大表的数据扫描量,从整体上提升关联计算的执行性能。

在数仓情景下 Spark SQL 利用维度表提供的过滤信息,减少事实表中数据的扫描量、提升执行性能。

1.2.2. 使用条件

但是使用 DPP 的前提条件比较苛刻,需要满足以下条件:

1.2.1. 事实表必须是分区表

1.2.2. 只支持等值 Join

1.2.3. 维度表过滤之后的数据小于广播阅值

维度表过滤之后的数据必须小于广播阈值: spark.sql.autoBroadcastJoinThreshold

为什么需要满足 “维度表过滤之后的数据集要小于广播阈值🤔️?

实现 DPP 机制的关键在于,需要处理事实表的计算分支,能够拿到满足过滤条件的 Join Key 列表,然后用 Join Key 列表对事实表做分区剪裁。

用什么办法才能拿到这个列表呢🤔️?

Spark SQL 使用广播变量封装过滤之后的维度表数据,在维度表做完过滤之后,Spark SQL 在其上构建哈希表,哈希表的 Key 是用于关联的 Join Key。哈希表构建完之后,Spark SQL 将其封装到广播变量中,这个广播变量的作用主要有两个:

  1. 给事实表用来做分区剪裁,哈希表中的 Key set 用来给事实表过滤符合条件的数据分区

  2. 参与后续的 Broadcast Join 数据关联,这里的哈希表,本质上就是 Hash Join 中的 Build Table,其中的 Key、 Value,记录着数据关联中所需的所有字段,如 users.id、users.name.刚好拿来和事实表做 Broadcast Hash Join

1.3. 不足

Join Keys 往往是高基数(cardinality)的字段,比如 userld;而分区键往往要选择低基数的宇段,否则数据的存储就会非常的分散。需要在存储效率和 DPP 之间做权衡,如果查询效率是第一优先级,可以对 cardinality 较高的 Join Key 做分区键。如果相反,存储效率是第一优先级,放弃DPP 优化机制。

二、实现

2.1. 相关规则

动态分区裁剪功能在 Spark SQL 中主要通过两个规则实现: 逻辑计划优化器规则 PartitionPruning 和 Spark planner 规则 PlanDynamicPruningFilters

2.1.1. 规则 PartitionPruning

PartitionPruning 规则被添加到 SparkOptimizer 中的一个默认批次中,这样它就会在逻辑计划优化阶段被应用。PartitionPruning 规则在应用时主要做以下事情。

  1. 根据连接操作的类型和选择性,检查 DPP 的适用性
  2. 估计分区裁剪是否会带来好处
  3. 如果所有条件都满足,则插入 DPP 谓词
1
2
3
4
5
6
override def apply(plan: LogicalPlan): LogicalPlan = plan match {
// Do not rewrite subqueries.
case s: Subquery if s.correlated => plan // 跳过子查询中包含 DPP 的情况
case _ if !conf.dynamicPartitionPruningEnabled => plan
case _ => prune(plan)
}
  1. 提取出等值 Join 的左右 join keys

  2. 遍历查询条件谓词,使用 DPP 优化

  3. 插入谓词

    1. 检测是否可以插入谓词

      1. $filterableScan.isDefined$

        连接的列是否是分区列

      2. $canPruneLeft()$

        表示能否对左表进行剪裁,满足左表剪裁的情况只有 Inner 、LeftSemi 、RightOuter 的情况。

        1
        2
        3
        4
        def canPruneLeft(joinType: JoinType): Boolean = joinType match {
        case Inner | LeftSemi | RightOuter => true
        case _ => false
        }
      3. $hasPartitionPruningFilter()$

        不能在 Stream 应用程序上应用 DDP, 逻辑计划必须知道选择性谓词的定义。

        1
        2
        3
        private def hasPartitionPruningFilter(plan: LogicalPlan): Boolean = {
        !plan.isStreaming && hasSelectivePredicate(plan)
        }
  1. $insertPredicate()$

    1. $reuseEnabled$

      exchange 重用开启

    2. $hasBenefit$

      剪裁有收益

    3. 如果开启重用 exchange 或有收益侧插入一个 Filter 的过滤子查询算子

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      if (reuseEnabled || hasBenefit) {
      // insert a DynamicPruning wrapper to identify the subquery during query planning
      Filter(
      DynamicPruningSubquery(
      pruningKey,
      filteringPlan,
      joinKeys,
      index,
      conf.dynamicPartitionPruningReuseBroadcastOnly || !hasBenefit),
      pruningPlan)
      } else {
      // abort dynamic partition pruning
      pruningPlan
      }

2.1.2. PlanDynamicPruningFilters

在逻辑计划优化阶段,PartitionPruning 规则从另一侧插入带有过滤器的重复子查询。然后 Spark planner 在执行计划准备阶段应用 PlanDynamicPruningFilters,通过重新使用广播的结果来删除重复的子查询。PlanDynamicPruningFilters 规则首先检查查询计划是否可以重用广播交换,该计划要求 exchangeResueEnabled 标志设置为true,并且物理连接操作符为 BroadcastHashJoinExec。如果查询计划可以重用广播交换,重复的子查询将被替换成重用的广播结果;如果使用重复的子查询的估计效益仍然超过使用原始的非DPP查询计划,重复的子查询将被保留,否则,该子查询将被抛弃。

DynamicPruningSubquery 的具体的执行过程。从上面的代码可以看出,这里分为三种情况:

  1. 如果当前开启了 exchangeReuseEnabled,同时 Plan 中存在 BroadcastHashJoinExec,则会重用当前的 BroadcastExchangeExec,并将其封装为一个InSubqueryExec,最后再包装为 DynamicPruningExpression表达式。

  2. 如果 onlyInBroadcast 为 false, 表明要么是 reuseBroadcastOnly 设为 false,(即非 broadcast exchange 时也可使用), 要么是有收益。首先,需要按照需要过滤的 key 做一次聚合操作, 然后再将其封装为一个 InSubqueryExec,最后包装为 DynamicPruningExpression 表达式。

  3. 否则,不会执行Query, 执行回退。