将 Spanner 连接器与 Spark 搭配使用

本页介绍了如何创建一个 Dataproc 集群,该集群使用 Apache Spark 通过 Spark Spanner 连接器Spanner 读取数据

计算费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

  • Dataproc
  • Spanner
  • Cloud Storage

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

在本教程中使用 Spanner 连接器之前,请先设置 Dataproc 集群以及 Spanner 实例和数据库。

设置 Dataproc 集群

创建 Dataproc 集群,或使用具有以下设置的现有 Dataproc 集群:

设置包含 Singers 数据库表的 Spanner 实例

创建 Spanner 实例,其中包含一个包含 Singers 表的数据库。记下 Spanner 实例 ID 和数据库 ID。

将 Spanner 连接器与 Spark 搭配使用

Spanner 连接器适用于 Spark 版本 3.1+。在向 Dataproc 集群提交作业时,您可以在 Cloud Storage 连接器 JAR 文件规范中指定连接器版本

示例:使用 Spanner 连接器通过 gcloud CLI 提交 Spark 作业。

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

替换以下内容:

CONNECTOR_VERSION:Spanner 连接器版本。 从 GitHub GoogleCloudDataproc/spark-spanner-connector 代码库中的版本列表中选择 Spanner 连接器版本。

从 Spanner 读取数据

您可以使用 Python 或 Scala 通过 Spark 数据源 API 将数据从 Spanner 读取到 Spark DataFrame。

PySpark

您可以通过将作业提交到 Dataproc 服务或通过在集群主节点上的 spark-submit REPL 中运行作业,在集群上运行本部分中的示例 PySpark 代码。

Dataproc 作业

  1. 使用本地文本编辑器在本地创建 singers.py 文件,或在 Cloud Shell 中使用预安装的 vivimnano 文本编辑器创建 singers.py 文件。
    1. 将以下代码粘贴到 singers.py 文件中。请注意,Spanner Data Boost 功能已启用,对主 Spanner 实例的影响几乎为零。
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
          

      替换以下内容:

      1. PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分中。
      2. INSTANCE_IDDATABASE_IDTABLE_NAME:请参阅使用 Singers 数据库表设置 Spanner 实例
    2. 保存 singers.py 文件。
  2. 使用 Google Cloud 控制台、gcloud CLI 或 Dataproc API 将作业提交到 Dataproc 服务。

    示例:使用 Spanner 连接器提交 gcloud CLI 作业。

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION
          

    替换以下内容:

    1. CLUSTER_NAME:新集群的名称。
    2. REGION:用于运行工作负载的可用 Compute Engine 区域
    3. CONNECTOR_VERSION:Spanner 连接器版本。 从 GitHub GoogleCloudDataproc/spark-spanner-connector 代码库中的版本列表中选择 Spanner 连接器版本。

spark-submit 作业

  1. 使用 SSH 连接到 Dataproc 集群主服务器节点。
    1. 前往 Google Cloud 控制台中的 Dataproc 集群页面,然后点击集群的名称。
    2. 集群详情页面上,选择“虚拟机实例”标签页。然后,点击集群主服务器节点名称右侧的 SSH
      Cloud 控制台中的 Dataproc 集群详情页面。

      此时会打开一个浏览器窗口并显示主节点上的主目录。

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. 使用预安装的 vivimnano 文本编辑器在主节点上创建 singers.py 文件。
    1. 将以下代码粘贴到 singers.py 文件中。请注意,Spanner Data Boost 功能已启用,对主 Spanner 实例的影响几乎为零。
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      替换以下内容:

      1. PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分中。
      2. INSTANCE_IDDATABASE_IDTABLE_NAME:请参阅使用 Singers 数据库表设置 Spanner 实例
    2. 保存 singers.py 文件。
  3. 使用 spark-submit 运行 singers.py 以创建 Spanner Singers 表。
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    替换以下内容:

    1. CONNECTOR_VERSION:Spanner 连接器版本。 从 GitHub GoogleCloudDataproc/spark-spanner-connector 代码库中的版本列表中选择 Spanner 连接器版本。

    输出为:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

如需在集群上运行 Scala 示例代码,请完成以下步骤:

  1. 使用 SSH 连接到 Dataproc 集群主服务器节点。
    1. 前往 Google Cloud 控制台中的 Dataproc 集群页面,然后点击集群的名称。
    2. 集群详情页面上,选择“虚拟机实例”标签页。然后,点击集群主服务器节点名称右侧的 SSH
      Cloud 控制台中的 Dataproc 集群详情页面。

      此时会打开一个浏览器窗口并显示主节点上的主目录。

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. 使用预安装的 vivimnano 文本编辑器在主节点上创建 singers.scala 文件。
    1. 将以下代码粘贴到 singers.scala 文件中。请注意,Spanner Data Boost 功能已启用,对主 Spanner 实例的影响几乎为零。
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      替换以下内容:

      1. PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心项目信息部分中。
      2. INSTANCE_IDDATABASE_IDTABLE_NAME:请参阅使用 Singers 数据库表设置 Spanner 实例
    2. 保存 singers.scala 文件。
  3. 启动 spark-shell REPL。
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    替换以下内容:

    CONNECTOR_VERSION:Spanner 连接器版本。 从 GitHub GoogleCloudDataproc/spark-spanner-connector 代码库中的版本列表中选择 Spanner 连接器版本。

  4. 使用 :load singers.scala 命令运行 singers.scala,以创建 Spanner Singers 表。输出列表将显示来自 Singers 输出的示例。
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

清理

为避免系统向您的 Google Cloud 账号持续收取费用,您可以停止删除 Dataproc 集群,并删除 Spanner 实例

了解详情