Como fazer streaming de registros Avro para o BigQuery usando o Dataflow

Last reviewed 2020-02-06 UTC

Neste tutorial, descrevemos como armazenar objetos SpecificRecord Avro no BigQuery usando o Dataflow ao gerar automaticamente o esquema da tabela e ao transformar os elementos de entrada. Este tutorial também mostra o uso de classes geradas pelo Avro para materializar ou transmitir dados intermediários entre workers no pipeline do Dataflow.

O Apache Avro é um sistema de serialização que depende de esquemas para estruturar dados. Como o esquema está sempre presente quando os dados Avro são lidos ou gravados, a serialização é rápida e pequena. Os benefícios de desempenho fazem dele uma escolha popular para transmitir mensagens entre sistemas, como um app que envia eventos para um sistema de análise por meio de um agente de mensagens. É possível usar o esquema Avro para gerenciar o esquema de armazenamento de dados do BigQuery. A conversão do esquema Avro em estrutura de tabela do BigQuery requer código personalizado, demonstrado neste tutorial.

Este tutorial é destinado a desenvolvedores e arquitetos interessados em usar o esquema Avro para gerenciar o esquema de armazenamento de dados do BigQuery. Neste tutorial, presumimos que você esteja familiarizado com Avro e Java.

O diagrama a seguir ilustra a arquitetura de alto nível deste tutorial.

Arquitetura de um esquema Avro que gerencia seu esquema de armazenamento de dados do BigQuery.

Este tutorial usa um sistema simples de processamento de pedidos com as seguintes etapas para demonstrar esse padrão de arquitetura:

  • Um aplicativo on-line gera eventos quando o cliente faz uma compra.
  • Um objeto de pedido contém um identificador exclusivo, a lista de itens comprados e um carimbo de data/hora.
  • Um pipeline do Dataflow lê as mensagens Avro OrderDetails SpecificRecord de um tópico do Pub/Sub.
  • O pipeline do Dataflow grava os registros no Cloud Storage como arquivos Avro.
  • A classe OrderDetails gera automaticamente o esquema correspondente do BigQuery.
  • Os objetos OrderDetails são gravados no BigQuery usando uma função de transformação genérica.

Objetivos

  • Ingerir strings JSON de um fluxo de dados do Pub/Sub usando o Dataflow.
  • Transformar os objetos JSON em objetos de classes geradas pelo Avro.
  • Gerar o esquema da tabela do BigQuery a partir do esquema Avro.
  • Gravar os registros Avro em um arquivo no Cloud Storage.
  • Gravar os registros Avro no BigQuery.

Custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Ao concluir as tarefas descritas neste documento, é possível evitar o faturamento contínuo excluindo os recursos criados. Saiba mais em Limpeza.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  4. Ative as APIs BigQuery, Cloud Storage, and Dataflow.

    Ative as APIs

  5. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  6. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  7. Ative as APIs BigQuery, Cloud Storage, and Dataflow.

    Ative as APIs

  8. No Console do Google Cloud, ative o Cloud Shell.

    Ativar o Cloud Shell

    Na parte inferior do Console do Google Cloud, uma sessão do Cloud Shell é iniciada e exibe um prompt de linha de comando. O Cloud Shell é um ambiente shell com a CLI do Google Cloud já instalada e com valores já definidos para o projeto atual. A inicialização da sessão pode levar alguns segundos.

Como configurar o ambiente

  1. No Cloud Shell, clone o repositório de origem:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample.git
    
  2. Abra o arquivo env.sh.

    cd $HOME/bigquery-ingest-avro-dataflow-sample
    nano env.sh
    
  3. O arquivo env.sh contém valores padrão predefinidos que podem ser usados para este tutorial, mas é possível modificar esses arquivos para o ambiente.

    # pubsub topic
    MY_TOPIC="avro-records"
    
    # Cloud Storage Bucket
    MY_BUCKET="${GOOGLE_CLOUD_PROJECT}_avro_beam"
    
    # Avro file Cloud Storage output path
    AVRO_OUT="${MY_BUCKET}/out/"
    
    # Region for Cloud Pub/Sub and Cloud Dataflow
    REGION="us-central1"
    
    # Region for BigQuery
    BQ_REGION="US"
    
    # BigQuery dataset name
    BQ_DATASET="sales"
    
    # BigQuery table name
    BQ_TABLE="orders"
    

    Substitua:

    • avro-records: o nome do tópico do Pub/Sub.
    • $GOOGLE_CLOUD_PROJECT"_avro_beam: o nome do intervalo do Cloud Storage gerado pelo código do projeto do Cloud.
    • $MY_BUCKET/""out/": o caminho para o intervalo do Cloud Storage que contém a saída Avro.
    • us-central1: a região que você usa para o Pub/Sub e o Dataflow. Para saber mais sobre regiões, consulte Geografia e regiões.
    • US: a região do BigQuery. Para mais informações sobre locais, consulte Locais de conjuntos de dados.
    • sales: o nome do conjunto de dados do BigQuery.
    • orders: o nome da tabela do BigQuery.
    • 1: o número máximo de workers do Dataflow.
  4. Defina as variáveis de ambiente:

     . ./env.sh
    

Como criar recursos

  1. No Cloud Shell, crie um tópico do Pub/Sub:

    gcloud pubsub topics create "${MY_TOPIC}"
    
  2. Crie um bucket do Cloud Storage:

    gsutil mb -l "${REGION}" -c regional "gs://${MY_BUCKET}"
    

    O intervalo do Cloud Storage faz backup dos eventos brutos gerados pelo aplicativo. O intervalo também pode servir como uma fonte alternativa para análise e validação off-line usando jobs do Spark e do Hadoop executados no Dataproc.

  3. Crie um conjunto de dados do BigQuery:

    bq --location="${BQ_REGION}" mk --dataset "${GOOGLE_CLOUD_PROJECT}:${BQ_DATASET}"
    

    Um conjunto de dados do BigQuery contém tabelas e visualizações em uma única região ou em uma região geográfica que contém várias regiões. Para mais informações, consulte Como criar conjuntos de dados.

Como iniciar o aplicativo Beam Dataflow

  1. No Cloud Shell, implante e execute o pipeline no executor do Dataflow:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    
    mvn clean package
    
    java -cp target/BeamAvro-bundled-1.0-SNAPSHOT.jar \
    com.google.cloud.solutions.beamavro.AvroToBigQuery \
    --runner=DataflowRunner \
    --project="${GOOGLE_CLOUD_PROJECT}" \
    --stagingLocation="gs://${MY_BUCKET}/stage/" \
    --tempLocation="gs://${MY_BUCKET}/temp/" \
    --inputPath="projects/${GOOGLE_CLOUD_PROJECT}/topics/${MY_TOPIC}" \
    --workerMachineType=n1-standard-1 \
    --region="${REGION}" \
    --dataset="${BQ_DATASET}" \
    --bigQueryTable="${BQ_TABLE}" \
    --outputPath="gs://${MY_BUCKET}/out/" \
    --jsonFormat=false \
    --avroSchema="$(<../orderdetails.avsc)"
    

    A saída contém o código do app. Anote o código do app, porque ele será necessário no tutorial posteriormente.

  2. No Console do Google Cloud, acesse o Dataflow.

    Acessar o Dataflow

  3. Para ver o status do pipeline, clique no código do aplicativo. O status do pipeline é exibido como um gráfico.

    Gráfico do status do pipeline.

Revisar o código

No arquivo AvroToBigQuery.java, as opções de pipeline com os parâmetros obrigatórios transmitidos pelos parâmetros de linha de comando. A opção modo de streaming também está ativada. O esquema da tabela do BigQuery é gerado automaticamente a partir do esquema do Avro usando o esquema Beam (em inglês) pelo IO do BigQuery

Para o formato de entrada Avro, os objetos são lidos no Pub/Sub. Se o formato de entrada for JSON, os eventos serão lidos e transformados em objetos Avro.

Schema avroSchema = new Schema.Parser().parse(options.getAvroSchema());

if (options.getJsonFormat()) {
  return input
      .apply("Read Json", PubsubIO.readStrings().fromTopic(options.getInputPath()))
      .apply("Make GenericRecord", MapElements.via(JsonToAvroFn.of(avroSchema)));
} else {
  return input.apply("Read GenericRecord", PubsubIO.readAvroGenericRecords(avroSchema)
      .fromTopic(options.getInputPath()));
}

O pipeline se ramifica para executar duas gravações separadas:

O BigQueryIO grava os objetos Avro no BigQuery transformando-os internamente em objetos TableRow. usando esquemas do Beam. Consulte o mapeamento entre tipos de dados do BigQuery e tipos de dados Avro.

Ver resultados no BigQuery

Para testar o pipeline, inicie o script gen.py. Esse script simula a geração de eventos de pedido e os envia para o tópico do Pub/Sub.

  1. No Cloud Shell, mude para o diretório de script do gerador de eventos de amostra e execute o script:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/generator
    python3 -m venv env
    . ./env/bin/activate
    pip install -r requirements.txt
    python3 gen.py -p $GOOGLE_CLOUD_PROJECT -t $MY_TOPIC -n 100 -f avro
    
  2. No console do Google Cloud, acesse o BigQuery.

    Ir para o BigQuery

  3. Para visualizar o esquema da tabela, clique no conjunto de dados sales e selecione a tabela orders. Se você modificar as variáveis de ambiente padrão em env.sh, os nomes do conjunto de dados e da tabela serão diferentes.

    Esquema da tabela "pedidos".

  4. Para visualizar alguns dados de amostra, execute uma consulta no Editor de consultas:

    SELECT * FROM sales.orders LIMIT 5
    

    Resultado da consulta de dados de amostra.

    O esquema da tabela do BigQuery é gerado automaticamente a partir dos registros Avro, e os dados são convertidos automaticamente na estrutura da tabela do BigQuery.

Limpeza

Exclua o projeto

  1. No Console do Google Cloud, acesse a página Gerenciar recursos.

    Acessar "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

Excluir recursos individuais

  1. Siga estas instruções para interromper o job do Dataflow.

  2. Exclua o bucket do Cloud Storage:

    gsutil rm -r gs://$MY_BUCKET
    

A seguir