将 BigQuery 连接器与 Spark 搭配使用

您可以将 spark-bigquery-connectorApache Spark 搭配使用,以从 BigQuery 中读取数据以及将数据写入其中。本教程提供了如何在 Spark 应用中使用 spark-bigquery-connector 的示例代码。 如需了解如何创建集群的说明,请参阅 Dataproc 快速入门

将连接器添加到您的应用

在运行时 spark-bigquery-connector 必须可用于您的应用。 这可通过以下某种方式来实现:

  • 在 Spark jars 目录中安装连接器。
  • 使用 --jars 参数(该参数可与 Dataproc API 或 spark-submit 搭配使用)在运行时添加连接器。
    • 如果您使用的是 Dataproc 映像 1.5,请添加以下参数:
      --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
    • 如果您使用的是 Dataproc 映像 1.4 或更低版本,请添加以下参数:
      --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
  • 将 jar 添加到 Scala 或 Java Spark 应用中作为依赖项(请参阅针对连接器进行编译

如果连接器在运行时不可用,则会引发 ClassNotFoundException

计算费用

本教程使用 Google Cloud 的计费组件,包括:

  • Dataproc
  • BigQuery
  • Cloud Storage

请使用价格计算器根据您的预计使用情况来估算费用。Cloud Platform 新用户可能有资格申请免费试用

从 BigQuery 读取和写入数据

此示例展示如何将 BigQuery 中的数据读取到 Spark DataFrame 中,以使用标准数据源 API执行字数统计操作。

连接器通过先将所有数据缓冲到 Cloud Storage 临时表中,然后通过一次操作将所有数据复制到 BigQuery,从而将数据写入 BigQuery。在 BigQuery 加载操作成功并且当 Spark 应用终止时再次成功之后,连接器便会尝试删除临时文件。如果作业失败,您可能需要手动移除所有残留的临时 Cloud Storage 文件。通常情况下,您可以在 gs://[bucket]/.spark-bigquery-[jobid]-[UUID] 中找到临时的 BigQuery 导出内容。

配置结算功能

默认情况下。与凭据或服务帐号关联的项目将计入 API 使用费。要对其他项目计费,请设置以下配置:spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")

还可以将其添加到读/写操作,如下所示:.option("parentProject", "<BILLED-GCP-PROJECT>")

运行代码

在运行此示例之前,请先创建名为“wordcount_dataset”的数据集,或将代码中的输出数据集更改为 Google Cloud 项目中的现有 BigQuery 数据集。

使用 bq 命令创建 wordcount_dataset

bq mk wordcount_dataset

使用 gsutil 命令创建 Cloud Storage 存储分区,该存储分区将用于导出到 BigQuery:

gsutil mb gs://[bucket]

Scala

  1. 检查代码并将 [bucket] 占位符替换为您之前创建的 Cloud Storage 存储分区。
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF = spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache()
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save()
    
    
  2. 在集群上运行代码
    1. 通过 SSH 连接到 Dataproc 集群的主实例节点
      1. 在 Cloud Console 中转到项目的 Dataproc 集群页面,然后点击集群的名称
      2. 在集群详情页面上,选择“虚拟机实例”标签页,然后点击显示在集群主节点名称右侧的 SSH 选项

        此时会打开一个浏览器窗口并显示主节点上的主目录
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 使用预装的 vivimnano 文本编辑器创建 wordcount.scala,然后粘贴 Scala 代码列表中的 Scala 代码
      nano wordcount.scala
        
    3. 启动 spark-shell REPL。
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. 运行 wordcount.scala 并使用 :load wordcount.scala 命令创建 BigQuery wordcount_output 表。输出列表将显示来自 wordcount 输出的 20 行内容。
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      要预览输出表,请打开项目的 BigQuery 页面,选择 wordcount_output 表,然后点击预览

PySpark

  1. 检查代码并将 [bucket] 占位符替换为您之前创建的 Cloud Storage 存储分区。
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
  2. 在集群上运行代码
    1. 通过 SSH 连接到 Dataproc 集群的主实例节点
      1. 在 Cloud Console 中转到项目的 Dataproc 集群页面,然后点击集群的名称
      2. 在集群详情页面上,选择“虚拟机实例”标签页,然后点击显示在集群主节点名称右侧的 SSH 选项

        此时会打开一个浏览器窗口并显示主节点上的主目录
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 使用预安装的 vivimnano 文本编辑器创建 wordcount.py,然后粘贴 PySpark 代码列表中的 PySpark 代码
      nano wordcount.py
      
    3. 使用 spark-submit 运行 wordcount 以创建 BigQuery wordcount_output 表。输出列表将显示来自 wordcount 输出的 20 行内容。
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      要预览输出表,请打开项目的 BigQuery 页面,选择 wordcount_output 表,然后点击预览

了解详情