将 BigQuery 连接器与 Dataproc Serverless for Spark 搭配使用

spark-bigquery-connectorApache Spark 搭配使用,以从 BigQuery 中读取数据以及将数据写入其中。本教程演示了使用 spark-bigquery-connector 的 PySpark 应用。

将 BigQuery 连接器用于工作负载

请参阅适用于 Spark 运行时版本的 Dataproc 无服务器,确定批量工作负载运行时版本中安装的 BigQuery 连接器版本。如果未列出该连接器,请参阅下一部分,了解如何将该连接器提供给应用。

如何将连接器与 Spark 运行时版本 2.0 搭配使用

Spark 运行时版本 2.0 中未安装 BigQuery 连接器。使用 Spark 运行时版本 2.0 时,您可以通过以下方式之一让连接器可供您的应用使用:

  • 提交 Dataproc Serverless for Spark 批处理工作负载时,请使用 jars 参数指向连接器 jar 文件 以下示例指定了连接器 jar 文件(请参阅 GitHub 上的 GoogleCloudDataproc/spark-bigquery-connector 代码库以获取可用连接器 jar 文件的列表。
    • Google Cloud CLI 示例:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • 将连接器 jar 文件作为依赖项添加到 Spark 应用中(请参阅针对连接器进行编译

计算费用

本教程使用 Google Cloud 的以下收费组件:

  • Dataproc Serverless
  • BigQuery
  • Cloud Storage

请使用价格计算器根据您的预计使用情况来估算费用。Cloud Platform 新用户可能有资格申请免费试用

BigQuery I/O

此示例展示如何将 BigQuery 中的数据读取到 Spark DataFrame 中,以使用标准数据源 API执行字数统计操作。

连接器通过以下方式将字数统计输出写入 BigQuery:

  1. 将数据缓冲到 Cloud Storage 存储桶中的临时文件

  2. 将一项操作中的数据从 Cloud Storage 存储桶复制到 BigQuery 中

  3. 系统在 BigQuery 加载操作完成后删除 Cloud Storage 中的临时文件(临时文件在 Spark 应用终止后也会被删除)。如果删除失败,您需要删除所有不需要的临时 Cloud Storage 文件,这些文件通常放在 gs://your-bucket/.spark-bigquery-jobid-UUID 中。

配置结算功能

默认情况下。与凭据或服务账号关联的项目将计入 API 使用费。要对其他项目计费,请设置以下配置:spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")

还可以将其添加到读/写操作,如下所示:.option("parentProject", "<BILLED-GCP-PROJECT>")

提交 PySpark 字数统计批处理工作负载

  1. 在本地终端或 Cloud Shell 中使用 bq 命令行工具创建 wordcount_dataset
    bq mk wordcount_dataset
    
  2. 在本地终端或 Cloud Shell 中使用 gsutil 命令行工具创建 Cloud Storage 存储桶。
    gsutil mb gs://your-bucket
    
  3. 检查代码。
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[your-bucket-name]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
    
  4. 通过复制 PySpark 代码列表中的 PySpark 代码,在文本编辑器中本地创建 wordcount.py,将 [your-bucket] 占位符替换为您创建的 Cloud Storage 存储桶的名称。
  5. 提交 PySpark 批处理工作负载:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=region \
        --deps-bucket=your-bucket
    
    示例终端输出
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    如需在 Google Cloud 控制台中预览输出表,请打开项目的 BigQuery 页面,选择 wordcount_output 表,然后点击预览

如需深入了解