一、概述

MapStatus 用于表示 ShuffleMapTask 返回给 TaskScheduler 的执行结果

二、设计

MapStatus 的伴生对象中定义了 $apply$ 函数,可以直接使用 MapStatus(BlockManagerId, partitionLengths) 创建 MapStatus 实例。

1
2
3
4
5
6
7
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long], mapTaskId: Long): MapStatus = {
if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
HighlyCompressedMapStatus(loc, uncompressedSizes, mapTaskId)
} else {
new CompressedMapStatus(loc, uncompressedSizes, mapTaskId)
}
}

$apply$ 函数根据 uncompressedSizes 的长度是否大于 2000,分别创建 HighlyCompressedMapStatus 和 CompressedMapStatus,对于较大的数据量使用高度压缩的 HighlyCompressedMapStatus,一般的数据量则使用 CompressedMapStatus。

2.1. CompressedmapStatus

2.2. HighlyCompressedMapStatus