Usa el conector de BigQuery con Dataproc Serverless para Spark

Usa spark-bigquery-connector con Apache Spark para leer y escribir datos desde y hacia BigQuery. En este instructivo, se muestra una aplicación de PySpark que usa spark-bigquery-connector.

Usa el conector de BigQuery con tu carga de trabajo

Consulta Versiones del entorno de ejecución de Dataproc sin servidores para Spark a fin de determinar la versión del conector de BigQuery que está instalada en la versión del entorno de ejecución de la carga de trabajo por lotes. Si el conector no aparece en la lista, consulta la siguiente sección a fin de obtener instrucciones para hacer que el conector esté disponible para las aplicaciones.

Cómo usar el conector con la versión 2.0 del entorno de ejecución de Spark

El conector de BigQuery no está instalado en la versión 2.0 del entorno de ejecución de Spark. Cuando usas la versión 2.0 del entorno de ejecución de Spark, puedes hacer que el conector esté disponible para tu aplicación de una de las siguientes maneras:

  • Usa el parámetro jars para apuntar a un archivo jar del conector cuando envíes tu carga de trabajo por lotes de Dataproc Serverless para Spark. En el siguiente ejemplo, se especifica un archivo jar del conector (consulta el repositorio GoogleCloudDataproc/spark-bigquery-connector en GitHub para obtener una lista de los archivos JAR del conector disponibles).
    • Ejemplo de Google Cloud CLI:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • Incluye el archivo JAR del conector en tu aplicación de Spark como una dependencia (consulta Cómo compilar con el conector).

Calcula los costos

En este instructivo, se usan componentes facturables de Google Cloud, que incluyen lo siguiente:

  • Dataproc sin servidores
  • BigQuery
  • Cloud Storage

Usa la calculadora de precios para generar una estimación de los costos según el uso previsto. Los usuarios nuevos de Cloud Platform podrían ser aptos para una prueba gratuita.

BigQuery E/S

Este ejemplo lee los datos de BigQuery en un DataFrame de Spark para realizar un recuento de palabras mediante la API de fuente de datos estándar.

El conector escribe el resultado del recuento de palabras en BigQuery de la siguiente manera:

  1. Almacenar los datos en búfer en archivos temporales de tu bucket de Cloud Storage

  2. Copiar los datos en una operación de tu bucket de Cloud Storage a BigQuery

  3. Borrar los archivos temporales de Cloud Storage una vez completada la operación de carga de BigQuery (los archivos temporales también se borran después de que se cierra la aplicación Spark). Si la eliminación falla, deberás borrar los archivos temporales de Cloud Storage no deseados, que suelen ubicarse en gs://your-bucket/.spark-bigquery-jobid-UUID.

Configura la facturación

De forma predeterminada, el proyecto asociado con las credenciales o la cuenta de servicio se factura por el uso de la API. Para facturar un proyecto diferente, establece la siguiente configuración: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

También se puede agregar a una operación de lectura o escritura, de la siguiente manera: .option("parentProject", "<BILLED-GCP-PROJECT>").

Enviar una carga de trabajo por lotes de conteo de palabras de PySpark

  1. Crea el wordcount_dataset con la herramienta de línea de comandos bq en una terminal local o en Cloud Shell.
    bq mk wordcount_dataset
    
  2. Crea un bucket de Cloud Storage con la herramienta de línea de comandos de gsutil” en una terminal local o en Cloud Shell.
    gsutil mb gs://your-bucket
    
  3. Examina el código.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[your-bucket-name]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
    
  4. Para crear wordcount.py localmente en un editor de texto, copia el código de PySpark de la lista de códigos de PySpark. Reemplaza el marcador de posición [your-bucket] por el nombre del bucket de Cloud Storage que creaste.
  5. Envía la carga de trabajo por lotes de PySpark:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=region \
        --deps-bucket=your-bucket
    
    Resultado de la terminal de muestra:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Para obtener una vista previa de la tabla de salida en la consola de Google Cloud, abre la página de BigQuery de tu proyecto, selecciona la tabla wordcount_output y, luego, haz clic en Vista previa.

Para más información