Como executar ETL de um banco de dados relacional para o BigQuery usando o Dataflow

Neste tutorial, demonstramos como usar o Dataflow para extrair, transformar e carregar (ETL) dados de um banco de dados relacional de processamento de transações on-line (OLTP, na sigla em inglês) no BigQuery para análise.

Ele se destina a administradores de bancos de dados, profissionais de operações e arquitetos de nuvem interessados em aproveitar os recursos de consulta analítica do BigQuery e de processamento em lote do Dataflow.

Os bancos de dados OLTP geralmente são relacionais. Eles armazenam informações e processam transações para sites de comércio eletrônico, aplicativos de Software as a Service (SaaS) ou jogos. Normalmente, esses bancos de dados são otimizados para transações que exigem as propriedades ACID: atomicidade, consistência, isolamento e durabilidade. Além disso, eles geralmente têm esquemas altamente normalizados. Por outro lado, os armazenamentos de dados costumam ser otimizados para recuperação e análise de dados em vez de transações, e normalmente têm esquemas desnormalizados. Geralmente, a desnormalização dos dados de um banco OLTP o torna mais útil para a análise no BigQuery.

Objetivos

Neste tutorial, você verá duas abordagens para converter dados de RDBMS normalizados por ETL em dados do BigQuery desnormalizados:

  • Usar o BigQuery para carregar e transformar os dados. Use essa abordagem para carregar um pequeno volume de dados uma única vez no BigQuery para análise. Também é possível usar essa abordagem para criar o protótipo do conjunto de dados antes de automatizar conjuntos maiores ou vários conjuntos.
  • Usar o Dataflow para carregar, transformar e limpar os dados. Essa abordagem é ideal para carregar um volume maior de dados, dados de várias origens ou carregar dados de maneira incremental ou automática.

Custos

Neste tutorial, usamos os seguintes componentes faturáveis do Google Cloud:

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Ao concluir este tutorial, exclua os recursos criados para evitar o faturamento contínuo. Para mais informações, consulte Limpeza.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  4. Ative as APIs Compute Engine e Dataflow.

    Ative as APIs

  5. Instale e inicialize o SDK do Cloud..
  6. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  7. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  8. Ative as APIs Compute Engine e Dataflow.

    Ative as APIs

  9. Instale e inicialize o SDK do Cloud..

Como usar o conjunto de dados MusicBrainz

Este tutorial baseia-se nos snapshots JSON das tabelas do banco de dados do MusicBrainz. O banco foi criado no PostgreSQL e contém informações sobre todas as músicas do MusicBrainz. Estes são alguns elementos do esquema do MusicBrainz (em inglês):

  • Artistas
  • Grupos de lançamentos
  • Lançamentos
  • Gravações
  • Obras
  • Gravadoras
  • Várias relações (em inglês) entre essas entidades

O esquema do MusicBrainz inclui três tabelas relevantes: artist, recording e artist_credit_name. Um artist_credit representa o crédito dado ao artista por uma gravação e as linhas artist_credit_name vinculam a gravação ao artista correspondente pelo valor artist_credit.

Neste tutorial, as tabelas do PostgreSQL já foram extraídas no formato JSON. Para executar essa etapa, use o seguinte código de amostra:

pg_cmd="\\copy (select row_to_json(r) from (select * from artist) r ) to
exported_artist.json"
psql -w -h $host -U $user -d $db -c $pg_cmd
sed -i -e 's/\\\\/\\/g' exported_artist.json # clean up extra '\' characters

Abordagem 1: ETL com o BigQuery

Use essa abordagem para carregar um pequeno volume de dados uma única vez no BigQuery para análise. Ela também pode ser usada para criar o protótipo do conjunto de dados antes de usar a automatização em grandes ou vários conjuntos.

Crie um conjunto de dados do BigQuery

Para criar um conjunto de dados do BigQuery, carregue as tabelas do MusicBrainz individualmente e combine-as para que cada linha contenha o vínculo de dados desejado. Armazene os resultados dessa junção em uma nova tabela do BigQuery. Em seguida, exclua as tabelas originais que você carregou.

  1. No Console do Cloud, abra o BigQuery.

    ABRIR O BIGQUERY

  2. Em Recursos, clique no nome do projeto.

  3. Na barra de navegação à esquerda, clique em + Adicionar dados.

  4. Na caixa de diálogo Criar conjunto de dados, siga estas etapas:

    1. No campo ID do conjunto de dados, insira musicbrainz.
    2. Deixe o Local dos dados como Padrão.
  5. Clique em Criar conjunto de dados.

Importar tabelas do MusicBrainz

Para cada tabela do MusicBrainz, siga as etapas a seguir para adicionar uma tabela ao conjunto de dados criado:

  1. No Console do Cloud, clique no nome do conjunto de dados e em + Criar tabela.
  2. Na caixa de diálogo Criar tabela, conclua as etapas a seguir e clique em Criar tabela:

    1. Na lista suspensa Criar tabela de, em Origem, selecione Google Cloud Storage.
    2. No campo Selecionar arquivo do bucket do Cloud Storage, insira o URL do arquivo de dados, gs://solutions-public-assets/bqetl/artist.json.
    3. Em Formato de arquivo, selecione JSON (delimitado por nova linha).
    4. Em Nome da tabela, insira o nome da tabela, artist.
    5. Em Tipo de tabela, deixe Tabela nativa selecionada.
    6. Abaixo da seção Esquema, clique na opção Editar como texto para ativá-la.
    7. Faça o download do arquivo de esquema artist.
    8. Substitua o conteúdo da seção Esquema pelo conteúdo do arquivo de esquema baixado.

    Caixa de diálogo "Criar tabela" com esquema atualizado do arquivo JSON baixado.

  3. Aguarde alguns instantes até que o job de carregamento seja concluído. Para monitorá-lo, clique em Histórico de jobs.

    Após a conclusão, a nova tabela aparecerá no conjunto de dados.

  4. Repita as etapas 1 a 3 para a tabela artist_credit_name com as seguintes alterações:

  5. Repita as etapas 1 a 3 para a tabela recording com as seguintes alterações:

Desnormalizar manualmente os dados

Para desnormalizar os dados, mescle-os em um nova tabela do BigQuery que tenha uma linha para cada gravação de artista, junto com os metadados selecionados que você quer reter para análise.

  1. No Console do Cloud, copie a seguinte consulta e cole-a no Editor de consultas:

    SELECT artist.id, artist.gid as artist_gid,
           artist.name as artist_name, artist.area,
           recording.name as recording_name, recording.length,
           recording.gid as recording_gid, recording.video
      FROM `[PROJECT_ID].[DATASET].artist` as artist
          INNER JOIN `[PROJECT_ID].[DATASET].artist_credit_name` AS artist_credit_name
               ON artist.id = artist_credit_name.artist
          INNER JOIN `[PROJECT_ID].[DATASET].recording` AS recording
               ON artist_credit_name.artist_credit = recording.artist_credit
    

    Substitua [DATASET] pelo nome do conjunto de dados criado anteriormente, por exemplo, musicbrainz, e [PROJECT_ID] pelo ID do projeto do Google Cloud.

  2. Clique na lista suspensa Mais e selecione Configurações de consulta.

  3. No card Configurações de consulta, faça o seguinte:

    1. Marque a caixa de seleção Definir uma tabela de destino para os resultados da consulta.
    2. Em Nome da tabela, insira recordings_by_artists_manual.
    3. Em Preferência de gravação na tabela de destino, clique em Substituir tabela.
    4. Marque a caixa de seleção Permitir resultados extensos (sem limite de tamanho).
    5. Em Prioridade do job, mantenha o padrão Interativo.
    6. Em Dialeto SQL mantenha o Padrão.
    7. Clique em Salvar.
  4. Clique em Executar.

    Quando a consulta é concluída, os dados do resultado são organizados por músicas de cada artista na tabela do BigQuery recém-criada.

    Configurações de consulta da tabela de destino.

Abordagem 2: ETL no BigQuery com o Dataflow

Nesta seção do tutorial, em vez de usar a IU do BigQuery, você usa um programa de exemplo para carregar dados no BigQuery usando um pipeline do Dataflow. Em seguida, use o modelo de programação do Dataflow para desnormalizar e limpar dados a serem carregados no BigQuery.

Antes de começar, consulte os conceitos e o código de exemplo.

Consultar os conceitos

Os dados são pequenos e podem ser enviados rapidamente usando a IU do BigQuery, porém, para os fins deste tutorial, também é possível usar o Dataflow para ETL. Use o Dataflow para ETL no BigQuery em vez da IU do BigQuery quando estiver realizando mesclagens em massa, ou seja, de cerca de 500 a 5.000 colunas de mais de 10 TB de dados, com as seguintes metas:

  • É recomendável limpar ou transformar os dados conforme eles são carregados no BigQuery, em vez de armazená-los e mesclá-los posteriormente. Essa abordagem tem requisitos de armazenamento mais baixos, porque os dados só são armazenados no BigQuery no estado mesclado e transformado.
  • Você planeja fazer uma limpeza de dados personalizada, que não é possível simplesmente com o SQL.
  • Você planeja combinar esses dados com outros fora do OLTP, como registros ou dados acessados remotamente, durante o processo de carregamento.
  • Você planeja automatizar o teste e a implantação da lógica do carregamento de dados usando integração ou implantação contínuas (CI/CD, na sigla em inglês).
  • Você prevê uma iteração gradual, melhorias e aprimoramentos no processo de ETL ao longo do tempo.
  • Você planeja adicionar dados incrementalmente, em vez de executar o ETL apenas uma vez.

Veja um diagrama do pipeline de dados criado pelo programa de exemplo:

Pipeline de dados usando o BigQuery.

No código de exemplo, várias etapas do pipeline são agrupadas ou encapsuladas em métodos de conveniência, nomeadas com nomes descritivos e reutilizadas. No diagrama, as etapas reutilizadas estão indicadas pelo contorno tracejado.

Revisar o código do pipeline

O código cria um pipeline que executa as etapas a seguir:

  1. Carrega cada tabela que você quer incluir na mesclagem em um PCollection de strings. Cada elemento compõe a representação JSON de uma linha da tabela.

    public static PCollection<String> loadText(Pipeline p, String name) {
      BQETLOptions options = (BQETLOptions) p.getOptions();
      String loadingBucket = options.getLoadingBucketURL();
      String objectToLoad = storedObjectName(loadingBucket, name);
      return p.apply(name, TextIO.read().from(objectToLoad));
    }
  2. Converte essas strings JSON em representações de objeto, objetos MusicBrainzDataObject e, em seguida, organiza as representações de objeto por um dos valores de coluna, como uma chave primária ou externa.

    public static PCollection<KV<Long, MusicBrainzDataObject>> loadTableFromText(PCollection<String> text, String name, String keyName) {
      final String namespacedKeyname = name + "_" + keyName;
      return text.apply("load " + name,
                        MapElements
                          .into(new TypeDescriptor<KV<Long, MusicBrainzDataObject>>() {})
                          .via( (String input) -> {
                                MusicBrainzDataObject datum = JSONReader.readObject(name, input);
                                Long key = (Long) datum.getColumnValue(namespacedKeyname);
                                return KV.of(key, datum);
                                })
             );
    }
  3. A lista é mesclada com base no artista em comum. O artist_credit_name vincula um crédito de artista à gravação correspondente e inclui a chave estrangeira do artista. A tabela artist_credit_name é carregada como uma lista de objetos KV de valor-chave. O membro K é o artista.

    PCollection<MusicBrainzDataObject> artistCredits =
        MusicBrainzTransforms.innerJoin("artists with artist credits", artists, artistCreditName);
  4. Mescla a lista usando o método MusicBrainzTransforms.innerJoin().

    public static PCollection<MusicBrainzDataObject>
                         innerJoin(String name,
                                   PCollection<KV<Long, MusicBrainzDataObject>> table1,
                                   PCollection<KV<Long, MusicBrainzDataObject>> table2) {
      final TupleTag<MusicBrainzDataObject> t1 = new TupleTag<MusicBrainzDataObject>(){};
      final TupleTag<MusicBrainzDataObject> t2 = new TupleTag<MusicBrainzDataObject>(){};
      PCollection<KV<Long, CoGbkResult>> joinedResult = group(name, table1, table2, t1, t2);
    1. Agrupa as coleções de objetos KV pelo membro chave pelo qual será feita a mesclagem. Isso gera um PCollection de objetos KV com uma chave longa (o valor da coluna artist.id) e o CoGbkResult resultante (que significa combinar o grupo por resultado de chave). O objeto CoGbkResult é uma tupla de listas de objetos com o valor de chave em comum no primeiro e no segundo PCollections. Essa tupla pode ser endereçada usando a tag de tupla formulada para cada PCollection antes de executar a operação CoGroupByKey no método group.
    2. Mescla cada correspondência de objetos em um objeto MusicBrainzDataObject que representa um resultado de mesclagem.

          PCollection<List<MusicBrainzDataObject>> mergedResult =
              joinedResult.apply("merge join results",
                           MapElements
                         .into(new TypeDescriptor<List<MusicBrainzDataObject>>() {})
                         .via( ( KV<Long, CoGbkResult> group ) -> {
                             List<MusicBrainzDataObject> result = new ArrayList<MusicBrainzDataObject>();
                             Iterable<MusicBrainzDataObject> leftObjects = group.getValue().getAll(t1);
                             Iterable<MusicBrainzDataObject> rightObjects = group.getValue().getAll(t2);
                             leftObjects.forEach((MusicBrainzDataObject l) -> {
                               rightObjects.forEach((MusicBrainzDataObject r) -> {
                                 result.add(l.duplicate().merge(r));
                               });
                             });
                             return result;
                           }
                         )
      );
    3. A coleção é reorganizada em uma lista de objetos KV para iniciar a próxima mesclagem. Desta vez, o valor K é a coluna artist_credit, que é usada na mesclagem com a tabela de gravação.

      PCollection<KV<Long,MusicBrainzDataObject>> artistCreditNamesByArtistCredit =  MusicBrainzTransforms.by("artist_credit_name_artist_credit", artistCredits);
    4. A coleção final de objetos MusicBrainzDataObject provém da mesclagem desse resultado com a coleção de gravações carregada que está organizada por artist_credit.id.

      PCollection<MusicBrainzDataObject> artistRecordings = MusicBrainzTransforms.innerJoin("joined recordings",
         artistCreditNamesByArtistCredit, recordingsByArtistCredit);
    5. Mapeia os objetos MusicBrainzDataObjects resultantes em TableRows.

      PCollection<TableRow> tableRows = MusicBrainzTransforms.transformToTableRows(artistRecordings, bqTableSchema);
    6. Grava o TableRows resultante no BigQuery.

      tableRows.apply(
           "Write to BigQuery",
           BigQueryIO.writeTableRows()
          .to(BQETLOptions.getBigQueryTablename())
          .withSchema(bqTableSchema)
          .withCustomGcsTempLocation(StaticValueProvider.of(BQETLOptions.getTempLocation() ))
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Para detalhes sobre a dinâmica da programação de pipeline do Dataflow, consulte os seguintes tópicos sobre o modelo de programação:

Depois de revisar as etapas que o código executa, execute o pipeline.

Executar o código do pipeline

  1. No Console do Cloud, abra o Cloud Shell.

    Abra o Cloud Shell

  2. Defina as variáveis de ambiente do projeto:

    export PROJECT_ID=[PROJECT_ID]
    export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
    

    Substitua [PROJECT_ID] pelo ID do projeto do Google Cloud e [CHOOSE_AN_APPROPRIATE_ZONE] por uma zona do Google Cloud.

  3. Defina as variáveis de ambiente usadas pelo script do pipeline:

    export DESTINATION_TABLE=recordings_by_artists_dataflow
    export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
    export DATASET=musicbrainz
    export SERVICE_ACCOUNT=project-owner
    
  4. Verifique se gcloud está usando o projeto que você criou ou selecionou no início do tutorial:

    gcloud config set project $PROJECT_ID
    
  5. Crie uma conta de serviço para executar o pipeline:

    gcloud iam service-accounts create ${SERVICE_ACCOUNT} \
        --display-name "Project Owner Account"
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member serviceAccount:${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com \
        --role roles/owner
    gcloud iam service-accounts keys create \
        ~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json \
        --iam-account ${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com
    

    Com esse comando, é feito o download de um arquivo JSON que contém a chave da sua conta de serviço. Armazene esse arquivo em local seguro.

  6. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS com o caminho do arquivo JSON que contém a chave da conta de serviço:

    export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
    
  7. Clone o repositório que contém o código do Dataflow:

    git clone https://github.com/GoogleCloudPlatform/bigquery-etl-dataflow-sample.git
    
  8. Mude o diretório para o exemplo:

    cd bigquery-etl-dataflow-sample
    
  9. Crie um bucket de organização no Cloud Storage, já que os jobs do Dataflow exigem um bucket no Cloud Storage para organizar os arquivos binários usados para executar o pipeline.

    gsutil mb gs://$STAGING_BUCKET
    
  10. Defina o ciclo de vida do objeto para o [STAGING_BUCKET_NAME] como aquele no arquivo dataflow-staging-policy.json:

    gsutil lifecycle set dataflow-staging-policy.json gs://$STAGING_BUCKET
    
  11. Execute o job do Dataflow:

    ./run.sh simple
    
  12. Para ver o progresso do pipeline, no Console do Cloud, acesse a página do Dataflow.

    Acessar a página do Dataflow

    O status dos jobs é mostrado na coluna de status. O status Concluído indica que o job foi concluído.

  13. (Opcional) Para ver o gráfico do job e os detalhes sobre as etapas, clique no nome do job, por exemplo, etl-into-bigquery-bqetlsimple.

  14. No Console do Cloud, acesse a página do BigQuery.

    Acessar a página do BigQuery

    Verifique se o projeto do Google Cloud está selecionado.

  15. Para executar uma consulta na nova tabela, no painel Editor de consultas, insira o seguinte:

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow
    WHERE artist_area is NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Editor de consultas atualizado com consulta de nova tabela.

Limpar os dados

Em seguida, faça uma pequena alteração no pipeline do Dataflow para carregar tabelas de consulta e processá-las como entradas secundárias, conforme mostrado no diagrama a seguir.

Pipeline do Dataflow atualizado para entradas secundárias.

Quando você consulta a tabela resultante do BigQuery, é difícil supor de onde provém o artista sem procurar manualmente o ID numérico da área da tabela area no banco de dados MusicBrainz. Com isso, a análise dos resultados da consulta fica menos direta do que poderia ser.

De modo semelhante, os gêneros dos artistas aparecem como IDs, mas a tabela de gêneros do MusicBrainz só tem três linhas. Para corrigir isso, adicione uma etapa no pipeline do Dataflow que use as tabelas area e gender do MusicBrainz para mapear os IDs para os devidos rótulos.

As tabelas artist_area e artist_gender contêm um número significativamente menor de linhas do que artistas ou tabelas de dados de gravação. O número de elementos nas tabelas posteriores é limitado pelo número de áreas geográficas ou de gêneros, respectivamente.

Consequentemente, a etapa de pesquisa usa o recurso do Dataflow denominado entrada secundária (em inglês).

As entradas secundárias são carregadas como exportações JSON de tabela limitada por linha e usadas para desnormalizar os dados da tabela em uma única etapa.

Revisar o código que adiciona entradas secundárias ao pipeline

Antes de executar o pipeline, revise o código para entender melhor as novas etapas.

No arquivo BQETLSimple.java, revise as linhas de comentário. As marcas de comentário serão removidas em uma etapa posterior.

//PCollection<KV<Long,MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p,"artist","id",
//        MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),
//        MusicBrainzTransforms.lookup("gender","id","name","gender"));

PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");

Este código demonstra da limpeza dos dados com entradas secundárias. A classe MusicBrainzTransforms oferece mais conveniência no uso de entradas secundárias para mapear valores de chave estrangeira para rótulos. A biblioteca MusicBrainzTransforms fornece um método que cria uma classe de pesquisa interna. A classe de pesquisa descreve cada tabela de consulta e os campos que são substituídos por rótulos e argumentos de comprimento variável. keyKey é o nome da coluna que contém a chave para a pesquisa e valueKey é o nome da coluna que contém o rótulo correspondente.

public static LookupDescription lookup(String objectName, String keyKey, String valueKey, String... destinationKeys) {
  return new LookupDescription(objectName, keyKey, valueKey, destinationKeys);
}

Cada entrada secundária é carregada como um único objeto de mapa, que é usado para consultar o rótulo correspondente a um ID.

Primeiro, o JSON da tabela de consulta é carregado em MusicBrainzDataObjects com namespace vazio e transformado em mapa, do valor da coluna Key para o valor da coluna Value.

public static PCollectionView<Map<Long, String>> loadMapFromText(PCollection<String> text, String name, String keyKey, String valueKey) {
  // column/Key names are namespaced in MusicBrainzDataObject
  String keyKeyName = name + "_" + keyKey;
  String valueKeyName = name + "_" + valueKey;

  PCollection<KV<Long, String>> entries = text.apply(
        "sideInput_" + name,
        MapElements
          .into(new TypeDescriptor<KV<Long, String>>() {})
          .via((String input) -> {
                 MusicBrainzDataObject object = JSONReader.readObject(name, input);
                 Long key = (Long) object.getColumnValue(keyKeyName);

                 String value = (String) object.getColumnValue(valueKeyName);
                 return KV.of(key, value);
               })
        );

  return entries.apply(View.<Long, String>asMap());
}

Cada um desses objetos Map é colocado em um Map pelo valor de destinationKey, que é a chave a ser substituída pelos valores pesquisados.

List<SimpleEntry<ArrayList<String>, PCollectionView<Map<Long, String>>>> mapSideInputs = new ArrayList<SimpleEntry<ArrayList<String>, PCollectionView<Map<Long, String>>>>();

for (LookupDescription mapper : mappers) {
  PCollectionView<Map<Long, String>> mapView = loadMap(text.getPipeline(), mapper.objectName, mapper.keyKey, mapper.valueKey);
  List<String> destKeyList =
      mapper.destinationKeys.stream()
                            .map( destinationKey -> name + "_" + destinationKey )
                            .collect(Collectors.toList());

    mapSideInputs.add(new SimpleEntry(destKeyList, mapView));

}

Em seguida, ao transformar os objetos de artista pelo JSON, o valor de destinationKey (que começa como um número) é substituído pelo rótulo.

Map<Long, String> sideInputMap = c.sideInput(mapping.getValue());

List<String> keyList = mapping.getKey();

keyList.forEach( ( String key ) -> {
  Long id = (Long) result.getColumnValue(key);
  if (id != null) {
    String label = (String) sideInputMap.get(id);
    if (label == null) {
      label = "" + id;
    }
    result.replace(key, label);

Se quiser modificar o BQETLSimple.java para usar pesquisas visando decodificar os dados dos campos artist_area e artist_gender, siga estas etapas:

  1. Altere ligeiramente o fluxo do programa:

    1. Remova as marcas de comentário das linhas que carregam os dados do artista usando as consultas.
    2. Transforme em comentário a chamada a loadTable que carrega os dados do artista sem as consultas.
    //PCollection<KV<Long,MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p,"artist","id",
    //        MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),
    //        MusicBrainzTransforms.lookup("gender","id","name","gender"));
    
    PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");
  2. Altere o TableFieldSchemas de artist_area e artist_gender para ser do tipo de dados string em vez de int, transformando em comentário os campos int correspondentes e removendo as marcas de comentário dos campos string correspondentes.

    /*Switch these two lines when using mapping table for artist_area */
    //        .stringField("artist_area")
            .intField("artist_area")
    /*Switch these two lines when using mapping table for artist_gender */
    //        .stringField("artist_gender")
            .intField("artist_gender")
    /*Switch these two lines when using mapping table for artist_begin_area */
            .intField("artist_begin_area")
    //      .stringField("artist_begin_area")
  3. Para executar novamente o código do pipeline, siga estas etapas:

    1. Defina as variáveis de ambiente do projeto:

      export PROJECT_ID=[PROJECT_ID]
      export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
      
    2. Verifique se o ambiente está configurado:

      export DESTINATION_TABLE=recordings_by_artists_dataflow_sideinputs
      export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
      export DATASET=musicbrainz
      export SERVICE_ACCOUNT=project-owner
      
    3. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém a chave da conta de serviço.

      export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
      
    4. Execute o pipeline para aninhar as linhas de gravação dentro das linhas de artista:

      ./run.sh simple
      
  4. Execute a mesma consulta que inclui artist_area e artist_gender:

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow_sideinputs
    WHERE artist_area IS NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Na saída, artist_area e artist_gender agora são decodificados:

    Saída decodificada por &quot;artist_area&quot; e &quot;artist_gender&quot;.

Otimizar o esquema do BigQuery

Na parte final deste tutorial, você executará um pipeline que gera um esquema de tabela mais otimizado usando campos aninhados.

Revise o código usado para gerar essa versão otimizada da tabela.

No diagrama a seguir, veja um pipeline do Dataflow um pouco diferente. Em vez de linhas duplicadas, você tem as gravações de artistas aninhadas dentro de cada linha de artista.

Pipeline do Dataflow com gravações de artista aninhadas dentro das linhas de cada artista.

A representação atual dos dados é bastante simples. Isso significa que ela inclui uma linha por gravação creditada que, por sua vez, inclui todos os metadados do artista do esquema do BigQuery e todas as gravações e metadados de artist_credit_name. Essa representação simples tem pelo menos duas desvantagens:

  • Os metadados de artist são repetidos para cada gravação creditada a um artista, o que aumenta o armazenamento necessário.
  • Ao exportar os dados como JSON, você terá uma matriz com dados repetidos, em vez de um artista com os dados de gravação aninhados, o que você provavelmente quer.

Sem prejudicar o desempenho e sem usar armazenamento extra, é possível armazenar as gravações como um campo repetido em cada registro de artista, em vez de armazenar uma gravação por linha. Bastam algumas alterações no pipeline do Dataflow.

Em vez de mesclar as gravações com as informações do artista por artist_credit_name.artist, neste pipeline alternativo, criamos uma lista aninhada de gravações dentro de um objeto de artista.

public static PCollection<MusicBrainzDataObject> nest(PCollection<KV<Long, MusicBrainzDataObject>> parent,
                                                      PCollection<KV<Long, MusicBrainzDataObject>> child,
                                                      String nestingKey) {
  final TupleTag<MusicBrainzDataObject> parentTag = new TupleTag<MusicBrainzDataObject>(){};
  final TupleTag<MusicBrainzDataObject> childTag = new TupleTag<MusicBrainzDataObject>(){};

  PCollection<KV<Long, CoGbkResult>> joinedResult = group("nest " + nestingKey, parent, child, parentTag, childTag);
  return joinedResult.apply("merge join results " + nestingKey,
                            MapElements
                             .into(new TypeDescriptor<MusicBrainzDataObject>() {})
                             .via((KV<Long, CoGbkResult> group) -> {
                                MusicBrainzDataObject parentObject = group.getValue().getOnly(parentTag);
                                Iterable<MusicBrainzDataObject> children = group.getValue().getAll(childTag);
                                List<MusicBrainzDataObject> childList = new ArrayList<MusicBrainzDataObject>();
                                children.forEach(childList::add);
                                parentObject = parentObject.duplicate();
                                parentObject.addColumnValue("recordings", childList);
                                return parentObject;
                              })
                         );
}

TableRow tem limites de tamanho na API BigQuery, portanto, o código limita o número de gravações aninhadas de um determinado registro a 1.000 elementos. Se um determinado artista tiver mais de 1.000 gravações, o código duplicará a linha, incluindo os metadados de artist, e continuará aninhando os dados de gravação na linha duplicada.

private static List<TableRow> toTableRows(MusicBrainzDataObject mbdo, Map<String, Object> serializableSchema) {
  TableRow row = new TableRow();
  List<TableRow> result = new ArrayList<TableRow>();
  Map<String, List<MusicBrainzDataObject>> nestedLists = new HashMap<String, List<MusicBrainzDataObject>>();
  Set<String> keySet = serializableSchema.keySet();
  /*
   *  construct a row object without the nested objects
   */
  int maxListSize = 0;
  for (String key : keySet) {
    Object value = serializableSchema.get(key);
    Object fieldValue = mbdo.getColumnValue(key);
    if (fieldValue != null) {
      if (value instanceof Map) {
        List<MusicBrainzDataObject> list = (List<MusicBrainzDataObject>) fieldValue;
        if (list.size() > maxListSize) {
          maxListSize = list.size();
        }
        nestedLists.put(key, list);
      } else {
        row.set(key, fieldValue);
      }

    }
  }
  /*
   * add the nested objects but break up the nested objects across duplicate rows if nesting limit exceeded
   */
  TableRow parent = row.clone();
  Set<String> listFields = nestedLists.keySet();
  for (int i = 0; i < maxListSize; i++) {
    parent = (parent == null ? row.clone() : parent);
    final TableRow parentRow = parent;
    nestedLists.forEach((String key, List<MusicBrainzDataObject> nestedList) -> {
      if (nestedList.size() > 0) {
        if (parentRow.get(key) == null) {
          parentRow.set(key, new ArrayList<TableRow>());
        }
        List<TableRow> childRows = (List<TableRow>) parentRow.get(key);
        childRows.add(toChildRow(nestedList.remove(0), (Map<String, Object>) serializableSchema.get(key)));
      }
    });
    if ((i > 0) && (i % BIGQUERY_NESTING_LIMIT == 0)) {
      result.add(parent);
      parent = null;
    }
  }
  if (parent != null) {
    result.add(parent);
  }
  return result;
}

No diagrama, veja as origens, transformações e coletores do pipeline.

Pipeline otimizado com origens, transformações e coletores.

Na maioria dos casos, os nomes das etapas são fornecidos no código como parte da chamada de método apply.

Para criar esse pipeline otimizado, siga estas etapas:

  1. No Cloud Shell, verifique se o ambiente está configurado para o script do pipeline:

    export PROJECT_ID=[PROJECT_ID]
    export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
    export DESTINATION_TABLE=recordings_by_artists_dataflow_nested
    export DATASET=musicbrainz
    export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
    export SERVICE_ACCOUNT=project-owner
    
  2. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém a chave da conta de serviço:

    export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
    
  3. Execute o pipeline para aninhar as linhas de gravação dentro das linhas de artista:

    ./run.sh nested
    
  4. Campos de consulta da tabela aninhada no BigQuery:

    SELECT artist_name, artist_gender, artist_area, artist_recordings
    FROM musicbrainz.recordings_by_artists_dataflow_nested
    WHERE artist_area IS NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    Resultados da consulta da tabela aninhada.

  5. Execute uma consulta para extrair valores de STRUCT e usá-los para filtrar os resultados:

    SELECT artist_name,
           artist_gender,
           artist_area,
           ARRAY(SELECT artist_credit_name_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS artist_credit_name_name,
           ARRAY(SELECT recording_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS recording_name
     FROM musicbrainz.recordings_by_artists_dataflow_nested,
          UNNEST(recordings_by_artists_dataflow_nested.artist_recordings) AS artist_recordings_struct
    WHERE artist_recordings_struct.recording_name LIKE "%Justin%"
    LIMIT 1000;
    

    Consulta para filtrar os resultados.

Limpeza

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados no tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.

Exclua o projeto

  1. No Console do Cloud, acesse a página Gerenciar recursos:

    Acessar "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

Como excluir recursos individuais

Siga estas etapas para remover os recursos individualmente, em vez de excluir o projeto inteiro.

Como excluir o bucket do Cloud Storage

  1. No Console do Cloud, acesse a página Navegador do Cloud Storage.

    Acessar o navegador

  2. Clique na caixa de seleção do bucket que você quer excluir.
  3. Para excluir o bucket, clique em Excluir e siga as instruções.

Como excluir os conjuntos de dados do BigQuery

  1. Abra a IU da Web do BigQuery.

    Abrir o BIGQUERY

  2. Selecione os conjuntos de dados do BigQuery criados no tutorial.

  3. Clique em Excluir .

A seguir