Usa el conector de BigQuery con Spark

El conector de BigQuery se puede usar con Apache Spark para leer y escribir datos desde y hacia BigQuery. En esta página, se proporciona código de ejemplo que usa el conector de BigQuery con Spark. Consulta las Guías de inicio rápido de Cloud Dataproc para obtener instrucciones sobre cómo crear un clúster.

Cómo leer y escribir datos desde BigQuery

Este ejemplo lee datos desde BigQuery en Spark para realizar un recuento de palabras con SparkContext.newAPIHadoopRDD (consulta la documentación de Spark para obtener más información). Vuelve a escribir los datos en BigQuery con PairRDDFunctions.saveAsNewAPIHadoopDataset.

Antes de probar este ejemplo, crea un conjunto de datos llamado "wordcount_dataset" o cambia el conjunto de datos de salida en el código por un conjunto de datos de BigQuery existente en tu proyecto de Google Cloud Platform. Aquí está el comando bq para crear el conjunto de datos:

bq mk wordcount_dataset

Scala

Examina el código

IndirectBigQueryOutputFormat le proporciona a Hadoop la capacidad de escribir valores JsonObject directamente en la tabla de BigQuery. Esta clase proporciona acceso a los registros de BigQuery a través de una extensión de la clase OutputFormat de Hadoop. Para usarla correctamente, se deben establecer varios parámetros en la configuración de Hadoop y la clase OutputFormat debe configurarse en IndirectBigQueryOutputFormat. A continuación, se muestra un ejemplo de los parámetros para configurar y las líneas de código necesarias para usar correctamente IndirectBigQueryOutputFormat.
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat
import com.google.gson.JsonObject
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// Assumes you have a spark context (sc) -- running from spark-shell REPL.
// Marked as transient since configuration is not Serializable. This should
// only be necessary in spark-shell REPL.
@transient
val conf = sc.hadoopConfiguration

// Input parameters.
val fullyQualifiedInputTableId = "publicdata:samples.shakespeare"
val projectId = conf.get("fs.gs.project.id")
val bucket = conf.get("fs.gs.system.bucket")

// Input configuration.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)

// Output parameters.
val outputTableId = projectId + ":wordcount_dataset.wordcount_output"
// Temp output bucket that is deleted upon completion of job.
val outputGcsPath = ("gs://" + bucket + "/hadoop/tmp/bigquery/wordcountoutput")

// Output configuration.
// Let BigQuery auto-detect output schema (set to null below).
BigQueryOutputConfiguration.configureWithAutoSchema(
    conf,
    outputTableId,
    outputGcsPath,
    BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
    classOf[TextOutputFormat[_,_]])

// (Optional) Configure the KMS key used to encrypt the output table.
BigQueryOutputConfiguration.setKmsKeyName(
    conf, "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");

conf.set("mapreduce.job.outputformat.class",
         classOf[IndirectBigQueryOutputFormat[_,_]].getName)

// Truncate the table before writing output to allow multiple runs.
conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,
         "WRITE_TRUNCATE")

// Helper to convert JsonObjects to (word, count) tuples.
def convertToTuple(record: JsonObject) : (String, Long) = {
  val word = record.get("word").getAsString.toLowerCase
  val count = record.get("word_count").getAsLong
  return (word, count)
}

// Helper to convert (word, count) tuples to JsonObjects.
def convertToJson(pair: (String, Long)) : JsonObject = {
  val word = pair._1
  val count = pair._2
  val jsonObject = new JsonObject()
  jsonObject.addProperty("word", word)
  jsonObject.addProperty("word_count", count)
  return jsonObject
}

// Load data from BigQuery.
val tableData = sc.newAPIHadoopRDD(
    conf,
    classOf[GsonBigQueryInputFormat],
    classOf[LongWritable],
    classOf[JsonObject])

// Perform word count.
val wordCounts = (tableData
    .map(entry => convertToTuple(entry._2))
    .reduceByKey(_ + _))

// Display 10 results.
wordCounts.take(10).foreach(l => println(l))

// Write data back into a new BigQuery table.
// IndirectBigQueryOutputFormat discards keys, so set key to null.
(wordCounts
    .map(pair => (null, convertToJson(pair)))
    .saveAsNewAPIHadoopDataset(conf))

Ejecuta el código de tu clúster

  1. Establece una conexión SSH al nodo principal del clúster de Cloud Dataproc de la siguiente manera:
    1. Ve a la página Clusters (Clústeres) de Cloud Dataproc de tu proyecto en GCP Console y luego haz clic en el nombre de tu clúster.
    2. En la página de detalles del clúster, selecciona la pestaña Instancias de VM (VM Instances) y luego haz clic en la selección de SSH que aparece a la derecha del nombre del nodo principal de tu clúster.

      Se abre una ventana del navegador en tu directorio principal del nodo principal.
          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crea wordcount.scala con el editor de texto previamente instalado vi, vim o nano y luego pega en el código de Scala de la lista de código de Scala:
    nano wordcount.scala
      
  3. Inicia el REPL spark-shell (quita el parámetro jars si usas Cloud Dataproc 1.0-1.2).
    $ spark-shell --jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar
    ...
    Using Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    ...
    Spark context available as sc.
    ...
    SQL context available as sqlContext.
    scala>
    
  4. Ejecuta wordcount.scala con el comando :load wordcount.scala para crear la tabla wordcount_output de BigQuery. La lista de salida muestra 10 líneas del resultado del recuento de palabras.
    :load wordcount.scala
    ...
    (pinnace,3)
    (bone,21)
    (lug,2)
    (vailing,2)
    (bombast,3)
    (gaping,11)
    (hem,5)
    ('non,1)
    (stinks,1)
    (forsooth,48)
    

    Para obtener la vista previa de la tabla de salida, abre la página de tu proyecto de BigQuery, selecciona la tabla wordcount_output y luego haz clic en Vista previa (Preview).

PySpark

Examina el código

#!/usr/bin/python
"""BigQuery I/O PySpark example."""
from __future__ import absolute_import
import json
import pprint
import subprocess
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = {
    # Input Parameters.
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'publicdata',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',
}

# Output Parameters.
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_output'

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

# Perform word count.
word_counts = (
    table_data
    .map(lambda record: json.loads(record[1]))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))

# Display 10 results.
pprint.pprint(word_counts.take(10))

# Stage data formatted as newline-delimited JSON in Cloud Storage.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
output_files = output_directory + '/part-*'

sql_context = SQLContext(sc)
(word_counts
 .toDF(['word', 'word_count'])
 .write.format('json').save(output_directory))

# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
    'bq load --source_format NEWLINE_DELIMITED_JSON '
    '--replace '
    '--autodetect '
    '{dataset}.{table} {files}'.format(
        dataset=output_dataset, table=output_table, files=output_files
    ).split())

# Manually clean up the staging_directories, otherwise BigQuery
# files will remain indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
    output_path, True)

Ejecuta el código de tu clúster

  1. Establece una conexión SSH al nodo principal del clúster de Cloud Dataproc de la siguiente manera:
    1. Ve a la página Clusters (Clústeres) de Cloud Dataproc de tu proyecto en GCP Console y luego haz clic en el nombre de tu clúster.
    2. En la página de detalles del clúster, selecciona la pestaña Instancias de VM (VM Instances) y luego haz clic en la selección de SSH que aparece a la derecha del nombre del nodo principal de tu clúster.

      Se abre una ventana del navegador en tu directorio principal del nodo principal.
          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Crea wordcount.py con el editor de texto previamente instalado vi, vim o nano y luego pega en el código de PySpark de la lista de código de PySpark.
    nano wordcount.py
      
  3. Ejecuta el recuento de palabras con spark-submit para crear la tabla wordcount_output de BigQuery. La lista de salida muestra 10 líneas del resultado del recuento de palabras.
    spark-submit wordcount.py
    ...
    (pinnace,3)
    (bone,21)
    (lug,2)
    (vailing,2)
    (bombast,3)
    (gaping,11)
    (hem,5)
    ('non,1)
    (stinks,1)
    (forsooth,48)
    

    Para obtener una vista previa de la tabla de salida, abre la página de tu proyecto de BigQuery, selecciona la tabla wordcount_output y luego haz clic en Vista previa (Preview).
¿Te sirvió esta página? Envíanos tu opinión:

Enviar comentarios sobre…

Documentación de Cloud Dataproc
¿Necesitas ayuda? Visita nuestra página de asistencia.