Spark-理论笔记-Join 策略
数据关联总共有 3 种 Join 实现方式。按照出现的时间顺序,分别是嵌套循环连接(NLJ,Nested Loop Join)、排序归并连接(SMJ,Shuffle Sort Merge Join) 和哈希连接(HJ,Hash Join)
1. Join 的实现方式现在有事实表 orders 和维度表 users。其中,users 表存储用户属性信息,orders 记录着用户的每一笔交易。两张表的 Schema 如下:
12345678910// 订单表orders关键字段userId, IntitemId, Intprice, Floatquantity, Int// 用户表users关键字段id, Intname, Stringtype, String //枚举值,分为头部用户和长尾用户
基于两张表做内关联(Inner Join),同时把用户名、单价、交易额等字段投影出来。
123// SQL 查询语句select orders.quantity, orders.price, orders.userId, users.id, users.namefrom orders inn ...
Spark-理论笔记-内存模型
在执行 Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程,前者为主控进程,负责创建 Spark 上下文,提交 Spark 作业[Job],并将作业转化为计算任务[Task],在各个 Executor 进程间协调任务的调度;后者负责在工作节点上执行具体的计算任务,并将结果返回给 Driver, 同时为需要持久化的 RDD 提供存储功能。
1. Execuor 内存模型1.1. 堆内和堆外内存作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内 [On-heap]空间进行了更为详细的分配,以充分利用内存。
同时,Spark 引入了堆外[Off-heap]内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。
堆内内存受到 JVM 统一管理,堆外内存是直接向操作系统进行内存的申请和释放。
1.1.1. 堆内内存堆内内存的大小,由 Spark 应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。 ...
Spark-理论笔记-执行内存管理
执行内存主要用来存储任务在执行 Shuffle 时占用的内存,Shuffle 是按照一定规则对 RDD 数据重新分区的过程, Shuffle 的 Write 和 Read 两阶段对执行内存的使用.
执行内存主要用来存储任务在执行 Shuffle 时占用的内存,Shuffle 是按照一定规则对 RDD 数据重新分区的过程, Shuffle 的 Write 和 Read 两阶段对执行内存的使用.
1. ShuffleShuffle Write在 map 端会采用 ExternalSorter 进行外排,在内存中存储数据时主要占用堆内执行空间。
Shuffle Read在 ExternalSorter 和 Aggregator 中, Spark 会使用一种叫 AppendOnlyMap 的哈希表在堆内执行内存中存储数据 但在 Shuffle 过程中所有数据并不能都保存到该哈希表中, 当这个哈希表占用的内存会进行周期性地采样估算,当其大到一定程度, 无法再从 MemoryManager 申请到新的执行内存时, Spark 就会将其全部内容存储到磁盘文件中,这个过程被称为溢存 [Spill] ...
Spark-理论笔记-RDD
RDD(Resilient Distributed Dataset) 叫着 弹性分布式数据集 ,是Spark 中最基本的抽象,它代表一个不可变、可分区、里面元素可以并行计算的集合。
特性
A list of partitions一个分区(Partition)列表,组成了该 RDD 的数据。
这里表示一个 RDD 有很多分区,每一个分区内部是包含了该 RDD 的部分数据,spark 中任务是以 task 线程的方式运行, 一个分区就对应一个 task 线程。
用户可以在创建 RDD 时指定 RDD 的分区个数,如果没有指定,那么就会采用默认值。(比如: 读取HDFS上数据文件产生的RDD分区数跟block的个数相等)
A function for computing each splitSpark 中 RDD 的计算是以分区为单位的,RDD 的每个 partition 上面都会有计算函数
A list of dependencies on other RDDs一个 RDD 会依赖于其他多个 RDD
RDD 与 RDD 之间的依赖关系,spark 任务的容错机制就是根据这个特性而 ...
Spark-理论笔记-共享变量
Spark 中所有 transformation 算子是通过分发到多个节点上的并行任务实现运行并行化的。当将一个自定义函数传递给 Spark 算子时(比如map或reduce),该函数所包含的变量会通过副本方式传播到远程节点上。但所有针对这些变量的写操作只会更新到本地,不会传递回驱动程序以及分布式更新,通常跨任务的读写变量是低效的。故而 Spark 提供了两种受限的共享变量:广播变量和共享变量。
广播变量 broadcast variable什么是广播变量广播变量是一种能够分发到集群各个节点上的只读变量,Driver 端将变量分发给各 Executor,Executor 只需保存该变量的一个副本,而不是每个 task 各分发一份,避免了 task 过多时候,Driver 的带宽成为系统瓶颈,以及 task 服务器上的资源消耗。Spark 实现了高效的广播算法保证广播变量得到高效的分发。
不使用广播变量
使用广播变量
广播变量创建和使用定义广播变量12val a = 3val broadcast = sc.broadcast(a)
使用广播变量1val data = bro ...
Spark-源码学习-SparkSQL-架构设计-SQL 引擎-Planner 模块-Strategy-BasicOperators
一、概述所有的策略都继承自 GenericStrategy 类,其中定义了 planLater 和 apply 方法;SparkStrategy 继承自 GenericStrategy 类,对其中的 planLater 进行了实现,根据传入的 LogicalPlan 直接生成前述提到的 PlanLater 节点 。 此外,在 Spark SQL 中
Strategy 是 SparkStrategy 类的别名
1type Strategy = SparkStrategy
二、实现
Spark-源码学习-SparkSQL-架构设计-SQL 引擎-Planner 模块-Strategy-FileSourceStrategy
一、概述所有的策略都继承自 GenericStrategy 类,其中定义了 planLater 和 apply 方法;SparkStrategy 继承自 GenericStrategy 类,对其中的 planLater 进行了实现,根据传入的 LogicalPlan 直接生成前述提到的 PlanLater 节点 。 此外,在 Spark SQL 中
Strategy 是 SparkStrategy 类的别名
1type Strategy = SparkStrategy
二、实现
Spark-源码学习-SparkSQL-架构设计-SQL 引擎-Planner 模块-Strategy-JoinSelection
一、概述所有的策略都继承自 GenericStrategy 类,其中定义了 planLater 和 apply 方法;SparkStrategy 继承自 GenericStrategy 类,对其中的 planLater 进行了实现,根据传入的 LogicalPlan 直接生成前述提到的 PlanLater 节点 。 此外,在 Spark SQL 中
Strategy 是 SparkStrategy 类的别名
1type Strategy = SparkStrategy
二、实现
Spark-源码学习-SparkSQL-架构设计-SQL 引擎-Strategy 体系-QueryPlanner
一、概述SparkPlanner 继承自 SparkStrategies 类,而 SparkStrategies 类则继承自 QueryPlanner 基类,重要的 plan() 方法实现就在 QueryPlanner 类中 。 SparkStrategies 类本身不提供任何方法,而是在内部提供一 批 SparkPlanner 会用到的各种策略( Strate盯)实现。 最后 ,在 SparkPlanner 层面将这些策略整
合在一起,通过 plan()方法进行逐个应用 。
类似逻辑计划阶段的 Anaylzer 和 Optimizer,SparkPlanner 本身只是一个逻辑的驱动 ,各种策略的 apply 方法把逻辑执行计划算子映射成物理执行计划算子。
二、实现2.1. $plan$$plan$ 方法传入 LogicalPlan 作为参数,将 strategies 应用 到 LogicalPlan,生成物理计划候选集合(Candidates)。 如果该集合中存在 PlanLater 类型的 SparkPlan,则通过 placeholder 中间变量取 出对应的 Logi ...
Spark-源码学习-SparkSQL-架构设计-SQL 引擎-Strategy 体系
一、概述所有的策略都继承自 GenericStrategy 类,其中定义了 planLater 和 apply 方法;SparkStrategy 继承自 GenericStrategy 类,对其中的 planLater 进行了实现,根据传入的 LogicalPlan 直接生成前述提到的 PlanLater 节点 。 此外,在 Spark SQL 中
Strategy 是 SparkStrategy 类的别名
1type Strategy = SparkStrategy
在 Spark SQL中,当逻辑计划处理完毕后,会构造 SparkPlanner 并执行 $plan()$ 方法对 LogicalPlan 进行处理,得到对应的物理计划。一个逻辑计划可能会对应多个物理计划,因此,SparkPlanner 得到的是一个物理计划的列表(Iterator[SparkPlan])。SparkPlanner 继承自 SparkStrategies 类,而 SparkStrategies 类则继承自 QueryPlanner 基类,重要的 $plan()$ 方法实现就在 QueryPlan ...