Como fazer streaming de registros Avro para o BigQuery usando o Dataflow

Neste tutorial, descrevemos como armazenar objetos SpecificRecord Avro no BigQuery usando o Dataflow ao gerar automaticamente o esquema da tabela e ao transformar os elementos de entrada. Este tutorial também mostra o uso de classes geradas pelo Avro para materializar ou transmitir dados intermediários entre workers no pipeline do Dataflow.

O Apache Avro é um sistema de serialização que depende de esquemas para estruturar dados. Como o esquema está sempre presente quando os dados Avro são lidos ou gravados, a serialização é rápida e pequena. Os benefícios de desempenho fazem dele uma escolha popular para transmitir mensagens entre sistemas, como um app que envia eventos para um sistema de análise por meio de um agente de mensagens. É possível usar o esquema Avro para gerenciar o esquema de armazenamento de dados do BigQuery. A conversão do esquema Avro em estrutura de tabela do BigQuery requer código personalizado, demonstrado neste tutorial.

Este tutorial é destinado a desenvolvedores e arquitetos interessados em usar o esquema Avro para gerenciar o esquema de armazenamento de dados do BigQuery. Neste tutorial, presumimos que você esteja familiarizado com Avro e Java.

O diagrama a seguir ilustra a arquitetura de alto nível deste tutorial.

Arquitetura de um esquema Avro que gerencia seu esquema de armazenamento de dados do BigQuery.

Este tutorial usa um sistema simples de processamento de pedidos com as seguintes etapas para demonstrar esse padrão de arquitetura:

  • Um aplicativo on-line gera eventos quando o cliente faz uma compra.
  • Um objeto de pedido contém um identificador exclusivo, a lista de itens comprados e um carimbo de data/hora.
  • Um pipeline do Dataflow lê as mensagens Avro OrderDetails SpecificRecord de um tópico do Pub/Sub.
  • O pipeline do Dataflow grava os registros no Cloud Storage como arquivos Avro.
  • A classe OrderDetails gera automaticamente o esquema correspondente do BigQuery.
  • Os objetos OrderDetails são gravados no BigQuery usando uma função de transformação genérica.

Objetivos

  • Ingerir strings JSON de um fluxo de dados do Pub/Sub usando o Dataflow.
  • Transformar os objetos JSON em objetos de classes geradas pelo Avro.
  • Gerar o esquema da tabela do BigQuery a partir do esquema Avro.
  • Gravar os registros Avro em um arquivo no Cloud Storage.
  • Gravar os registros Avro no BigQuery.

Custos

Neste tutorial, usamos os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem ser qualificados para uma avaliação gratuita.

Ao concluir este tutorial, exclua os recursos criados para evitar o faturamento contínuo. Para mais informações, consulte Como fazer a limpeza.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  4. Ative as APIs BigQuery, Cloud Storage, and Dataflow.

    Ative as APIs

  5. No Console do Cloud, ative o Cloud Shell.

    Ativar o Cloud Shell

    Na parte inferior do Console do Cloud, uma sessão do Cloud Shell é iniciada e exibe um prompt de linha de comando. O Cloud Shell é um ambiente com o SDK do Cloud pré-instalado com a ferramenta de linha de comando gcloud e os valores já definidos para seu projeto atual. A inicialização da sessão pode levar alguns segundos.

Como configurar o ambiente

  1. No Cloud Shell, clone o repositório de origem:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample.git
    
  2. Gere classes Avro:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    mvn generate-sources
    

    Esse comando usa o arquivo orderdetails.avsc para gerar a classe OrderDetails e a classe OrderItems. A classe OrderDetails tem um identificador exclusivo, um carimbo de data/hora e uma lista de OrderItems. A classe OrderItems tem um identificador exclusivo, um nome e um preço. O esquema Avro é propagado para a tabela do BigQuery, em que uma linha contendo um pedido tem uma matriz de registros do tipo OrderItem. Para mais informações, consulte Como especificar colunas aninhadas e repetidas.

  3. Abra o arquivo env.sh.

    cd $HOME/bigquery-ingest-avro-dataflow-sample
    nano env.sh
    
  4. O arquivo env.sh contém valores padrão predefinidos que podem ser usados para este tutorial, mas é possível modificar esses arquivos para o ambiente.

    # pubsub topic
    MY_TOPIC="avro-records"
    
    # Cloud Storage Bucket
    MY_BUCKET="$GOOGLE_CLOUD_PROJECT""_avro_beam"
    
    # Avro file Cloud Storage output path
    AVRO_OUT="$MY_BUCKET/""out/"
    
    # Region for Cloud Pub/Sub and Cloud Dataflow
    REGION="us-central1"
    
    # Region for BigQuery
    BQ_REGION="US"
    
    # BigQuery dataset name
    BQ_DATASET="sales"
    
    # BigQuery table name
    `BQ_TABLE=`"`orders`"
    
    # Maximum number of Dataflow workers
    NUM_WORKERS=1
    

    Substitua:

    • avro-records: o nome do tópico do Pub/Sub.
    • $GOOGLE_CLOUD_PROJECT"_avro_beam: o nome do intervalo do Cloud Storage gerado pelo código do projeto do Cloud.
    • $MY_BUCKET/""out/": o caminho para o intervalo do Cloud Storage que contém a saída Avro.
    • us-central1: a região que você usa para o Pub/Sub e o Dataflow. Para saber mais sobre regiões, consulte Geografia e regiões.
    • US: a região do BigQuery. Para mais informações sobre locais, consulte Locais de conjuntos de dados.
    • sales: o nome do conjunto de dados do BigQuery.
    • orders: o nome da tabela do BigQuery.
    • 1: o número máximo de workers do Dataflow.
  5. Defina as variáveis de ambiente:

     . ./env.sh
    

Como criar recursos

  1. No Cloud Shell, crie um tópico do Pub/Sub:

    gcloud pubsub topics create $MY_TOPIC
    
  2. Crie um bucket do Cloud Storage:

    gsutil mb -l $REGION -c regional gs://$MY_BUCKET
    

    O intervalo do Cloud Storage faz backup dos eventos brutos gerados pelo aplicativo. O intervalo também pode servir como uma fonte alternativa para análise e validação off-line usando jobs do Spark e do Hadoop executados no Dataproc.

  3. Crie um conjunto de dados do BigQuery:

    bq --location=$BQ_REGION mk --dataset $GOOGLE_CLOUD_PROJECT:$BQ_DATASET

    Um conjunto de dados do BigQuery contém tabelas e visualizações em uma única região ou em uma região geográfica que contém várias regiões. Para mais informações, consulte Como criar conjuntos de dados.

Como iniciar o aplicativo Beam Dataflow

  1. No Cloud Shell, implante e execute o pipeline no executor do Dataflow:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    mvn compile exec:java \
        -Dexec.mainClass=com.google.cloud.solutions.beamavro.AvroToBigQuery \
        -Dexec.cleanupDaemonThreads=false \
        -Dexec.args=" \
        --project=$GOOGLE_CLOUD_PROJECT \
        --runner=DataflowRunner \
        --stagingLocation=gs://$MY_BUCKET/stage/ \
        --tempLocation=gs://$MY_BUCKET/temp/ \
        --inputPath=projects/$GOOGLE_CLOUD_PROJECT/topics/$MY_TOPIC \
        --workerMachineType=n1-standard-1 \
        --maxNumWorkers=$NUM_WORKERS \
        --region=$REGION \
        --dataset=$BQ_DATASET \
        --bqTable=$BQ_TABLE \
        --outputPath=$AVRO_OUT"
    

    A saída contém o código do app. Anote o código do app, porque ele será necessário no tutorial posteriormente.

  2. No Console do Cloud, acesse o Dataflow.

    Acessar o Dataflow

  3. Para ver o status do pipeline, clique no código do aplicativo. O status do pipeline é exibido como um gráfico.

    Gráfico do status do pipeline.

Revisar o código

No arquivo AvroToBigQuery.java, as opções de pipeline com os parâmetros obrigatórios transmitidos pelos parâmetros de linha de comando. A opção modo de streaming também está ativada. O esquema da tabela do BigQuery é gerado a partir do esquema da classe Avro e é usado posteriormente pela classe IO do BigQuery:

TableSchema ts = BigQueryAvroUtils.getTableSchema(OrderDetails.SCHEMA$);

A classe AvroUtils itera os campos no objeto de esquema Avro e gera objetos TableFieldSchema correspondentes recursivamente. Os objetos são encapsulados em um objeto TableSchema e retornados.

Para o formato de entrada Avro, os objetos são lidos no Pub/Sub. Se o formato de entrada for JSON, os eventos serão lidos e transformados em objetos Avro.

private static PCollection<OrderDetails> getInputCollection(
    Pipeline pipeline, String inputPath, FORMAT format) {
  if (format == FORMAT.JSON) {
    // Transform JSON to Avro
    return pipeline
        .apply("Read JSON from PubSub", PubsubIO.readStrings().fromTopic(inputPath))
        .apply("To binary", ParDo.of(new JSONToAvro()));
  } else {
    // Read Avro
    return pipeline.apply(
        "Read Avro from PubSub", PubsubIO.readAvros(OrderDetails.class).fromTopic(inputPath));
  }
}

O pipeline se ramifica. A transformação Write to Cloud Storage é uma transformação composta, que coleta os registros na janela por 10 segundos e os grava em um arquivo Avro no Cloud Storage usando o gravador AvroIO:

ods.apply(
    "Write to GCS",
    new AvroWriter()
        .withOutputPath(options.getOutputPath())
        .withRecordType(OrderDetails.class));

A transformação Write to BigQuery grava os registros na tabela do BigQuery:

ods.apply(
    "Write to BigQuery",
    BigQueryIO.write()
        .to(bqStr)
        .withSchema(ts)
        .withWriteDisposition(WRITE_APPEND)
        .withCreateDisposition(CREATE_IF_NEEDED)
        .withFormatFunction(TABLE_ROW_PARSER));

A transformação BigQueryIO grava os objetos Avro no BigQuery transformando-os em objetos TableRow usando o método TABLE_ROW_PARSER. O analisador chama o método convertSpecificRecordToTableRow na classe BigQueryAvroUtils, criada com base em uma classe de teste no projeto do Apache Beam. O method analisa recursivamente os campos Avro e os adiciona a um objeto TableRow.

private static TableRow convertSpecificRecordToTableRow(
    SpecificRecord record, List<TableFieldSchema> fields) {
  TableRow row = new TableRow();
  for (TableFieldSchema subSchema : fields) {
    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
    // is required, so it may not be null.
    Field field = record.getSchema().getField(subSchema.getName());
    if (field == null || field.name() == null) {
      continue;
    }
    Object convertedValue = getTypedCellValue(field.schema(), subSchema, record.get(field.pos()));
    if (convertedValue != null) {
      // To match the JSON files exported by BigQuery, do not include null values in the output.
      row.set(field.name(), convertedValue);
    }
  }

  return row;
}

A tabela a seguir mostra o mapeamento entre os tipos de dados do BigQuery e os tipos de dados Avro. Preste atenção em alguns tipos, como Date e Timestamp, que são identificados pelo tipo lógico do campo.

BigQuery Avro
STRING STRING
GEOGRAPHY STRING
BYTES BYTES
INTEGER INT
FLOAT FLOAT
FLOAT64 DOUBLE
NUMERIC BYTES
BOOLEAN BOOLEAN
INT64 LONG
TIMESTAMP LONG
DATE INT
DATETIME STRING
TIME LONG
STRUCT RECORD
REPEATED FIELD ARRAY

Ver resultados no BigQuery

Para testar o pipeline, inicie o script gen.py. Esse script simula a geração de eventos de pedido e os envia para o tópico do Pub/Sub.

  1. No Cloud Shell, mude para o diretório de script do gerador de eventos de amostra e execute o script:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/generator
    python3 -m venv env
    . ./env/bin/activate
    pip install -r requirements.txt
    python3 gen.py -p $GOOGLE_CLOUD_PROJECT -t $MY_TOPIC -n 100 -f avro
    
  2. No Console do Cloud, acesse o BigQuery.

    Ir para o BigQuery

  3. Para visualizar o esquema da tabela, clique no conjunto de dados sales e selecione a tabela orders. Se você modificar as variáveis de ambiente padrão em env.sh, os nomes do conjunto de dados e da tabela serão diferentes.

    Esquema da tabela &quot;pedidos&quot;.

  4. Para visualizar alguns dados de amostra, execute uma consulta no Editor de consultas:

    SELECT * FROM sales.orders LIMIT 5
    

    Resultado da consulta de dados de amostra.

    O esquema da tabela do BigQuery é gerado automaticamente a partir dos registros Avro, e os dados são convertidos automaticamente na estrutura da tabela do BigQuery.

Limpeza

Exclua o projeto

  1. No Console do Cloud, acesse a página Gerenciar recursos:

    Acessar "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

Excluir recursos individuais

  1. Siga estas instruções para interromper o job do Dataflow.

  2. Exclua o bucket do Cloud Storage:

    gsutil rm -r gs://$MY_BUCKET
    

A seguir