本页面介绍了如何使用 Spark Spanner 连接器通过 Apache Spark 从 Spanner 读取数据
计算费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
- Dataproc
- Spanner
- Cloud Storage
您可使用价格计算器根据您的预计使用情况来估算费用。
准备工作
在运行本教程之前,请确保您知道连接器版本 并获取连接器 URI。
如何指定连接器 JAR 文件 URI
GitHub 中列出了 Spark Spanner 连接器版本 GoogleCloudDataproc/spark-spanner-connector 代码库。
通过替换连接器版本来指定连接器 JAR 文件
以下 URI 字符串中包含信息:
gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
该连接器适用于 Spark 版本 3.1+
gcloud CLI 示例:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \ -- job-args
准备 Spanner 数据库
如果您没有 Spanner 表,可以按照教程创建 Spanner 表。之后,您将获得实例 ID、数据库 ID 和表 Singers
。
创建 Dataproc 集群
使用该连接器的任何 Dataproc 集群都需要 spanner
或 cloud-platform
镜重。对于映像 2.1 或更高版本,Dataproc 集群的默认范围为 cloud-platform
。如果您使用的是较低版本,则可以使用 Google Cloud 控制台、Google Cloud CLI 和 Dataproc API 创建 Dataproc 集群。
控制台
- 在 Google Cloud 控制台中,打开 Dataproc 创建集群页面
- 在“管理安全”标签页中,点击“项目访问权限”部分下的“为此集群启用 cloud-platform 范围”。
- 填写或确认其他集群创建字段,然后 点击“创建”
Google Cloud CLI
gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
您可以在 clusters.create 请求中指定 GceClusterConfig.serviceAccountScopes。例如:"serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
您必须确保向 Dataproc 虚拟机服务账号分配相应的 Spanner 权限。如果您在本教程中使用 Data Boost,请参阅 Data Boost IAM 权限
从 Spanner 读取数据
您可以使用 Scala 和 Python 通过 Spark 数据源 API 将数据从 Spanner 读取到 Spark DataFrame。
Scala
- 检查代码,并将 [projectId]、[instanceId]、[databaseId] 和 [table] 占位符替换为您之前创建的项目 ID、实例 ID、数据库 ID 和表。enableDataBoost 选项可启用 Spanner Data Boost 功能,该功能具有
对 Spanner 主实例的影响几乎为零。
object singers { def main(): Unit = { /* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "[projectId]") .option("instanceId", "[instanceId]") .option("databaseId", "[databaseId]") .option("enableDataBoost", true) .option("table", "[table]") .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
- 在集群上运行代码
- 使用 SSH 连接到 Dataproc 集群主服务器节点
<ph type="x-smartling-placeholder">
- </ph>
- 前往 Google Cloud 控制台中的 Dataproc 集群页面,然后点击集群的名称
- 在集群详情页面上,选择“虚拟机实例”标签页。然后,点击
SSH
(位于集群主服务器节点的名称右侧)
系统会打开一个浏览器窗口并显示主节点上的主目录Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 使用预装的
vi
、vim
或nano
文本编辑器创建singers.scala
,然后粘贴 Scala 代码列表中的 Scala 代码nano singers.scala
- 启动
spark-shell
REPL。$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
- 运行 singers.scala 并使用
:load singers.scala
命令创建 SpannerSingers
表。输出列表将显示来自 Singers 输出的示例。> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
PySpark
- 检查代码,并将 [projectId]、[instanceId]、[databaseId] 和 [table] 占位符替换为您之前创建的项目 ID、实例 ID、数据库 ID 和表。enableDataBoost 选项可启用 Spanner Data Boost 功能,该功能具有
对 Spanner 主实例的影响几乎为零。
#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "[projectId]") \ .option("instanceId", "[instanceId]") \ .option("databaseId", "[databaseId]") \ .option("enableDataBoost", "true") \ .option("table", "[table]") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
- 在集群上运行代码
- 使用 SSH 连接到 Dataproc 集群主服务器节点
<ph type="x-smartling-placeholder">
- </ph>
- 前往 Dataproc 集群 页面,然后点击集群的名称
- 在集群详情页面上,选择“虚拟机实例”标签页。然后,点击
SSH
(位于集群主服务器节点的名称右侧)
系统会在主节点上的主目录上打开一个浏览器窗口Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 使用预安装的
vi
、vim
或nano
文本编辑器创建singers.py
,然后粘贴 PySpark 代码列表中的 PySpark 代码nano singers.py
- 使用
spark-submit
运行 singers.py 以创建 SpannerSingers
表。 输出为:spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- 使用 SSH 连接到 Dataproc 集群主服务器节点
<ph type="x-smartling-placeholder">
清理
为进行清理并避免系统因本演示中创建的资源向您的 Google Cloud 账号持续收取费用,请按照以下步骤操作。
gcloud dataproc clusters stop CLUSTER_NAME gcloud dataproc clusters delete CLUSTER_NAME