您可以将 spark-bigquery-connector 与 Apache Spark 搭配使用,以从 BigQuery 中读取数据以及将数据写入其中。本教程提供了如何在 Spark 应用中使用 spark-bigquery-connector 的示例代码。 如需了解如何创建集群的说明,请参阅 Dataproc 快速入门。
将连接器提供给您的应用
您可以通过以下任一方式将 spark-bigquery-connector 提供给您的应用:
在创建集群时使用 Dataproc 连接器初始化操作,在每个节点的 Spark jars 目录中安装 spark-bigquery-connector。
提交作业时,请提供连接器 URI:
- Google Cloud 控制台:使用 Dataproc 提交作业页面上的 Spark 作业
Jars files
项。 - gcloud CLI:使用
gcloud dataproc jobs submit spark --jars
标志。 - Dataproc API:使用
SparkJob.jarFileUris
字段。
- Google Cloud 控制台:使用 Dataproc 提交作业页面上的 Spark 作业
将 jar 添加到 Scala 或 Java Spark 应用中作为依赖项(请参阅针对连接器进行编译)。
如何指定连接器 jar URI
GitHub GoogleCloudDataproc/spark-bigquery-connector 代码库中列出了 Spark-BigQuery 连接器版本。
通过在以下 URI 字符串中替换 Scala 和连接器版本信息,指定连接器 JAR:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
将 Scala
2.12
与 Dataproc 映像版本1.5+
搭配使用gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
gcloud CLI 示例:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job-args
将 Scala
2.11
与 Dataproc 映像版本1.4
及更低版本搭配使用:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
gcloud CLI 示例:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
计算费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
- Dataproc
- BigQuery
- Cloud Storage
您可使用价格计算器根据您的预计使用情况来估算费用。
从 BigQuery 读取和写入数据
此示例展示如何将 BigQuery 中的数据读取到 Spark DataFrame 中,以使用标准数据源 API执行字数统计操作。
连接器通过先将所有数据缓冲到 Cloud Storage 临时表中,将数据写入 BigQuery。然后,它只需一次操作即可将所有数据复制到 BigQuery 中。在 BigQuery 加载操作成功并且当 Spark 应用终止时再次成功之后,连接器便会尝试删除临时文件。如果作业失败,请移除所有残留的临时 Cloud Storage 文件。通常,临时 BigQuery 文件位于 gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
中。
配置结算功能
默认情况下,与凭据或服务账号关联的项目将计入 API 使用费。要对其他项目计费,请设置以下配置:spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
。
还可以将其添加到读/写操作,如下所示:.option("parentProject", "<BILLED-GCP-PROJECT>")
。
运行代码
在运行此示例之前,请先创建名为“wordcount_dataset”的数据集,或将代码中的输出数据集更改为 Google Cloud 项目中的现有 BigQuery 数据集。
使用 bq 命令创建 wordcount_dataset
:
bq mk wordcount_dataset
使用 Google Cloud CLI 命令创建 Cloud Storage 存储桶,该存储分区将用于导出到 BigQuery:
gcloud storage buckets create gs://[bucket]
Scala
- 检查代码并将 [bucket] 占位符替换为您之前创建的 Cloud Storage 存储桶。
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = (spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache()) wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save())
- 在集群上运行代码
- 使用 SSH 连接到 Dataproc 集群主服务器实例节点
- 前往 Google Cloud 控制台中的 Dataproc 集群页面,然后点击集群的名称
- 在 > 集群详情页面上,选择“虚拟机实例”标签页。然后,点击集群主服务器节点名称右侧的
SSH
此时会打开一个浏览器窗口并显示主节点上的主目录Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 使用预装的
vi
、vim
或nano
文本编辑器创建wordcount.scala
,然后粘贴 Scala 代码列表中的 Scala 代码nano wordcount.scala
- 启动
spark-shell
REPL。$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- 运行 wordcount.scala 并使用
:load wordcount.scala
命令创建 BigQuerywordcount_output
表。输出列表将显示来自 wordcount 输出的 20 行内容。:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
如需预览输出表,请打开BigQuery
页面,选择wordcount_output
表,然后点击预览。
- 使用 SSH 连接到 Dataproc 集群主服务器实例节点
PySpark
- 检查代码并将 [bucket] 占位符替换为您之前创建的 Cloud Storage 存储桶。
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- 在集群上运行代码
- 使用 SSH 连接到 Dataproc 集群主服务器实例节点
- 前往 Google Cloud 控制台中的 Dataproc 集群页面,然后点击集群的名称
- 在集群详情页面上,选择“虚拟机实例”标签页。然后,点击集群主服务器节点名称右侧的
SSH
此时会打开一个浏览器窗口并显示主节点上的主目录Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- 使用预安装的
vi
、vim
或nano
文本编辑器创建wordcount.py
,然后粘贴 PySpark 代码列表中的 PySpark 代码nano wordcount.py
- 使用
spark-submit
运行 wordcount 以创建 BigQuerywordcount_output
表。输出列表将显示来自 wordcount 输出的 20 行内容。spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
如需预览输出表,请打开BigQuery
页面,选择wordcount_output
表,然后点击预览。
- 使用 SSH 连接到 Dataproc 集群主服务器实例节点