Usar o conector do BigQuery com o Dataproc sem servidor para Spark

Usar o spark-bigquery-connector com o Apache Spark para ler e gravar dados no BigQuery. Neste tutorial, demonstramos um aplicativo do PySpark que usa a spark-bigquery-connector.

Usar o conector do BigQuery com sua carga de trabalho

Consulte Versões de ambiente de execução sem servidor do Dataproc para Spark para determinar a versão do conector do BigQuery que está instalada na versão do ambiente de execução da carga de trabalho em lote. Se o conector não estiver listado, consulte a próxima seção para saber como disponibilizá-lo para aplicativos.

Como usar o conector com a versão 2.0 do ambiente de execução do Spark

O conector do BigQuery não está instalado na versão 2.0 do ambiente de execução do Spark. Ao usar o ambiente de execução do Spark versão 2.0, é possível disponibilizar o conector para seu aplicativo de uma das seguintes maneiras:

  • Use o parâmetro jars para apontar para um arquivo jar do conector ao enviar a carga de trabalho em lote do Dataproc sem servidor para Spark. O exemplo a seguir especifica um arquivo jar do conector. Consulte o repositório GoogleCloudDataproc/spark-bigquery-connector no GitHub para ver a lista dos arquivos jar do conector disponíveis.
    • Exemplo da Google Cloud CLI:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • Inclua o arquivo jar do conector no aplicativo Spark como uma dependência. Consulte Como compilar no conector.

Calcular os custos

Neste tutorial, há componentes faturáveis do Google Cloud, entre eles:

  • Dataproc sem servidor
  • BigQuery
  • Cloud Storage

Use a Calculadora de preços para gerar uma estimativa de custo com base no uso previsto. É possível que novos usuários do Cloud Platform tenham direito a uma avaliação gratuita.

E/S do BigQuery

Este exemplo lê dados do BigQuery em um DataFrame do Spark para executar uma contagem de palavras usando a API de origem de dados padrão.

O conector grava a saída da contagem de palavras no BigQuery:

  1. Armazenar os dados em buffer em arquivos temporários no bucket do Cloud Storage

  2. Copiar os dados em uma operação do bucket do Cloud Storage para o BigQuery

  3. Excluir os arquivos temporários no Cloud Storage após a conclusão da operação de carregamento do BigQuery. Os arquivos temporários também são excluídos após o encerramento do aplicativo Spark. Se a exclusão falhar, será necessário excluir todos os arquivos temporários indesejados do Cloud Storage, que normalmente são colocados em gs://your-bucket/.spark-bigquery-jobid-UUID.

Configurar faturamento

Por padrão. o projeto associado às credenciais ou à conta de serviço é cobrado pelo uso da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Ele também pode ser adicionado a uma operação de leitura/gravação, da seguinte maneira: .option("parentProject", "<BILLED-GCP-PROJECT>").

Enviar uma carga de trabalho em lote de contagem de palavras do PySpark

  1. Crie o wordcount_dataset com a ferramenta de linha de comando bq em um terminal local ou no Cloud Shell.
    bq mk wordcount_dataset
    
  2. Crie um bucket do Cloud Storage com a ferramenta de linha de comando gsutil" em um terminal local ou no Cloud Shell.
    gsutil mb gs://your-bucket
    
  3. Examine o código.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[your-bucket-name]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
    
  4. Crie wordcount.py localmente em um editor de texto copiando o código PySpark da lista de códigos do PySpark. Substitua o marcador [seu-bucket] pelo nome do bucket do Cloud Storage que você criou.
  5. Envie a carga de trabalho em lote do PySpark:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=region \
        --deps-bucket=your-bucket
    
    Exemplo de saída do terminal:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Para visualizar a tabela de saída no console do Google Cloud, abra a página do BigQuery do projeto, selecione a tabela wordcount_output e clique em Visualizar.

Para mais informações