使用 Spark Spanner 連接器

本頁說明如何建立 Dataproc 叢集,並使用 Spark Spanner 連接器,透過 Apache SparkSpanner 讀取資料

Spanner 連接器可與 Spark 搭配使用,透過 Spanner Java 程式庫從 Spanner 資料庫讀取資料。Spanner 連接器支援將 Spanner 資料表圖表讀取至 Spark DataFramesGraphFrames

費用

在本文件中,您會使用 Google Cloud的下列計費元件:

  • Dataproc
  • Spanner
  • Cloud Storage

如要根據預測用量估算費用,請使用 Pricing Calculator

初次使用 Google Cloud 的使用者可能符合免費試用資格。

事前準備

在本教學課程中使用 Spanner 連接器之前,請設定 Dataproc 叢集,以及Spanner 執行個體和資料庫

設定 Dataproc 叢集

建立 Dataproc 叢集,或使用具有下列設定的現有 Dataproc 叢集:

  • VM 服務帳戶權限。叢集VM 服務帳戶必須獲派適當的 Spanner 權限。如果您使用 Data Boost (範例程式碼中已啟用 Data Boost,請參閱「匯出 Spanner 表格」),VM 服務帳戶也必須具備必要的 Data Boost IAM 權限

  • 存取權範圍。建立叢集時,必須啟用 cloud-platform 範圍或適當的 spanner 範圍。使用映像檔版本 2.1 以上版本建立的叢集,預設會啟用 cloud-platform 範圍。

    下列操作說明會示範如何透過 Google Cloud 控制台、gcloud CLI 或 Dataproc API,在叢集建立要求中設定cloud-platform範圍。如需其他叢集建立操作說明,請參閱「建立叢集」。

    Google Cloud 控制台

    1. 在 Google Cloud 控制台中,開啟 Dataproc 的「建立叢集」頁面。
    2. 在「專案存取權」部分的「管理安全性」面板中,按一下「為這個叢集啟用 cloud-platform 範圍」。
    3. 填寫或確認其他叢集建立欄位,然後按一下「建立」

    gcloud CLI

    您可以執行下列 gcloud dataproc clusters create 指令,建立啟用 cloud-platform 範圍的叢集。

    gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

    API

    您可以指定 GceClusterConfig.serviceAccountScopes,做為 clusters.create 要求的一部分。

        "serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
    

設定含有 Singers 資料庫資料表的 Spanner 執行個體

建立 Spanner 執行個體,並在其中建立含有 Singers 資料表的資料庫。記下 Spanner 執行個體 ID 和資料庫 ID。

搭配使用 Spanner 連接器和 Spark

Spanner 連接器適用於 Spark 版本 3.1+。將工作提交至 Dataproc 叢集時,您可以在 Cloud Storage 連接器 JAR 檔案規格中指定連接器版本

範例:使用 Spanner 連接器透過 gcloud CLI 提交 Spark 工作。

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

更改下列內容:

CONNECTOR_VERSION:Spanner 連接器版本。 從 GitHub GoogleCloudDataproc/spark-spanner-connector 存放區的版本清單中,選擇 Spanner 連接器版本。

讀取 Spanner 資料表

您可以使用 Python 或 Scala,透過 Spark 資料來源 API 將 Spanner 資料表資料讀取至 Spark DataFrame。

PySpark

如要在叢集上執行本節中的範例 PySpark 程式碼,請將工作提交至 Dataproc 服務,或從叢集主要節點上的 spark-submit REPL 執行工作。

Dataproc 工作

  1. 使用本機文字編輯器或 Cloud Shell 中預先安裝的 vivimnano 文字編輯器,建立 singers.py 檔案。
    1. 填入預留位置變數後,將下列程式碼貼到 singers.py 檔案中。請注意,系統已啟用 Spanner Data Boost 功能,對主要 Spanner 執行個體的影響幾乎為零。
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      更改下列內容:

      1. PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「專案資訊」部分。
      2. INSTANCE_IDDATABASE_IDTABLE_NAME:請參閱「使用 Singers 資料庫表單設定 Spanner 執行個體」。
    2. 儲存 singers.py 檔案。
  2. 使用 Google Cloud 控制台、gcloud CLI 或 Dataproc API,將工作提交至 Dataproc 服務。

    範例:使用 Spanner 連接器透過 gcloud CLI 提交工作。

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    更改下列內容:

    1. CLUSTER_NAME:新叢集的名稱。
    2. REGION:可執行工作負載的 Compute Engine區域
    3. CONNECTOR_VERSION:Spanner 連接器版本。 從 GitHub GoogleCloudDataproc/spark-spanner-connector 存放區的版本清單中,選擇 Spanner 連接器版本。

spark-submit 工作

  1. 使用 SSH 連線至 Dataproc 叢集主節點。
    1. 前往 Google Cloud 控制台的 Dataproc「Clusters」(叢集) 頁面,然後按一下叢集名稱。
    2. 在「叢集詳細資料」頁面中,選取「VM 執行個體」分頁標籤。然後按一下叢集主節點名稱右側的 SSH
       Google Cloud 控制台中的 Dataproc 叢集詳細資料頁面螢幕截圖,顯示用於連線至叢集主要節點的 SSH 按鈕。

      瀏覽器視窗會開啟主要節點上的主目錄。

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. 使用預先安裝的 vivimnano 文字編輯器,在主要節點上建立 singers.py 檔案。
    1. 將下列程式碼貼到 singers.py 檔案中。請注意,系統已啟用 Spanner Data Boost 功能,對主要 Spanner 執行個體的影響幾乎為零。
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      更改下列內容:

      1. PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「專案資訊」部分。
      2. INSTANCE_IDDATABASE_IDTABLE_NAME:請參閱「使用 Singers 資料庫表單設定 Spanner 執行個體」。
    2. 儲存 singers.py 檔案。
  3. 執行 singers.py,使用 spark-submit 建立 Spanner Singers 資料表。
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    更改下列內容:

    1. CONNECTOR_VERSION:Spanner 連接器版本。 從 GitHub GoogleCloudDataproc/spark-spanner-connector 存放區的版本清單中,選擇 Spanner 連接器版本。

    輸出內容會如下所示:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

如要在叢集上執行 Scala 範例程式碼,請完成下列步驟:

  1. 使用 SSH 連線至 Dataproc 叢集主節點。
    1. 前往 Google Cloud 控制台的 Dataproc「Clusters」(叢集) 頁面,然後按一下叢集名稱。
    2. 在「叢集詳細資料」頁面中,選取「VM 執行個體」分頁標籤。然後按一下叢集主節點名稱右側的 SSH Google Cloud 控制台中的 Dataproc 叢集詳細資料頁面。

      瀏覽器視窗會開啟主要節點上的主目錄。

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. 使用預先安裝的 vivimnano 文字編輯器,在主要節點上建立 singers.scala 檔案。
    1. 將下列程式碼貼到 singers.scala 檔案中。請注意,系統已啟用 Spanner Data Boost 功能,對主要 Spanner 執行個體的影響幾乎為零。
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      更改下列內容:

      1. PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「專案資訊」部分。
      2. INSTANCE_IDDATABASE_IDTABLE_NAME:請參閱「使用 Singers 資料庫表單設定 Spanner 執行個體」。
    2. 儲存 singers.scala 檔案。
  3. 啟動 spark-shell REPL。
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    更改下列內容:

    CONNECTOR_VERSION:Spanner 連接器版本。 從 GitHub GoogleCloudDataproc/spark-spanner-connector 存放區的版本清單中,選擇 Spanner 連接器版本。

  4. 執行 singers.scala:load singers.scala 指令,建立 Spanner Singers 資料表。輸出清單會顯示 Singers 輸出中的範例。
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

解讀 Spanner 圖表

Spanner 連接器支援將圖形匯出至個別節點和邊緣 DataFrames,以及直接匯出至 GraphFrames

以下範例會將 Spanner 匯出至 GraphFrame。 這項工具會使用 Spanner 連接器 JAR 中包含的 Python SpannerGraphConnector 類別,讀取 Spanner Graph

from pyspark.sql import SparkSession

connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar"

spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
         .config("spark.jars", connector_jar)
         .getOrCreate())
spark.sparkContext.addPyFile(connector_jar)

from spannergraph import SpannerGraphConnector

connector = (SpannerGraphConnector()
             .spark(spark)
             .project("PROJECT_ID")
             .instance("INSTANCE_ID")
             .database("DATABASE_ID")
             .graph("GRAPH_ID"))

g = connector.load_graph()
g.vertices.show()
g.edges.show()

更改下列內容:

  • CONNECTOR_VERSION:Spanner 連接器版本。 從 GitHub GoogleCloudDataproc/spark-spanner-connector 存放區的版本清單中,選擇 Spanner 連接器版本。
  • PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「專案資訊」部分。
  • INSTANCE_IDDATABASE_IDTABLE_NAME 插入執行個體、資料庫和圖表 ID。

如要匯出節點和邊緣 DataFrames,而非 GraphFrames,請使用 load_dfs

df_vertices, df_edges, df_id_map = connector.load_dfs()

清除所用資源

如要避免系統持續向您的 Google Cloud 帳戶收費,您可以停止刪除 Dataproc 叢集,並刪除 Spanner 執行個體

後續步驟