Usar o conector do BigQuery com o Dataproc sem servidor para Spark

Use o spark-bigquery-connector com o Apache Spark para ler e gravar dados no e a partir do BigQuery. Este tutorial demonstra um aplicativo PySpark que usa o spark-bigquery-connector.

Usar o conector do BigQuery com a carga de trabalho

Consulte Dataproc Serverless for Spark releases para determinar a versão do conector do BigQuery instalada na versão do ambiente de execução da carga de trabalho em lote. Se o conector não estiver listado, consulte a próxima seção para instruções sobre como disponibilizar o conector para aplicativos.

Como usar o conector com o ambiente de execução do Spark versão 2.0

O conector do BigQuery não está instalado no ambiente de execução do Spark versão 2.0. Ao usar o ambiente de execução do Spark versão 2.0, é possível disponibilizar o conector para o aplicativo de uma das seguintes maneiras:

  • Use o parâmetro jars para apontar para um arquivo jar de conector ao enviar sua carga de trabalho em lote do Dataproc Serverless para Spark. O exemplo a seguir especifica um arquivo jar de conector. Consulte o repositório GoogleCloudDataproc/spark-bigquery-connector no GitHub para conferir uma lista de arquivos jar de conector disponíveis.
    • Exemplo da Google Cloud CLI:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • Inclua o arquivo jar do conector no aplicativo Spark como uma dependência. Consulte Como compilar com o conector.

Calcular custos

Neste tutorial, há componentes faturáveis do Google Cloud, entre eles:

  • Dataproc sem servidor
  • BigQuery
  • Cloud Storage

Use a Calculadora de preços para gerar uma estimativa de custo com base no uso previsto. É possível que novos usuários do Cloud Platform tenham direito a uma avaliação gratuita.

E/S do BigQuery

Este exemplo lê dados do BigQuery em um DataFrame do Spark para executar uma contagem de palavras usando a API de origem de dados padrão.

O conector grava a saída de contagem de palavras no BigQuery da seguinte maneira:

  1. Armazenar os dados em arquivos temporários no bucket do Cloud Storage

  2. Como copiar os dados em uma operação do bucket do Cloud Storage para o BigQuery

  3. Exclusão dos arquivos temporários no Cloud Storage após a conclusão da operação de carregamento do BigQuery. Os arquivos temporários também são excluídos após o encerramento do aplicativo Spark. Se a exclusão falhar, será necessário excluir todos os arquivos temporários indesejados do Cloud Storage, que geralmente são colocados em gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID.

Configurar o faturamento

Por padrão, o projeto associado às credenciais ou à conta de serviço é cobrado pelo uso da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Também é possível adicionar a uma operação de leitura ou gravação, da seguinte maneira: .option("parentProject", "<BILLED-GCP-PROJECT>").

Enviar uma carga de trabalho de contagem de palavras do PySpark

Execute uma carga de trabalho em lote do Spark que conta o número de palavras em um conjunto de dados público.

  1. Abra um terminal local ou o Cloud Shell.
  2. Crie o wordcount_dataset com a ferramenta de linha de comando bq em um terminal local ou no Cloud Shell.
    bq mk wordcount_dataset
    
  3. Crie um bucket do Cloud Storage com a Google Cloud CLI.
    gcloud storage buckets create gs://YOUR_BUCKET
    
    Substitua YOUR_BUCKET pelo nome do bucket do Cloud Storage criado.
  4. Crie o arquivo wordcount.py localmente em um editor de texto copiando o seguinte código PySpark.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "YOUR_BUCKET"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  5. Envie a carga de trabalho em lote do PySpark:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=REGION \
        --deps-bucket=YOUR_BUCKET
    
    Exemplo de saída do terminal:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Para visualizar a tabela de saída no console do Google Cloud, abra a página do BigQuery do projeto, selecione a tabela wordcount_output e clique em Visualizar.

Para mais informações