Spark-源码学习-DPP 设计
一、概述
1.1. 分区裁剪
分区剪裁是谓词下推的一种特例,它指的是在分区表中下推谓词,谓词是分区目录。分区表分不同的目录存储数,如果过滤谓词中包含分区键,Spark SQL 对分区表做扫描的时候,是完全可以跳过不满足谓词条件的分区目录,这就是分区剪裁。
1.2. 动态分区裁剪 DPP(Dynamic Partition Pruning)
1.2.1. 什么是 DPP?
动态分区裁剪就是基于运行时推断出来的信息来进一步进行分区裁剪。从而减少事实表中数据的扫描量降低 I/O开销,提升执行性能。
DPP(Dynamic Partition Pruning,动态分区剪裁)指的是在大表 Join 小表的场景中,可以充分利用过滤之后的小表,在运行时动态的来大幅削减大表的数据扫描量,从整体上提升关联计算的执行性能。
在数仓情景下 Spark SQL 利用维度表提供的过滤信息,减少事实表中数据的扫描量、提升执行性能。
- https://blog.csdn.net/Shyllin/article/details/129202728
- https://zhuanlan.zhihu.com/p/548757324?utm_id=0
1.2.2. 使用条件
但是使用 DPP 的前提条件比较苛刻,需要满足以下条件:
1.2.1. 事实表必须是分区表
1.2.2. 只支持等值 Join
1.2.3. 维度表过滤之后的数据小于广播阅值
维度表过滤之后的数据必须小于广播阈值: spark.sql.autoBroadcastJoinThreshold
为什么需要满足 “维度表过滤之后的数据集要小于广播阈值🤔️?
实现 DPP 机制的关键在于,需要处理事实表的计算分支,能够拿到满足过滤条件的 Join Key 列表,然后用 Join Key 列表对事实表做分区剪裁。
用什么办法才能拿到这个列表呢🤔️?
Spark SQL 使用广播变量封装过滤之后的维度表数据,在维度表做完过滤之后,Spark SQL 在其上构建哈希表,哈希表的 Key 是用于关联的 Join Key。哈希表构建完之后,Spark SQL 将其封装到广播变量中,这个广播变量的作用主要有两个:
给事实表用来做分区剪裁,哈希表中的 Key set 用来给事实表过滤符合条件的数据分区
参与后续的 Broadcast Join 数据关联,这里的哈希表,本质上就是 Hash Join 中的 Build Table,其中的 Key、 Value,记录着数据关联中所需的所有字段,如 users.id、users.name.刚好拿来和事实表做 Broadcast Hash Join
1.3. 不足
Join Keys 往往是高基数(cardinality)的字段,比如 userld;而分区键往往要选择低基数的宇段,否则数据的存储就会非常的分散。需要在存储效率和 DPP 之间做权衡,如果查询效率是第一优先级,可以对 cardinality 较高的 Join Key 做分区键。如果相反,存储效率是第一优先级,放弃DPP 优化机制。
二、实现
2.1. 相关规则
动态分区裁剪功能在 Spark SQL 中主要通过两个规则实现: 逻辑计划优化器规则 PartitionPruning 和 Spark planner 规则 PlanDynamicPruningFilters
2.1.1. 规则 PartitionPruning
PartitionPruning 规则被添加到 SparkOptimizer 中的一个默认批次中,这样它就会在逻辑计划优化阶段被应用。PartitionPruning 规则在应用时主要做以下事情。
- 根据连接操作的类型和选择性,检查 DPP 的适用性
- 估计分区裁剪是否会带来好处
- 如果所有条件都满足,则插入 DPP 谓词
1 | override def apply(plan: LogicalPlan): LogicalPlan = plan match { |
提取出等值 Join 的左右 join keys
遍历查询条件谓词,使用 DPP 优化
插入谓词
检测是否可以插入谓词
$filterableScan.isDefined$
连接的列是否是分区列
$canPruneLeft()$
表示能否对左表进行剪裁,满足左表剪裁的情况只有 Inner 、LeftSemi 、RightOuter 的情况。
1
2
3
4def canPruneLeft(joinType: JoinType): Boolean = joinType match {
case Inner | LeftSemi | RightOuter => true
case _ => false
}$hasPartitionPruningFilter()$
不能在 Stream 应用程序上应用 DDP, 逻辑计划必须知道选择性谓词的定义。
1
2
3private def hasPartitionPruningFilter(plan: LogicalPlan): Boolean = {
!plan.isStreaming && hasSelectivePredicate(plan)
}
$insertPredicate()$
$reuseEnabled$
exchange 重用开启
引用本站文章Spark-源码学习-SparkSQL-架构设计-SQL 引擎-Planner 模块-Rule-Preparations-ReuseExchangeAndSubqueryJoker$hasBenefit$
剪裁有收益
如果开启重用 exchange 或有收益侧插入一个 Filter 的过滤子查询算子
1
2
3
4
5
6
7
8
9
10
11
12
13
14if (reuseEnabled || hasBenefit) {
// insert a DynamicPruning wrapper to identify the subquery during query planning
Filter(
DynamicPruningSubquery(
pruningKey,
filteringPlan,
joinKeys,
index,
conf.dynamicPartitionPruningReuseBroadcastOnly || !hasBenefit),
pruningPlan)
} else {
// abort dynamic partition pruning
pruningPlan
}
2.1.2. PlanDynamicPruningFilters
在逻辑计划优化阶段,PartitionPruning 规则从另一侧插入带有过滤器的重复子查询。然后 Spark planner 在执行计划准备阶段应用 PlanDynamicPruningFilters,通过重新使用广播的结果来删除重复的子查询。PlanDynamicPruningFilters 规则首先检查查询计划是否可以重用广播交换,该计划要求 exchangeResueEnabled 标志设置为true,并且物理连接操作符为 BroadcastHashJoinExec。如果查询计划可以重用广播交换,重复的子查询将被替换成重用的广播结果;如果使用重复的子查询的估计效益仍然超过使用原始的非DPP查询计划,重复的子查询将被保留,否则,该子查询将被抛弃。
DynamicPruningSubquery 的具体的执行过程。从上面的代码可以看出,这里分为三种情况:
如果当前开启了 exchangeReuseEnabled,同时 Plan 中存在 BroadcastHashJoinExec,则会重用当前的 BroadcastExchangeExec,并将其封装为一个InSubqueryExec,最后再包装为 DynamicPruningExpression表达式。
如果 onlyInBroadcast 为 false, 表明要么是 reuseBroadcastOnly 设为 false,(即非 broadcast exchange 时也可使用), 要么是有收益。首先,需要按照需要过滤的 key 做一次聚合操作, 然后再将其封装为一个 InSubqueryExec,最后包装为 DynamicPruningExpression 表达式。
否则,不会执行Query, 执行回退。