一、概述

Spark Runtime Filter 在查询执行期间可以显著缩减中间数据量,并进而减少计算带来的成本。当查询执行中存在 Shuffle 阶段,由于 Executor 首先需要将 Shuffle 数据写到本地磁盈,然后由其他 Executor 通过网络拉取从磁密读取的 Shuffle 数据,所以其成本格外高昂!
因此,能够缩减 Shuffle 的数据量一直以来是 Spark 性能优化的主要工作动向。Spark runtime filter 正是通过将 Join 一端具有高选择性的 Filter 推送到 Join 另一端来缩减 Shuffle 数据量的。

1.1. DPP&Runtime Filter

  1. 动态分区裁剪被裁剪的 join 一边必须是分区的,而且 join 另一边在 exchange 之前存在条件过滤,而且默认
    存在 broadcastJoin 的时候,才会进行分区裁剪
    Runtime Fitter 没有限制,但是Runtime Filter的适用条件更加严格
  2. 动态分区剪裁能够减少 source scan 的10,而 Runtime Fiter 不行,因为动态分区裁剪是基于分区进行过滤
    A9.
  3. Runtime Filter 是可以基于非分区的字段作为join key,而动态分区裁剪必须是基于分区宇段的join key

    二、实现

Spark runtime filter 是针对 Spark 的物理计划所做的性能优化,因此注入 Runtime Filter 发生在物理计划
优化阶段。Spark runtime filter 通过将 Join 一端具有高选择性的 Filter 推送到 Join 另一端来缩减
Shuffle 数据量,进而优化执行性能:

2.1. Bloom Filter

为了支持 Bloom Filter, Spark runtime filter 引l入了聚合國数 Bloom FilterAggregate.BloomFilterAggregate 具有两个非常重要的构造参数

  1. estimatedNumltemsExpression
    此表达式表示估算得到的元素数量,例如:14亿身份证号码。这对应了 Bloom Filter 的n指标。
  2. numBitsExpression
    此表达式表示要使用的位的数量。这对应了 Bloom Filter 的 m指标。BloomFilterAggregate对于使
    用的哈希函数的救量则通过(m / 口)*1og(2) 公式计算得出。

2.2. 相关规则

Spark Runtime Filter 涉及到两个主要的 Rule InjectRuntimeFilter 和 RewritePredicateSubquery

2.2.1. InjectRuntimeFilter

InjectRuntimeFilter

1
2
3
4
5
6
7
8
9
10
11
12
override def apply(plan: LogicalPlan): LogicalPlan = plan match {
case s: Subquery if s.correlated => plan
case _ if !conf.runtimeFilterSemiJoinReductionEnabled &&
!conf.runtimeFilterBloomFilterEnabled => plan
case _ =>
val newPlan = tryInjectRuntimeFilter(plan)
if (conf.runtimeFilterSemiJoinReductionEnabled && !plan.fastEquals(newPlan)) {
RewritePredicateSubquery(newPlan)
} else {
newPlan
}
}

2.2.2. RewritePredicateSubquery

http://www.ai2news.com/blog/2978295/

https://developer.aliyun.com/article/992712