将 spark-bigquery-connector 与 Apache Spark 搭配使用,以从 BigQuery 中读取数据以及将数据写入其中。本教程演示了使用 spark-bigquery-connector
的 PySpark 应用。
将 BigQuery 连接器与工作负载搭配使用
请参阅 Dataproc Serverless for Spark 运行时版本,确定批处理工作负载运行时版本中安装的 BigQuery 连接器版本。如果连接器未列出 请参阅下一部分,了解有关如何将该连接器 应用。
如何将该连接器与 Spark 运行时版本 2.0 搭配使用
Spark 运行时版本 2.0 中未安装 BigQuery 连接器。使用 Spark 运行时版本 2.0,您可以将连接器提供给应用 :
- 在以下情况下,使用
jars
参数指向连接器 jar 文件: 提交 Dataproc Serverless for Spark 批量工作负载 以下示例指定了连接器 jar 文件(请参阅 GoogleCloudDataproc/spark-bigquery-connector 获取可用连接器 jar 文件的列表)。- Google Cloud CLI 示例:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \ ... other args
- Google Cloud CLI 示例:
- 将连接器 jar 文件作为依赖项添加到 Spark 应用中(请参阅针对连接器进行编译)
计算费用
本教程使用 Google Cloud 的以下收费组件:
- Dataproc Serverless
- BigQuery
- Cloud Storage
请使用价格计算器根据您的预计使用情况来估算费用。Cloud Platform 新用户可能有资格申请免费试用。
BigQuery I/O
此示例展示如何将 BigQuery 中的数据读取到 Spark DataFrame 中,以使用标准数据源 API执行字数统计操作。
连接器通过以下方式将字数统计输出写入 BigQuery:
将数据缓冲到 Cloud Storage 存储桶中的临时文件
将一项操作中的数据从 Cloud Storage 存储桶复制到 BigQuery 中
系统在 BigQuery 加载操作完成后删除 Cloud Storage 中的临时文件(临时文件在 Spark 应用终止后也会被删除)。如果删除失败,您需要删除所有不需要的临时 Cloud Storage 文件,这些文件通常放在
gs://your-bucket/.spark-bigquery-jobid-UUID
中。
配置结算功能
默认情况下。与凭据或服务账号关联的项目将计入 API 使用费。要对其他项目计费,请设置以下配置:spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
。
还可以将其添加到读/写操作,如下所示:.option("parentProject", "<BILLED-GCP-PROJECT>")
。
提交 PySpark 字数统计批处理工作负载
- 在本地终端或 Cloud Shell 中使用 bq 命令行工具创建
wordcount_dataset
。bq mk wordcount_dataset
- 在本地终端或 Cloud Shell 中使用 Google Cloud CLI 创建 Cloud Storage 存储桶。
gcloud storage buckets create gs://your-bucket
- 检查代码。
#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[your-bucket-name]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- 通过复制 PySpark 代码列表中的 PySpark 代码,在文本编辑器中本地创建
wordcount.py
,将 [your-bucket] 占位符替换为您创建的 Cloud Storage 存储桶的名称。 - 提交 PySpark 批处理工作负载:
终端输出示例:gcloud dataproc batches submit pyspark wordcount.py \ --region=region \ --deps-bucket=your-bucket
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
如需在 Google Cloud 控制台中预览输出表,请打开您项目的 BigQuery 页面中,选择wordcount_output
表格,然后点击 预览。