Usar el conector de BigQuery con Google Cloud Serverless para Apache Spark

Usa el conector spark-bigquery con Apache Spark para leer y escribir datos desde y hacia BigQuery. En este tutorial se muestra una aplicación de PySpark que usa spark-bigquery-connector.

Usar el conector de BigQuery con su carga de trabajo

Consulta las versiones de tiempo de ejecución de Serverless para Apache Spark para determinar la versión del conector de BigQuery que está instalada en la versión de tiempo de ejecución de tu carga de trabajo por lotes. Si el conector no aparece en la lista, consulta la siguiente sección para obtener instrucciones sobre cómo hacer que el conector esté disponible para las aplicaciones.

Cómo usar el conector con la versión 2.0 del tiempo de ejecución de Spark

El conector de BigQuery no está instalado en la versión 2.0 del tiempo de ejecución de Spark. Si usas la versión 2.0 del tiempo de ejecución de Spark, puedes hacer que el conector esté disponible para tu aplicación de una de las siguientes formas:

  • Usa el parámetro jars para apuntar a un archivo JAR de conector cuando envíes tu Google Cloud carga de trabajo por lotes de Serverless para Apache Spark. Google Cloud En el siguiente ejemplo se especifica un archivo JAR de conector (consulta el repositorio GoogleCloudDataproc/spark-bigquery-connector en GitHub para ver una lista de los archivos JAR de conectores disponibles).
    • Ejemplo de Google Cloud CLI:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • Incluye el archivo JAR del conector en tu aplicación Spark como dependencia (consulta Compilar con el conector).

Calcular los costes

En este tutorial se usan componentes facturables de Google Cloud, como los siguientes:

  • Serverless para Apache Spark
  • BigQuery
  • Cloud Storage

Si quieres generar una estimación de costes en función del uso previsto, usa la calculadora de precios.

Los nuevos usuarios de Cloud Platform pueden disfrutar de una prueba gratuita.

E/S de BigQuery

En este ejemplo se leen datos de BigQuery y se insertan en un DataFrame de Spark para contar palabras mediante la API de fuente de datos estándar.

El conector escribe el resultado de WordCount en BigQuery de la siguiente manera:

  1. Almacenando los datos en archivos temporales de tu segmento de Cloud Storage

  2. Copiar los datos en una sola operación desde tu segmento de Cloud Storage a BigQuery

  3. Eliminar los archivos temporales de Cloud Storage una vez que se haya completado la operación de carga de BigQuery (los archivos temporales también se eliminan cuando finaliza la aplicación Spark). Si la eliminación falla, tendrás que eliminar los archivos temporales de Cloud Storage que no quieras, que normalmente se encuentran en gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID.

Configurar la facturación

De forma predeterminada, el proyecto asociado a las credenciales o a la cuenta de servicio se factura por el uso de la API. Para facturar un proyecto diferente, define la siguiente configuración: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

También puedes añadir a una operación de lectura o escritura, como se indica a continuación: .option("parentProject", "<BILLED-GCP-PROJECT>").

Enviar una carga de trabajo por lotes de recuento de palabras de PySpark

Ejecuta una carga de trabajo por lotes de Spark que cuenta el número de palabras de un conjunto de datos público.

  1. Abre un terminal local o Cloud Shell.
  2. Crea el wordcount_dataset con la herramienta de línea de comandos bq en un terminal local o en Cloud Shell.
    bq mk wordcount_dataset
    
  3. Crea un segmento de Cloud Storage con Google Cloud CLI.
    gcloud storage buckets create gs://YOUR_BUCKET
    
    Sustituye YOUR_BUCKET por el nombre del segmento de Cloud Storage que has creado.
  4. Crea el archivo wordcount.py localmente en un editor de texto copiando el siguiente código de PySpark.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "YOUR_BUCKET"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  5. Envía la carga de trabajo por lotes de PySpark:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=REGION \
        --deps-bucket=YOUR_BUCKET
    
    Ejemplo de salida de terminal:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Para previsualizar la tabla de resultados en la Google Cloud consola, abre la página de BigQuery de tu proyecto, selecciona la tabla wordcount_output y haz clic en Vista previa.

Más información