Como unir dados de streaming com o Dataflow SQL

Neste tutorial, você verá como usar o Dataflow SQL para unir um fluxo de dados do Pub/Sub com dados de uma tabela do BigQuery.

Objetivos

Neste tutorial, você aprenderá a:

  • gravar uma consulta SQL do Dataflow que mescle dados de streaming do Pub/Sub com dados da tabela do BigQuery;
  • implantar um job do Dataflow com base na IU do Dataflow SQL.

Custos

Neste tutorial, usamos os seguintes componentes faturáveis do Google Cloud:

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  4. Ative as APIs Cloud Dataflow, Compute Engine, Logging, Cloud Storage, JSON do Cloud Storage, BigQuery, Cloud Pub/Sub, Cloud Resource Manager e Data Catalog. .

    Ative as APIs

  5. Crie uma conta de serviço:

    1. No Console do Cloud, acesse a página Criar conta de serviço.

      Acesse Criar conta de serviço
    2. Selecione um projeto.
    3. No campo Nome da conta de serviço, insira um nome. O Console do Cloud preenche o campo ID da conta de serviço com base nesse nome.

      No campo Descrição da conta de serviço, insira uma descrição. Por exemplo, Service account for quickstart.

    4. Clique em Criar e continuar.
    5. Clique no campo Selecionar um papel.

      Em Acesso rápido, clique em Básico e em Proprietário.

    6. Clique em Continuar.
    7. Clique em Concluído para terminar a criação da conta de serviço.

      Não feche a janela do navegador. Você vai usá-lo na próxima etapa.

  6. Crie uma chave de conta de serviço:

    1. No Console do Cloud, clique no endereço de e-mail da conta de serviço que você criou.
    2. Clique em Chaves.
    3. Clique em Adicionar chave e em Criar nova chave.
    4. Clique em Criar. O download de um arquivo de chave JSON é feito no seu computador.
    5. Clique em Fechar.
  7. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém a chave da conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente.

  8. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  9. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  10. Ative as APIs Cloud Dataflow, Compute Engine, Logging, Cloud Storage, JSON do Cloud Storage, BigQuery, Cloud Pub/Sub, Cloud Resource Manager e Data Catalog. .

    Ative as APIs

  11. Crie uma conta de serviço:

    1. No Console do Cloud, acesse a página Criar conta de serviço.

      Acesse Criar conta de serviço
    2. Selecione um projeto.
    3. No campo Nome da conta de serviço, insira um nome. O Console do Cloud preenche o campo ID da conta de serviço com base nesse nome.

      No campo Descrição da conta de serviço, insira uma descrição. Por exemplo, Service account for quickstart.

    4. Clique em Criar e continuar.
    5. Clique no campo Selecionar um papel.

      Em Acesso rápido, clique em Básico e em Proprietário.

    6. Clique em Continuar.
    7. Clique em Concluído para terminar a criação da conta de serviço.

      Não feche a janela do navegador. Você vai usá-lo na próxima etapa.

  12. Crie uma chave de conta de serviço:

    1. No Console do Cloud, clique no endereço de e-mail da conta de serviço que você criou.
    2. Clique em Chaves.
    3. Clique em Adicionar chave e em Criar nova chave.
    4. Clique em Criar. O download de um arquivo de chave JSON é feito no seu computador.
    5. Clique em Fechar.
  13. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém a chave da conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente.

  14. Instale e inicialize o Cloud SDK. Escolha uma das opções de instalação. Talvez seja necessário definir a propriedade project como o projeto que você está usando neste tutorial.
  15. Acesse a IU da Web do SQL do Dataflow no Console do Cloud. Isso abre seu projeto acessado mais recentemente. Se quiser alternar para um projeto diferente, clique no nome do projeto na parte superior da IU da Web do BigQuery e pesquise aquele que você quer usar.
    Acessar a IU da Web do Dataflow SQL

Criar exemplos de fonte

Se você quiser seguir o exemplo fornecido neste tutorial, crie as fontes a seguir e use-as nas etapas abaixo.

  • Um tópico do Pub/Sub chamado transactions: um stream de dados de transação que chega por meio de uma assinatura no tópico do Pub/Sub. Os dados para cada transação incluem informações como produto comprado, preço promocional e a cidade em que ocorreu a compra. Depois de criar o tópico do Pub/Sub, crie um script que publique mensagens no seu tópico. Esse script será executado em uma seção futura deste tutorial.
  • Uma tabela do BigQuery chamada us_state_salesregions: uma tabela que fornece um mapeamento de estados para regiões de vendas. Antes de criar esta tabela, é preciso criar um conjunto de dados do BigQuery.

Atribuir um esquema ao seu tópico do Pub/Sub

A atribuição de um esquema permite executar consultas SQL dos dados do seu tópico do Pub/Sub. Atualmente, o Dataflow SQL espera que as mensagens nos tópicos do Pub/Sub sejam serializadas no formato JSON.

Para atribuir um esquema ao tópico de exemplo do Pub/Sub transactions:

  1. Crie um arquivo de texto e nomeie-o como transactions_schema.yaml. Copie e cole o texto de esquema a seguir em transactions_schema.yaml.
  - column: event_timestamp
    description: Pub/Sub event timestamp
    mode: REQUIRED
    type: TIMESTAMP
  - column: tr_time_str
    description: Transaction time string
    mode: NULLABLE
    type: STRING
  - column: first_name
    description: First name
    mode: NULLABLE
    type: STRING
  - column: last_name
    description: Last name
    mode: NULLABLE
    type: STRING
  - column: city
    description: City
    mode: NULLABLE
    type: STRING
  - column: state
    description: State
    mode: NULLABLE
    type: STRING
  - column: product
    description: Product
    mode: NULLABLE
    type: STRING
  - column: amount
    description: Amount of transaction
    mode: NULLABLE
    type: FLOAT
  1. Atribua o esquema usando a ferramenta de linha de comando gcloud.

    a. Atualize a ferramenta gcloud com o comando a seguir. Verifique se a versão da ferramenta gcloud é a 242.0.0 ou superior.

      gcloud components update
    

    b. Execute o comando a seguir em uma janela de linha de comando. Substitua project-id pelo ID do projeto e path-to-file pelo caminho do arquivo transactions_schema.yaml.

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    Para mais informações sobre parâmetros de comando e formatos de arquivo de esquema permitidos, consulte a página de documentação de atualização de entradas de catálogo de dados do gcloud.

    c. Confirme se o esquema foi atribuído com êxito ao tópico transactions do Pub/Sub. Substitua project-id pelo ID do projeto.

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Encontrar fontes do Pub/Sub

A IU do SQL do Dataflow oferece uma forma de encontrar fontes de dados do Pub/Sub de qualquer projeto a que você tenha acesso, para que não precise lembrar os nomes completos deles.

Para o exemplo neste tutorial, pesquise o tópico transactions do Pub/Sub que você criou:

  1. No painel esquerdo, pesquise projectid=project-id transactions. Substitua project-id pelo ID do projeto.

    Painel de pesquisa do Data Catalog no espaço de trabalho do SQL do Dataflow.

Ver o esquema

  1. No painel "Explorador" à esquerda da IU do SQL do Dataflow, clique em transações ou pesquise um tópico do Pub/Sub digitando projectid=project-id system=cloud_pubsub e selecione o tópico.
  2. Em Esquema, é possível visualizar o esquema atribuído ao tópico do Pub/Sub.

    Esquema atribuído ao tópico, incluindo uma lista de nomes de campos e suas descrições.

Criar uma consulta SQL

A IU do Dataflow SQL permite criar consultas SQL para executar jobs do Dataflow.

A consulta SQL a seguir é de enriquecimento de dados. Ele acrescenta mais um campo, sales_region, ao stream de eventos do Pub/Sub (transactions), usando uma tabela do BigQuery (us_state_salesregions) que mapeia estados nas regiões de vendas.

Copie e cole a consulta SQL abaixo no Editor de consultas. Substitua project-id pelo ID do projeto:

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Ao inserir uma consulta na IU do Dataflow SQL, o validador da consulta verifica a sintaxe dela. Se a consulta for válida, será exibido um ícone de marca de seleção verde. Se a consulta for inválida, será exibido um ícone de ponto de exclamação vermelho. Se a sintaxe for inválida, clicar no ícone do validador fornecerá informações sobre o que é necessário corrigir.

A captura de tela a seguir mostra a consulta válida no Editor de consulta. O validador exibe uma marca de seleção verde.

Espaço de trabalho do SQL do Dataflow com a consulta do tutorial visível no editor.

Criar um job do Dataflow para executar sua consulta SQL

Para executar sua consulta SQL, crie um job do Dataflow na IU do Dataflow SQL.

  1. Acima do Editor de consultas, clique em Criar job.

  2. No painel Criar job do Dataflow que se abre à direita, na opção Destino, selecione BigQuery. Em seguida, em ID do conjunto de dados, selecione dataflow_sql_tutorial e defina o Nome da tabela como sales.

    Formulário "Criar o job do SQL do Dataflow".
  3. (Opcional) O Dataflow escolhe automaticamente as configurações ideais para seu job do Dataflow SQL, mas é possível expandir o menu dos Parâmetros opcionais para especificar manualmente as seguintes opções de pipeline:

    • Número máximo de workers
    • Zona
    • E-mail da conta de serviço
    • Machine type
    • Outros experimentos
    • Configuração do endereço IP do worker
    • Rede
    • Sub-rede
  4. Clique em Criar Levará alguns minutos para que o job do Dataflow comece a ser executado.

Ver o job do Dataflow

O Dataflow transforma sua consulta SQL em um pipeline do Apache Beam. Na IU da Web do Dataflow que é aberta em uma nova guia do navegador, é possível ver uma representação gráfica do pipeline.

Pipeline da consulta SQL exibida na IU da Web do Dataflow.

Clique nas caixas para ver um detalhamento das transformações que estão ocorrendo no pipeline. Por exemplo, se você clicar na caixa superior na representação gráfica, com o rótulo Executar consulta SQL, aparecerá um gráfico que mostra as operações em segundo plano.

As duas primeiras caixas representam as duas entradas que você mesclou: o tópico do Pub/Sub, transactions e a tabela do BigQuery, us_state_salesregions.

A saída de gravação de uma junção de duas entradas será concluída em 25 segundos.

Para visualizar a tabela de saída que contém os resultados do job, acesse a IU do BigQuery. No painel Explorador à esquerda no seu projeto e clique no conjunto de dados dataflow_sql_tutorial que você criou. Depois, clique na tabela de saída, sales. A guia Visualização exibe o conteúdo da tabela de saída.

A tabela de visualização contém colunas para tr_time_str, first_name, last_name, city, state, product, amount e sales_region.

Ver jobs passados e editar suas consultas

A IU do Dataflow armazena jobs e consultas anteriores na página Jobs.

É possível usar a lista do histórico de jobs para ver consultas SQL anteriores. Por exemplo, você quer modificar sua consulta para agregar vendas por região de vendas a cada 15 segundos. Use a página Jobs para acessar o job em execução que você iniciou anteriormente no tutorial, copie a consulta SQL e execute outro job com uma consulta modificada.

  1. Na página Jobs do Dataflow, clique no job que você quer editar.

  2. Na página Detalhes do job, localize a consulta SQL no painel à direita em Opções de pipeline. Encontre a linha de queryString.

    A opção do pipeline de job chamada queryString.
  3. Copie e cole a consulta SQL no Editor de consultas para adicionar janelas em cascata. Se você copiar a consulta a seguir, substitua project-id pelo ID do projeto.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  4. Clique em Criar job para criar um novo job com a consulta modificada.

Limpeza

Para evitar cobranças gerados pelos recursos usados neste tutorial na conta de faturamento do Cloud:

  1. Interrompa o script de publicação transactions_injector.py se ele ainda estiver em execução.

  2. Interrompa os jobs do Dataflow em execução. Acesse a IU da Web do Dataflow no Console do Cloud.

    Acessar a IU da Web do Dataflow

    Em cada job criado seguindo este tutorial, siga estas etapas:

    1. Clique no nome do job.

    2. No painel Resumo do job, clique em Interromper job. A caixa de diálogo Interromper job será exibida, oferecendo opções para essa ação.

    3. Clique em Cancelar.

    4. Clique em Interromper job. O serviço suspende toda a ingestão de dados e processamento assim que possível. Como Cancelar interrompe imediatamente o processamento, talvez todos os dados "em trânsito" sejam perdidos. Interromper um job pode levar alguns minutos.

  3. Exclua seu conjunto de dados do BigQuery. Acesse a IU da Web do BigQuery no Console do Cloud.

    Acessar a IU da Web do BigQuery

    1. No painel Explorador, na seção Recursos, clique no conjunto de dados dataflow_sql_tutorial criado.

    2. No painel de detalhes, à direita, clique em Excluir conjunto de dados. Esta ação exclui o conjunto de dados, a tabela e todos os dados.

    3. Na caixa de diálogo Excluir conjunto de dados, confirme o comando de exclusão digitando o nome do seu conjunto de dados (dataflow_sql_tutorial) e clique em Excluir.

  4. Exclua seu tópico do Pub/Sub. Acesse a página de tópicos do Pub/Sub no Console do Cloud.

    Acesse a página de tópicos do Cloud Pub/Sub

    1. Marque a caixa de seleção ao lado do tópico transactions.

    2. Clique em Excluir para excluir permanentemente o tópico.

    3. Acesse a página de assinaturas do Pub/Sub.

    4. Marque a caixa de seleção ao lado das assinaturas restantes em transactions. Se seus jobs não estiverem mais em execução, talvez não haja mais nenhuma assinatura.

    5. Clique em Excluir para excluir permanentemente as assinaturas.

  5. Exclua o intervalo de preparo do Dataflow no Cloud Storage. Acesse o navegador do Cloud Storage no Console do Cloud.

    Acessar o navegador do Cloud Storage

    1. Marque a caixa de seleção ao lado do intervalo de preparo do Dataflow.

    2. Clique em Excluir para excluir permanentemente o bucket.

A seguir