本頁說明如何建立 Dataproc 叢集,並使用 Spark Spanner 連接器,透過 Apache Spark 從 Spanner 讀取資料
Spanner 連接器可與 Spark 搭配使用,透過 Spanner Java 程式庫從 Spanner 資料庫讀取資料。Spanner 連接器支援將 Spanner 資料表和圖表讀取至 Spark DataFrames 和 GraphFrames。
費用
在本文件中,您會使用 Google Cloud的下列計費元件:
- Dataproc
- Spanner
- Cloud Storage
如要根據預測用量估算費用,請使用 Pricing Calculator。
事前準備
在本教學課程中使用 Spanner 連接器之前,請設定 Dataproc 叢集,以及Spanner 執行個體和資料庫。
設定 Dataproc 叢集
建立 Dataproc 叢集,或使用具有下列設定的現有 Dataproc 叢集:
VM 服務帳戶權限。叢集VM 服務帳戶必須獲派適當的 Spanner 權限。如果您使用 Data Boost (範例程式碼中已啟用 Data Boost,請參閱「匯出 Spanner 表格」),VM 服務帳戶也必須具備必要的 Data Boost IAM 權限。
存取權範圍。建立叢集時,必須啟用
cloud-platform
範圍或適當的spanner
範圍。使用映像檔版本 2.1 以上版本建立的叢集,預設會啟用cloud-platform
範圍。下列操作說明會示範如何透過 Google Cloud 控制台、gcloud CLI 或 Dataproc API,在叢集建立要求中設定
cloud-platform
範圍。如需其他叢集建立操作說明,請參閱「建立叢集」。Google Cloud 控制台
- 在 Google Cloud 控制台中,開啟 Dataproc 的「建立叢集」頁面。
- 在「專案存取權」部分的「管理安全性」面板中,按一下「為這個叢集啟用 cloud-platform 範圍」。
- 填寫或確認其他叢集建立欄位,然後按一下「建立」。
gcloud CLI
您可以執行下列
gcloud dataproc clusters create
指令,建立啟用cloud-platform
範圍的叢集。gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
您可以指定 GceClusterConfig.serviceAccountScopes,做為 clusters.create 要求的一部分。
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
設定含有 Singers 資料庫資料表的 Spanner 執行個體
建立 Spanner 執行個體,並在其中建立含有 Singers
資料表的資料庫。記下 Spanner 執行個體 ID 和資料庫 ID。
搭配使用 Spanner 連接器和 Spark
Spanner 連接器適用於 Spark 版本 3.1+
。將工作提交至 Dataproc 叢集時,您可以在 Cloud Storage 連接器 JAR 檔案規格中指定連接器版本。
範例:使用 Spanner 連接器透過 gcloud CLI 提交 Spark 工作。
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
更改下列內容:
CONNECTOR_VERSION:Spanner 連接器版本。
從 GitHub GoogleCloudDataproc/spark-spanner-connector
存放區的版本清單中,選擇 Spanner 連接器版本。
讀取 Spanner 資料表
您可以使用 Python 或 Scala,透過 Spark 資料來源 API 將 Spanner 資料表資料讀取至 Spark DataFrame。
PySpark
如要在叢集上執行本節中的範例 PySpark 程式碼,請將工作提交至 Dataproc 服務,或從叢集主要節點上的 spark-submit
REPL 執行工作。
Dataproc 工作
- 使用本機文字編輯器或 Cloud Shell 中預先安裝的
vi
、vim
或nano
文字編輯器,建立singers.py
檔案。 - 填入預留位置變數後,將下列程式碼貼到
singers.py
檔案中。請注意,系統已啟用 Spanner Data Boost 功能,對主要 Spanner 執行個體的影響幾乎為零。#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
更改下列內容:
- PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「專案資訊」部分。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME:請參閱「使用
Singers
資料庫表單設定 Spanner 執行個體」。
- 儲存
singers.py
檔案。 - 使用 Google Cloud 控制台、gcloud CLI 或 Dataproc API,將工作提交至 Dataproc 服務。
範例:使用 Spanner 連接器透過 gcloud CLI 提交工作。
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
更改下列內容:
- CLUSTER_NAME:新叢集的名稱。
- REGION:可執行工作負載的 Compute Engine區域。
- CONNECTOR_VERSION:Spanner 連接器版本。
從 GitHub
GoogleCloudDataproc/spark-spanner-connector
存放區的版本清單中,選擇 Spanner 連接器版本。
spark-submit 工作
- 使用 SSH 連線至 Dataproc 叢集主節點。
- 前往 Google Cloud 控制台的 Dataproc「Clusters」(叢集) 頁面,然後按一下叢集名稱。
- 在「叢集詳細資料」頁面中,選取「VM 執行個體」分頁標籤。然後按一下叢集主節點名稱右側的
SSH
。瀏覽器視窗會開啟主要節點上的主目錄。
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 使用預先安裝的
vi
、vim
或nano
文字編輯器,在主要節點上建立singers.py
檔案。- 將下列程式碼貼到
singers.py
檔案中。請注意,系統已啟用 Spanner Data Boost 功能,對主要 Spanner 執行個體的影響幾乎為零。#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
更改下列內容:
- PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「專案資訊」部分。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME:請參閱「使用
Singers
資料庫表單設定 Spanner 執行個體」。
- 儲存
singers.py
檔案。
- 將下列程式碼貼到
- 執行
singers.py
,使用spark-submit
建立 SpannerSingers
資料表。spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
更改下列內容:
- CONNECTOR_VERSION:Spanner 連接器版本。
從 GitHub
GoogleCloudDataproc/spark-spanner-connector
存放區的版本清單中,選擇 Spanner 連接器版本。
輸出內容會如下所示:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION:Spanner 連接器版本。
從 GitHub
Scala
如要在叢集上執行 Scala 範例程式碼,請完成下列步驟:
- 使用 SSH 連線至 Dataproc 叢集主節點。
- 前往 Google Cloud 控制台的 Dataproc「Clusters」(叢集) 頁面,然後按一下叢集名稱。
- 在「叢集詳細資料」頁面中,選取「VM 執行個體」分頁標籤。然後按一下叢集主節點名稱右側的
SSH
。瀏覽器視窗會開啟主要節點上的主目錄。
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 使用預先安裝的
vi
、vim
或nano
文字編輯器,在主要節點上建立singers.scala
檔案。- 將下列程式碼貼到
singers.scala
檔案中。請注意,系統已啟用 Spanner Data Boost 功能,對主要 Spanner 執行個體的影響幾乎為零。object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
更改下列內容:
- PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「專案資訊」部分。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME:請參閱「使用
Singers
資料庫表單設定 Spanner 執行個體」。
- 儲存
singers.scala
檔案。
- 將下列程式碼貼到
- 啟動
spark-shell
REPL。$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
更改下列內容:
CONNECTOR_VERSION:Spanner 連接器版本。 從 GitHub
GoogleCloudDataproc/spark-spanner-connector
存放區的版本清單中,選擇 Spanner 連接器版本。 - 執行
singers.scala
和:load singers.scala
指令,建立 SpannerSingers
資料表。輸出清單會顯示 Singers 輸出中的範例。> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
解讀 Spanner 圖表
Spanner 連接器支援將圖形匯出至個別節點和邊緣 DataFrames,以及直接匯出至 GraphFrames
。
以下範例會將 Spanner 匯出至 GraphFrame
。
這項工具會使用 Spanner 連接器 JAR 中包含的 Python SpannerGraphConnector
類別,讀取 Spanner Graph。
from pyspark.sql import SparkSession connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar" spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example") .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") .config("spark.jars", connector_jar) .getOrCreate()) spark.sparkContext.addPyFile(connector_jar) from spannergraph import SpannerGraphConnector connector = (SpannerGraphConnector() .spark(spark) .project("PROJECT_ID") .instance("INSTANCE_ID") .database("DATABASE_ID") .graph("GRAPH_ID")) g = connector.load_graph() g.vertices.show() g.edges.show()
更改下列內容:
- CONNECTOR_VERSION:Spanner 連接器版本。
從 GitHub
GoogleCloudDataproc/spark-spanner-connector
存放區的版本清單中,選擇 Spanner 連接器版本。 - PROJECT_ID:您的 Google Cloud 專案 ID。 專案 ID 會列在 Google Cloud 控制台資訊主頁的「專案資訊」部分。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME 插入執行個體、資料庫和圖表 ID。
如要匯出節點和邊緣 DataFrames
,而非 GraphFrames,請使用 load_dfs
:
df_vertices, df_edges, df_id_map = connector.load_dfs()
清除所用資源
如要避免系統持續向您的 Google Cloud 帳戶收費,您可以停止或刪除 Dataproc 叢集,並刪除 Spanner 執行個體。
後續步驟
- 請參閱
pyspark.sql.DataFrame
範例。 - 如需 Spark DataFrame 語言支援,請參閱下列文章:
- 請參閱 GitHub 上的 Spark Spanner 連接器存放區。
- 請參閱 Spark 工作調整提示。