一、概述

通常程序代码是被串行执行的。计算型程序通常需要在数据上做大量计算。一般情况下多数数据需要被相同的方式处理。
例如: 对于 1000 个粒子的模拟,模拟中会有一个步骤,用其当前速度更新每个粒子的位置: $s=s+v$ ,s[O] 到 s[999] 每个粒子都要进行这个操作。如果可以将多个粒子的计算结合起来一次计算,例如将 4 个粒子的计算放在一个组中计算:

1
2
3
4
s[0] = s[0] + v[0];
s[1] = s[1] + v[1];
s[2] = s[2] + v[2];
s[3] = s[3] + v[3];

多个数据一次性执行一个操作,被叫做 SIMD (single instruction multiple data)。

SIMD 是从指令设计者(即 CPU 制造商)的角度给出的概念名称。在数学上,由固定数量的元素组成的有序组被称作向量。所以 SIMD 也被叫做向量指令。

二、Java 向量化

2.1. 热点代码追踪

热点追踪靠 JVM 本身内部的实现机制去做到的。当代码需要足够多的次数去执行的时候,JVM 会发现自身对这个地方优化很有价值,其中有一个优化就是把它进行向量化。用即时编译器(JIT)把它编译成本地机器代码后,调用底层的 SIMD 的指令。
Java 向量化的三个特点:

  1. 不能手动控制,只能由 JVM 自动处理。

    没有办法做到像 C++ 一样直接调一个底层的 CPU 指令,这个过程是一个自动化的过程。

  2. 代码层面无法找到向量化的显式调用,整个过程是隐式的。

  3. 依赖于 JVM 运行期的热点代码跟踪以及 JIT,所以整个过程是不可靠的。

    依赖于JVM 运行期的热点追踪,包括即时编译器等。万一代码的循环次数不够,没有达到需要即时编译的域值,可能就不会用即时编译器去编译,无法真正做到向量化。也就是说即便你觉得 FOR 循环次数够多了,也不一定真的会有向量化。所以说 Java 的向量化是一种不可靠的优化方式

三、Spark 向量化

向量化执行是 Spark 其中一个非常重要的优化方向。其思想是将算子的执行粒度从每次处理一行变成每次处理一个行组,以此来避免大量的函数调用。通过对行组内部处理按列进行计算,同时利用编译技术减少分支判断检查以及更多的 SIMD 优化执行计划。

3.1. 数据结构

3.1.1. ColumnBatch

Spark 的列式框架处理的数据不是 InternalRow,而是 ColumnarBatch,按照批次把每一列按照向量存储的方式把它一列一列存起来,这就叫列批。ColumnarBatch 通过 ColumnarBatchRow 将多个 ColumnVector 包装为一个基于行的表。它提供了该批处理的行视图,以便 Spark 可以逐行访问数据。

ColumnBatch 包含一个 ColumnVectors 列表,每个 ColumnVectors 包含批量读取请求数据集中一列的字段值。

ColumnBatch 也提供了一个 $rowIterator()$ 公共方法,该方法可以在基于行的视图中返回批数据集,每一行都封装在一个 Spark SQL InternalRow 实例中。

3.1.2. ColumnVector

ColumnVector 是一个存储和处理列式数据的公共数据结构,用于与其他列式存储交换数据,ColumnVector 可用于存储相同类型的数据,并且封装了每种数据类型的访问方法,但只使用 ColumnVector 实例中当前数据类型的方法。

ColumnVector 实例中的 dataType 属性显示了当前的数据类型。

  1. WritableColumnVector

    WritableColumnVector 支持堆内内存存储和堆外内存存储。

    • OnHeapColumnVector: 基于堆内存的列向量数据结构。

    • OffHeapColumnVector

  2. ConstantColumnVector: 用于存储常量数据。

  3. ArrowColumnVector

    Apache Arrow 是想要实现一种与语言无关,规范的内存格式。ArrowColumnVector 对接 Arrow 内存数据格式。

    • 优点: 如果新接入一个 Native Engine,只要把数据转为 Arrow 格式,Spark 可以接入做读取操作等。
    • 缺点: 每种 Native Engine 内部都会有各自的内存数据表达方式。为了统一接入,每一次都要做转换成 Arrow 格式的操作,性能会有损耗。
  4. OrcColumnVector

3.2. 代码生成&向量化

Spark 全阶段代码生成除了消除虚函数的调用等功能外,做了一些向量化处理。跳到、ColumnarToRowExec 代码

Spark 向量化的核心就在于这块代码中,这块代码主要的就是 ColumnarBatch 列批,列批数据结构,用 FOR 循环进行数据的访问,这在 JIT 中会进行优化(优化成向量化)。
而这里还有一个重点就是: Parquet 或者 ORC 这种列式存储,读取出来的时候,天然就是一个列批的数据结构,很方便做向量化操作。

四、SQL 向量化引擎

利用 JIT 进行向量化需要编译器追踪循环的次数的,如果循环次数不的,就不会进行 JIT,也就无法做到向量化。所以好多公司把这种着力于用其他语句实现来进行真正意义上的向量化。

4.1. Gluten

Gluten 沿用 Spark 原有的框架。当一个 SQL 进来,会通过 Spark 的 Catalyst 把 SQL 转成 Spark 的物理计划,然后物理计划会传递给 Gluten。Gluten 会以 Plugin 的方式集成到 Spark 中。在 Physical Plan 交给 Gluten Plugin 的时候,添加扩展规则,把 Physical Plan 转换成语言无关的 Substrait Plan。转换后交由下面的各种 Native 向量化引擎去执行计算。各自的向量化引擎会根据 Substrait Plan 构建自己的 Execute pipeline,然后读取 Input 数据去做计算,计算完后都会以列式方式返回给 Spark。

五、总结