Usa el conector de BigQuery con Spark

El spark-bigquery-connector se usa con Apache Spark para leer y escribir datos desde y hacia BigQuery. En este instructivo, se proporciona un código de ejemplo que usa el conector spark-bigquery dentro de una aplicación de Spark. Para obtener instrucciones para crear un clúster, consulta las Guías de inicio rápido de Dataproc.

Proporciona el conector a tu aplicación

El conector Spark-BigQuery debe estar disponible para tu aplicación en el entorno de ejecución. Esto se puede hacer de una de las siguientes maneras:

  • Instala el conector en el directorio de los archivos jar de Spark.
  • Agrega el conector en el entorno de ejecución con el parámetro --jars, que se puede usar con la API de Dataproc o spark-submit.
    • Si usas la imagen 1.5 de Dataproc, agrega el siguiente parámetro:
      --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
    • Si usas la imagen 1.4 de Dataproc o una inferior, agrega el siguiente parámetro:
      --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
  • Incluye el archivo jar en tu aplicación de Spala o Java Spark como una dependencia (consulta Compila con el conector)

Si el conector no está disponible en el entorno de ejecución, se genera un ClassNotFoundException.

Calcula los costos

En este instructivo, se usan componentes facturables de Google Cloud Platform, que incluyen:

  • Dataproc
  • BigQuery
  • Cloud Storage

Usa la calculadora de precios para generar una estimación de los costos según el uso previsto. Los usuarios nuevos de Cloud Platform podrían ser aptos para una prueba gratuita.

Cómo leer y escribir datos desde BigQuery

Este ejemplo lee los datos de BigQuery en un DataFrame de Spark para realizar un recuento de palabras mediante la API de fuente de datos estándar.

El conector escribe los datos en BigQuery mediante el almacenamiento en búfer de todos los datos en una tabla temporal de Cloud Storage y, luego, copia todos los datos en BigQuery en una operación. El conector intenta borrar los archivos temporales una vez que la operación de carga de BigQuery se realiza correctamente, y lo vuelve a hacer cuando la aplicación Spark finaliza. Si el trabajo falla, es posible que debas quitar manualmente cualquier archivo temporal de Cloud Storage. Por lo general, encontrarás exportaciones temporales de BigQuery en gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

Ejecuta el código

Antes de ejecutar este ejemplo, crea un conjunto de datos llamado “wordcount_dataset” o cambia el conjunto de datos de salida en el código a un conjunto de datos de BigQuery existente en tu proyecto de Google Cloud.

Usa el comando de bq para crear el wordcount_dataset:

bq mk wordcount_dataset

Usa el comando de gsutil para crear un depósito de Cloud Storage, que se usará a fin de exportar a BigQuery:

gsutil mb gs://[bucket]

Scala

  1. Examina el código y reemplaza el marcador de posición [bucket] por el depósito de Cloud Storage que creaste anteriormente.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery.
    val wordsDF = spark.read.format("bigquery")
      .option("table","publicdata.samples.shakespeare")
      .load()
      .cache()
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save()
    
    
  2. Ejecuta el código en tu clúster
    1. Establece una conexión SSH al nodo principal del clúster de Cloud Dataproc
      1. Ve a la página de clústeres de Cloud Dataproc de tu proyecto en Cloud Console y, luego, haz clic en el nombre de tu clúster
      2. En la página de detalles del clúster, selecciona la pestaña VM Instances (Instancias de VM) y, luego, haz clic en la selección SSH que aparece a la derecha del nombre de tu nodo principal del clúster

        Se abrirá una ventana del navegador en tu directorio principal del nodo principal
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea wordcount.scala con el editor de texto vi, vim o nano preinstalado y, luego, pega el código de Scala desde la lista de códigos de Scala.
      nano wordcount.scala
        
    3. Inicia el REPL de spark-shell.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Ejecuta wordcount.scala con el comando :load wordcount.scala para crear la tabla de BigQuery wordcount_output. La lista de salida muestra 20 líneas del resultado del recuento de palabras.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Para obtener una vista previa de la tabla de resultados, abre la página BigQuery de tu proyecto, selecciona la tabla wordcount_output y haz clic en Vista previa.

PySpark

  1. Examina el código y reemplaza el marcador de posición [bucket] por el depósito de Cloud Storage que creaste anteriormente.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
  2. Ejecuta el código en tu clúster
    1. Establece una conexión SSH al nodo principal del clúster de Cloud Dataproc
      1. Ve a la página de clústeres de Cloud Dataproc de tu proyecto en Cloud Console y, luego, haz clic en el nombre de tu clúster
      2. En la página de detalles del clúster, selecciona la pestaña VM Instances (Instancias de VM) y, luego, haz clic en la selección SSH que aparece a la derecha del nombre de tu nodo principal del clúster

        Se abrirá una ventana del navegador en tu directorio principal del nodo principal
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea wordcount.py con el editor de texto vi, vim o nano preinstalado y, luego, pega el código de PySpark desde la lista de código de PySpark.
      nano wordcount.py
      
    3. Ejecuta el conteo de palabras con spark-submit para crear la tabla de BigQuery wordcount_output. La lista de salida muestra 20 líneas del resultado del recuento de palabras.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Para obtener una vista previa de la tabla de resultados, abre la página BigQuery de tu proyecto, selecciona la tabla wordcount_output y haz clic en Vista previa.

Más información