Spark-源码学习-SparkSession-ShareState
一、概述
ShareState 负责保存多个有效 session 之间的共享状态。SharedState 中包含变量: warehousePath、cacheManager、statusStore、externalCatalog、globalTempViewManager、jarClassLoader。这些变量对于所有的 SparkSession 都是公用的。
二、初始化
1 | lazy val sharedState: SharedState = { |
综上可以看出,SparkSession 重用 sharedState,但是会新 $clone$ 或者创建新的 sessionState,但是如果在创建多个 SparkSession 时,传入的 existingSharedState 都为空,则多个 SparkSession 也会创建多个 sharedState。
2.1. 解析和处理 SparkSQL 支持的文件系统协议
$setFsUrlStreamHandlerFactory()$ 方法用于将文件系统协议添加到 Spark SQL 中,例如 Amazon S3、HDFS 或其他自定义文件系统,以便读取数据。
2.2. 配置文件
(conf, hadoopConf)
- resolveWarehousePath
2.3. 初始化 CacheManager
CacheManager 是 Spark SQL 中内存缓存的管理者,在 Spark SQL 中提供对缓存查询结果的支持,并在执行后续查询时自动使用这些缓存结果。数据使用 InMemoryRelation 中存储的字节缓冲区进行缓存。CacheManager 通过 SharedState 在 SparkSessions 之间共享。
2.4. SQLAppStatusStore
SQLAppStatusStore 是一个 SparkSQL 应用程序状态存储库,用于 SparkSQL 内部应用程序的状态管理和检索。它可以用于将 SparkSQL 应用程序在不同步骤之间的状态持久化存储,以便在失败或重新启动时恢复这些状态。
SQLAppStatusStore 提供了一个统一的 API,用于访问和管理应用程序的状态数据,而不管它们是存储在内存、文件系统或数据库中。
1 | val statusStore: SQLAppStatusStore = { |
2.5. StreamingQueryStatusListener
2.6. ExternalCatalogWithListener
$externalCatalog$ 初始化 ExternalCatalogWithListener,ExternalCatalogWithListener 封装了 ExternalCatalog 类,同时提供了事件监听的功能。
通过 externalCatalogClassName 函数获取到要反射的类名,然后通过 $reflect$ 函数反射获取到 externalCatalog
1
2
3
4
5
6
7
8
9val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
SharedState.externalCatalogClassName(conf), conf, hadoopConf
)
private def externalCatalogClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
}
}createDatabase
1
2
3
4
5
6
7
8if (!externalCatalog.databaseExists(SQLConf.get.defaultDatabase)) {
if (!SessionCatalog.DEFAULT_DATABASE.equalsIgnoreCase(SQLConf.get.defaultDatabase)) {
throw QueryExecutionErrors.defaultDatabaseNotExistsError(SQLConf.get.defaultDatabase)
}
val defaultDbDefinition = CatalogDatabase(SQLConf.get.defaultDatabase, "default database",
CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)), Map())
externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true)
}事件监听
1
2
3val wrapped = new ExternalCatalogWithListener(externalCatalog)
wrapped.addListener((event: ExternalCatalogEvent) => sparkContext.listenerBus.post(event))
wrapped
2.7. GlobalTempViewManager
GlobalTempViewManager 全局的临时视图管理器,被 $DataFrame.createGlobalTempView()$ 方法调用,进行跨 session 的视图管理。GlobalTempViewManager 通过 synchronized
关键字保证了线程安全,提供了对全局视图(大小写敏感)的原子操作, 包括创建、更新、删除和重命名等。 内部通过 HashMap 维护视图名和其对应逻辑计划的映射关系
2.8. jarClassLoader
加载用户添加的 jar 包