Spark 中所有 transformation 算子是通过分发到多个节点上的并行任务实现运行并行化的。当将一个自定义函数传递给 Spark 算子时(比如map或reduce),该函数所包含的变量会通过副本方式传播到远程节点上。但所有针对这些变量的写操作只会更新到本地,不会传递回驱动程序以及分布式更新,通常跨任务的读写变量是低效的。故而 Spark 提供了两种受限的共享变量:广播变量和共享变量。

广播变量 broadcast variable

什么是广播变量

广播变量是一种能够分发到集群各个节点上的只读变量,Driver 端将变量分发给各 Executor,Executor 只需保存该变量的一个副本,而不是每个 task 各分发一份,避免了 task 过多时候,Driver 的带宽成为系统瓶颈,以及 task 服务器上的资源消耗。Spark 实现了高效的广播算法保证广播变量得到高效的分发。

不使用广播变量
截屏2021-03-07 上午10.21.16
使用广播变量
截屏2021-03-07 上午10.24.25

广播变量创建和使用

定义广播变量
1
2
val a = 3
val broadcast = sc.broadcast(a)
使用广播变量
1
val data = broadcast.value

注意

  • 变量一旦被定义为一个广播变量,那么这个变量只能读,不能修改
  • 不能将一个 RDD 使用广播变量广播出去,RDD 是不存储数据的。可以将 RDD 的结果广播出去。
  • 广播变量只能在 Driver 端定义,不能在 Executor 端定义。
  • 在 Driver 端可以修改广播变量的值,在 Executor 端无法修改广播变量的值。
  • 如果 Executor 端用到了 Driver 的变量,如果不使用广播变量在 Executor 有多少 task 就有多少 Driver 端的变量副本。
  • 如果 Executor 端用到了 Driver 的变量,如果使用广播变量在每个 Executor 中只有一份 Driver 端的变量副本。

累加器

累加器类似于 MapReduce 中的分布式计数器,是一个整数值,能够在在各个任务中单独修改,之后自动汇总得到全局值。累加器常用于追踪状态的运行状态,方便对 Spark 的程序进行调试和监控。

使用用累加器通过两个分区计算 List(1,2,3,4,5,6,7,8) 的每个元素之和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
object AccumulatorTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("AccumulatorTest")
val sc = new SparkContext(conf)
val dataRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8))
//定义累加器,类型为Long类型
val longAccumulator: LongAccumulator = sc.longAccumulator
//遍历RDD的每个元素
dataRDD.foreach(x => {
//执行累加器的+功能
longAccumulator.add(x)
})
println(longAccumulator.value)
}
}