本页介绍了如何创建一个 Dataproc 集群,该集群使用 Apache Spark 通过 Spark Spanner 连接器从 Spanner 读取数据
计算费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
- Dataproc
- Spanner
- Cloud Storage
您可使用价格计算器根据您的预计使用情况来估算费用。
准备工作
在本教程中使用 Spanner 连接器之前,请先设置 Dataproc 集群以及 Spanner 实例和数据库。
设置 Dataproc 集群
创建 Dataproc 集群,或使用具有以下设置的现有 Dataproc 集群:
虚拟机服务账号权限。必须为集群的虚拟机服务账号分配适当的 Spanner 权限。如果您使用 Data Boost(从 Spanner 读取数据中的示例代码中启用了 Data Boost),虚拟机服务账号还必须具有所需的 Data Boost IAM 权限。
访问权限范围。必须启用
cloud-platform
范围或适当的spanner
范围,才能创建集群。对于使用映像版本 2.1 或更高版本创建的集群,cloud-platform
作用域默认处于启用状态。以下说明介绍了如何在使用 Google Cloud 控制台、gcloud CLI 或 Dataproc API 创建集群的请求中设置
cloud-platform
范围。如需有关创建集群的其他说明,请参阅创建集群。Google Cloud 控制台
- 在 Google Cloud 控制台中,打开 Dataproc 创建集群页面。
- 在管理安全性面板的项目访问权限部分中,点击“为此集群启用 cloud-platform 范围”。
- 填写或确认其他集群创建字段,然后点击创建。
gcloud CLI
您可以运行以下
gcloud dataproc clusters create
命令,创建启用了cloud-platform
范围的集群。gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
您可以在 clusters.create 请求中指定 GceClusterConfig.serviceAccountScopes。
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
设置包含 Singers
数据库表的 Spanner 实例
创建 Spanner 实例,其中包含一个包含 Singers
表的数据库。记下 Spanner 实例 ID 和数据库 ID。
将 Spanner 连接器与 Spark 搭配使用
Spanner 连接器适用于 Spark 版本 3.1+
。在向 Dataproc 集群提交作业时,您可以在 Cloud Storage 连接器 JAR 文件规范中指定连接器版本。
示例:使用 Spanner 连接器通过 gcloud CLI 提交 Spark 作业。
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
替换以下内容:
CONNECTOR_VERSION:Spanner 连接器版本。
从 GitHub GoogleCloudDataproc/spark-spanner-connector
代码库中的版本列表中选择 Spanner 连接器版本。
从 Spanner 读取数据
您可以使用 Python 或 Scala 通过 Spark 数据源 API 将数据从 Spanner 读取到 Spark DataFrame。
PySpark
您可以通过将作业提交到 Dataproc 服务或通过在集群主节点上的 spark-submit
REPL 中运行作业,在集群上运行本部分中的示例 PySpark 代码。
Dataproc 作业
- 使用本地文本编辑器在本地创建
singers.py
文件,或在 Cloud Shell 中使用预安装的vi
、vim
或nano
文本编辑器创建singers.py
文件。 - 将以下代码粘贴到
singers.py
文件中。请注意,Spanner Data Boost 功能已启用,对主 Spanner 实例的影响几乎为零。#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
替换以下内容:
- PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心的项目信息部分中。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME:请参阅使用
Singers
数据库表设置 Spanner 实例。
- 保存
singers.py
文件。 - 使用 Google Cloud 控制台、gcloud CLI 或 Dataproc API 将作业提交到 Dataproc 服务。
示例:使用 Spanner 连接器提交 gcloud CLI 作业。
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION
替换以下内容:
- CLUSTER_NAME:新集群的名称。
- REGION:用于运行工作负载的可用 Compute Engine 区域。
- CONNECTOR_VERSION:Spanner 连接器版本。
从 GitHub
GoogleCloudDataproc/spark-spanner-connector
代码库中的版本列表中选择 Spanner 连接器版本。
spark-submit 作业
- 使用 SSH 连接到 Dataproc 集群主服务器节点。
- 前往 Google Cloud 控制台中的 Dataproc 集群页面,然后点击集群的名称。
- 在集群详情页面上,选择“虚拟机实例”标签页。然后,点击集群主服务器节点名称右侧的
SSH
。此时会打开一个浏览器窗口并显示主节点上的主目录。
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 使用预安装的
vi
、vim
或nano
文本编辑器在主节点上创建singers.py
文件。- 将以下代码粘贴到
singers.py
文件中。请注意,Spanner Data Boost 功能已启用,对主 Spanner 实例的影响几乎为零。#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
替换以下内容:
- PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心的项目信息部分中。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME:请参阅使用
Singers
数据库表设置 Spanner 实例。
- 保存
singers.py
文件。
- 将以下代码粘贴到
- 使用
spark-submit
运行singers.py
以创建 SpannerSingers
表。spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
替换以下内容:
- CONNECTOR_VERSION:Spanner 连接器版本。
从 GitHub
GoogleCloudDataproc/spark-spanner-connector
代码库中的版本列表中选择 Spanner 连接器版本。
输出为:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION:Spanner 连接器版本。
从 GitHub
Scala
如需在集群上运行 Scala 示例代码,请完成以下步骤:
- 使用 SSH 连接到 Dataproc 集群主服务器节点。
- 前往 Google Cloud 控制台中的 Dataproc 集群页面,然后点击集群的名称。
- 在集群详情页面上,选择“虚拟机实例”标签页。然后,点击集群主服务器节点名称右侧的
SSH
。此时会打开一个浏览器窗口并显示主节点上的主目录。
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 使用预安装的
vi
、vim
或nano
文本编辑器在主节点上创建singers.scala
文件。- 将以下代码粘贴到
singers.scala
文件中。请注意,Spanner Data Boost 功能已启用,对主 Spanner 实例的影响几乎为零。object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
替换以下内容:
- PROJECT_ID:您的 Google Cloud 项目 ID。 项目 ID 列在 Google Cloud 控制台信息中心的项目信息部分中。
- INSTANCE_ID、DATABASE_ID 和 TABLE_NAME:请参阅使用
Singers
数据库表设置 Spanner 实例。
- 保存
singers.scala
文件。
- 将以下代码粘贴到
- 启动
spark-shell
REPL。$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
替换以下内容:
CONNECTOR_VERSION:Spanner 连接器版本。 从 GitHub
GoogleCloudDataproc/spark-spanner-connector
代码库中的版本列表中选择 Spanner 连接器版本。 - 使用
:load singers.scala
命令运行singers.scala
,以创建 SpannerSingers
表。输出列表将显示来自 Singers 输出的示例。> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
清理
为避免系统向您的 Google Cloud 账号持续收取费用,您可以停止或删除 Dataproc 集群,并删除 Spanner 实例。