一、概述

Spark 提供了 CatalogManager,其内部通过一个 Map 类型的内存数据结构维护注册的 Catalog 实例。

二、实现

2.1. 结构

2.1.1. 属性

  • catalogs

    保存 catalog 名字和 Class 的隐射关系

    1
    private val catalogs = mutable.HashMap.empty[String, CatalogPlugin]

2.1.2. 方法

  1. catalog()

    用来查找特定名字的 Catalog,返回 CatalogPlugin 接口。

    1
    2
    3
    4
    5
    6
    7
    def catalog(name: String): CatalogPlugin = synchronized {
    if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {
    v2SessionCatalog
    } else {
    catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))
    }
    }

    CatalogManager 维护了所有 Catalog 实例的键值对信息,能够根据 catalog 名称返回对应的 Catalog 实例,其中有一个固定的名字叫 spark_catalog,用于当前默认的 Catalog 实例实现,就是 V2SessionCatalog,它代理了普通的 SessionCatalog。

  2. load()

三、总结

Spark 通过 CatalogManager 可以同时管理内部连接多个 Catalog,通过 spark.sql.catalog.${name} 可以注册多个 Catalog,Spark 默认的 Catalog 由 spark.sql.catalog.spark_catalog 参数指定

通常的做法是,自定义 Catalog 类继承 DelegatingCatalogExtension 实现,然后通过 spark.sql.catalog.spark_catalog 参数来指定自定义 Catalog 类。

详细看一下 HiveExternalCatalog, v2SessionCatalog, spark_catalog 等对象的实例化和管理流程:

  1. SparkSession 启用 HiveSupport, $SparkSession.enableHiveSupport(true)$ ,在该方法内会设置参数 CATALOG_IMPLEMENTATION = hive, Spark SQL 的 Catalog 默认支持 hive 和 in-memory 两种,如果没有指定,默认为 in-memory。

    session.sharedState.externalCatalog 是 SparkSession 实际负责和外部系统交互的 Catalog, 根据上面设置的参数,分别会实例化出 HiveExternalCatalog 和 InMemoryCatalog 两个实例。

  2. 在 BaseSessionStateBuilder/HiveSessionStateBuilder 中会使用上面的 externalCatalog 创建 Catalog 对象,再根据 Catalog 创建 v2SessionCatalog 对象

  3. 根据 catalog 和 v2SessionCatalog 创建 CatalogManager 实例。CatalogManager 通过 catalogs 对象来管理多个 catalog。

    CatalogManager 的 defaultSessionCatalog 属性就是上面的 v2SessionCatalog 对象。

  4. $CatalogManager.catalog()$ 方法通过 catalog 的 name 返回 CatalogPlugin 实例,如果没有该实例,则通过 $Catalogs.load()$ 方法进行实例化。

    $Catalogs.load()$ 方法加载 conf 中配置的 spark.sql.catalog.${name} 类,并实例化/初始化 CatalogPlugin 对象

  5. 如果 spark.sql.catalog.${name} 参数为空(默认为空)时,返回 CatalogManager 中的 defaultSessionCatalog 属性.

  6. 如果 spark.sql.catalog.spark_catalog 参数已经配置,对上面 $Catalogs.load()$ 出来的实例进行判断,如果发现上面加载的是 CatalogExtension 子类,自动调用其 setDelegateCatalog() 方法,将 CatalogManager 中 defaultSessionCatalog 设置为其内部代理对象。