将 BigQuery 连接器与 Spark 搭配使用

您可以将 spark-bigquery-connectorApache Spark 搭配使用,以从 BigQuery 中读取数据以及将数据写入其中。本教程提供了如何在 Spark 应用中使用 spark-bigquery-connector 的示例代码。 如需了解如何创建集群,请参阅 Dataproc 快速入门

使连接器可供您的应用使用

您可以通过以下任一方式将 spark-bigquery-connector 提供给您的应用:

  1. 在创建集群时,使用 Dataproc 连接器初始化操作在每个节点的 Spark jars 目录中安装 spark-bigquery-connector。

  2. 提交作业时提供连接器 URI:

    1. Google Cloud 控制台:使用 Dataproc 提交作业页面上的 Spark 作业 Jars files 项。
    2. gcloud CLI:使用 gcloud dataproc jobs submit spark --jars 标志
    3. Dataproc API:使用 SparkJob.jarFileUris 字段
  3. 将该 jar 作为依赖项添加到 Scala 或 Java Spark 应用中(请参阅针对连接器编译)。

如何指定连接器 jar URI

GitHub GoogleCloudDataproc/spark-bigquery-connector 代码库中列出了 Spark-BigQuery 连接器版本。

替换以下 URI 字符串中的 Scala 和连接器版本信息,以指定连接器 jar:

gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar

  • 将 Scala 2.12 与 Dataproc 映像版本 1.5+ 搭配使用

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
    

    gcloud CLI 示例:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
        -- job-args
    

  • 将 Scala 2.11 与 Dataproc 映像版本 1.4 及更低版本搭配使用:

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
    

    gcloud CLI 示例:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
        -- job-args
    

计算费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

  • Dataproc
  • BigQuery
  • Cloud Storage

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

从 BigQuery 读取和写入数据

此示例展示如何将 BigQuery 中的数据读取到 Spark DataFrame 中,以使用标准数据源 API执行字数统计操作。

连接器首先将所有数据缓冲到一个 Cloud Storage 临时表中,从而将数据写入 BigQuery。然后,它会通过一次操作将所有数据从 BigQuery 复制到 BigQuery。在 BigQuery 加载操作成功并且当 Spark 应用终止时再次成功之后,连接器便会尝试删除临时文件。如果作业失败,请移除任何剩余的临时 Cloud Storage 文件。临时 BigQuery 文件通常位于 gs://[bucket]/.spark-bigquery-[jobid]-[UUID]

配置结算功能

默认情况下,系统会向与凭据或服务帐号关联的项目收取 API 使用费。要对其他项目计费,请设置以下配置:spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")

还可以将其添加到读/写操作,如下所示:.option("parentProject", "<BILLED-GCP-PROJECT>")

运行代码

在运行此示例之前,请先创建名为“wordcount_dataset”的数据集,或将代码中的输出数据集更改为 Google Cloud 项目中的现有 BigQuery 数据集。

使用 bq 命令创建 wordcount_dataset

bq mk wordcount_dataset

使用 gsutil 命令创建 Cloud Storage 存储桶,该存储分区将用于导出到 BigQuery:

gsutil mb gs://[bucket]

Scala

  1. 检查代码并将 [bucket] 占位符替换为您之前创建的 Cloud Storage 存储桶。
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      (spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache())
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save())
    
    
  2. 在集群上运行代码
    1. 使用 SSH 连接到 Dataproc 集群主服务器节点
      1. 转到 Google Cloud 控制台中的 Dataproc 集群页面,然后点击集群的名称
        Cloud 控制台中的 Dataproc 集群页面。
      2. >集群详情页面上,选择“虚拟机实例”标签页。然后,点击集群主服务器节点名称右侧的 SSH
        Cloud 控制台中的 Dataproc 集群详情页面。

        系统会在主节点上的主目录中打开一个浏览器窗口
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 使用预装的 vivimnano 文本编辑器创建 wordcount.scala,然后粘贴 Scala 代码列表中的 Scala 代码
      nano wordcount.scala
        
    3. 启动 spark-shell REPL。
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. 运行 wordcount.scala 并使用 :load wordcount.scala 命令创建 BigQuery wordcount_output 表。输出列表将显示来自 wordcount 输出的 20 行内容。
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      如需预览输出表,请打开 BigQuery 页面,选择 wordcount_output 表,然后点击预览
      在 Cloud 控制台的 BigQuery Explorer 页面中预览表。

PySpark

  1. 检查代码并将 [bucket] 占位符替换为您之前创建的 Cloud Storage 存储桶。
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
  2. 在集群上运行代码
    1. 使用 SSH 连接到 Dataproc 集群主服务器节点
      1. 转到 Google Cloud 控制台中的 Dataproc 集群页面,然后点击集群的名称
        Cloud 控制台中的“集群”页面。
      2. 集群详情页面上,选择“虚拟机实例”标签页。然后,点击集群主服务器节点名称右侧的 SSH
        在 Cloud 控制台的“集群详情”页面上选择“通过集群名称使用 SSH”行。

        系统会在主节点上的主目录中打开一个浏览器窗口
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 使用预安装的 vivimnano 文本编辑器创建 wordcount.py,然后粘贴 PySpark 代码列表中的 PySpark 代码
      nano wordcount.py
      
    3. 使用 spark-submit 运行 wordcount 以创建 BigQuery wordcount_output 表。输出列表将显示来自 wordcount 输出的 20 行内容。
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      如需预览输出表,请打开 BigQuery 页面,选择 wordcount_output 表,然后点击预览
      在 Cloud 控制台的 BigQuery Explorer 页面中预览表。

如需深入了解