Como unir dados de streaming com o Dataflow SQL

Neste tutorial, você verá como usar o Dataflow SQL para unir um fluxo de dados do Pub/Sub com dados de uma tabela do BigQuery.

Objetivos

Neste tutorial, você aprenderá a:

  • gravar uma consulta SQL do Dataflow que mescle dados de streaming do Pub/Sub com dados da tabela do BigQuery;
  • implantar um job do Dataflow com base na IU do Dataflow SQL.

Custos

Neste tutorial, há componentes faturáveis do Google Cloud, como os seguintes:

  • Dataflow
  • Cloud Storage
  • Pub/Sub

Use a calculadora de preços para gerar uma estimativa de custos baseada na projeção de uso. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

  1. Faça login na sua conta do Google.

    Se você ainda não tiver uma, inscreva-se.

  2. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar a página do seletor de projetos

  3. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  4. Ative as APIs Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub e Cloud Resource Manager .

    Ative as APIs

  5. Configurar a autenticação:
    1. No Console do Cloud, acesse a página Criar chave da conta de serviço.

      Acessar página "Criar chave da conta de serviço"
    2. Na lista Conta de serviço, selecione Nova conta de serviço.
    3. No campo Nome da conta de serviço, insira um nome.
    4. Na lista Papel, selecione Projeto > Proprietário.

    5. Clique em Criar. O download de um arquivo JSON que contém sua chave é feito no seu computador.
  6. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém a chave da conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente.

  7. Instale e inicialize o Cloud SDK. Escolha uma das opções de instalação. Talvez seja necessário definir a propriedade project como o projeto que você está usando neste tutorial.
  8. Acesse a IU da Web do BigQuery no Console do Cloud. Isso abre seu projeto acessado mais recentemente. Para alternar para um projeto diferente, clique no nome do projeto na parte superior da IU da Web do BigQuery e pesquise aquele que você quer usar.
    Acessar a IU da Web do BigQuery

Alternar para a IU do Dataflow SQL

Na IU da Web do BigQuery, siga estas etapas para alternar para a IU do Dataflow.

  1. Clique no menu suspenso Mais e selecione Configurações de consulta.

  2. No menu Configurações de consulta exibido à direita, selecione Mecanismo do Dataflow.

  3. Se as APIs do Dataflow e do Data Catalog não estiverem ativadas no projeto, você será solicitado a fazê-lo. Clique em Ativar APIs. A ativação das APIs do Dataflow e do Data Catalog pode levar alguns minutos.

  4. Quando a ativação das APIs estiver concluída, clique em Salvar.

Criar exemplos de amostra

Se você quiser seguir o exemplo fornecido neste tutorial, crie as fontes a seguir e use-as nas etapas abaixo.

  • Um tópico do Pub/Sub chamado transactions: um stream de dados de transação que chega por meio de uma assinatura no tópico do Pub/Sub. Os dados para cada transação incluem informações como produto comprado, preço promocional e a cidade em que ocorreu a compra. Depois de criar o tópico do Pub/Sub, crie um script que publique mensagens no seu tópico. Esse script será executado em uma seção futura deste tutorial.
  • Uma tabela do BigQuery chamada us_state_salesregions: uma tabela que fornece um mapeamento de estados para regiões de vendas. Antes de criar esta tabela, é preciso criar um conjunto de dados do BigQuery.

Encontrar fontes do Pub/Sub

A IU do Dataflow SQL oferece uma maneira de encontrar fontes de dados do Pub/Sub para qualquer projeto a que você tenha acesso, para que não precise lembrar os nomes completos deles.

Para o exemplo neste tutorial, adicione o tópico transactions do Pub/Sub que você criou:

  1. No painel de navegação à esquerda, clique na lista suspensa Adicionar dados e selecione Fontes do Cloud Dataflow.

  2. No painel Adicionar origem do Cloud Dataflow que é aberto à direita, escolha Tópicos do Pub/Sub. Na caixa de pesquisa, procure transactions. Selecione o tópico e clique em Adicionar.

Atribuir um esquema ao seu tópico do Pub/Sub

A atribuição de um esquema permite executar consultas SQL dos dados do seu tópico do Pub/Sub. Atualmente, o Dataflow SQL espera que as mensagens nos tópicos do Pub/Sub sejam serializadas no formato JSON. No futuro, haverá compatibilidade com outros formatos, como Avro.

Após adicionar o exemplo do tópico do Pub/Sub como origem do Dataflow, conclua as etapas a seguir para atribuir um esquema ao tópico na IU do Dataflow SQL:

  1. Selecione o tópico no painel Recursos.

  2. Na guia Esquema, clique em Editar esquema. O painel lateral Esquema é aberto à direita.

  3. Ative o botão Editar como texto e cole o seguinte esquema inline no editor. Em seguida, clique em Enviar.

    [
      {
          "description": "Pub/Sub event timestamp",
          "name": "event_timestamp",
          "mode": "REQUIRED",
          "type": "TIMESTAMP"
      },
      {
          "description": "Transaction time string",
          "name": "tr_time_str",
          "type": "STRING"
      },
      {
          "description": "First name",
          "name": "first_name",
          "type": "STRING"
      },
      {
          "description": "Last name",
          "name": "last_name",
          "type": "STRING"
      },
      {
          "description": "City",
          "name": "city",
          "type": "STRING"
      },
      {
          "description": "State",
          "name": "state",
          "type": "STRING"
      },
      {
          "description": "Product",
          "name": "product",
          "type": "STRING"
      },
      {
          "description": "Amount of transaction",
          "name": "amount",
          "type": "FLOAT64"
      }
    ]
    
  4. (Opcional) Clique em Visualizar tópico para examinar o conteúdo de suas mensagens e confirmar se elas correspondem ao esquema que você definiu.

Ver o esquema

  1. No painel de navegação à esquerda da IU do Dataflow SQL, clique em Origens do Cloud Dataflow.
  2. Clique nos tópicos do Pub/Sub.
  3. Clique em Transações.
  4. Em Esquema, é possível visualizar o esquema atribuído ao tópico transactions do Pub/Sub.
Esquema atribuído ao tópico, incluindo uma lista de nomes de campos e suas descrições.

Criar uma consulta SQL

A IU do Dataflow SQL permite criar consultas SQL para executar jobs do Dataflow.

A consulta SQL a seguir é de enriquecimento de dados. Ele acrescenta mais um campo, sales_region, ao stream de eventos do Pub/Sub (transactions), usando uma tabela do BigQuery (us_state_salesregions) que mapeia estados nas regiões de vendas.

Copie e cole a consulta SQL abaixo no Editor de consultas. Substitua project-id pelo ID do projeto:

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Ao inserir uma consulta na IU do Dataflow SQL, o validador da consulta verifica a sintaxe dela. Se a consulta for válida, será exibido um ícone de marca de seleção verde. Se a consulta for inválida, será exibido um ícone de ponto de exclamação vermelho. Se a sintaxe for inválida, clicar no ícone do validador fornecerá informações sobre o que é necessário corrigir.

A captura de tela a seguir mostra a consulta válida no Editor de consulta. O validador exibe uma marca de seleção verde.

Criar um job do Dataflow para executar sua consulta SQL

Para executar sua consulta SQL, crie um job do Dataflow na IU do Dataflow SQL.

  1. Abaixo do Editor de consultas, clique em Criar job do Dataflow.

  2. No painel Criar job do Dataflow que aparece à direita, altere o Nome da tabela padrão para dfsqltable_sales.

  3. (Opcional) O Dataflow escolhe automaticamente as configurações ideais para seu job do Dataflow SQL, mas é possível expandir o menu dos Parâmetros opcionais para especificar manualmente as seguintes opções de pipeline:

    • Número máximo de workers
    • Zona
    • E-mail da conta de serviço
    • Machine type
    • Outros experimentos
    • Configuração do endereço IP do worker
    • Rede
    • Sub-rede
  4. Clique em Criar Levará alguns minutos para que o job do Dataflow comece a ser executado.

  5. O painel Resultados da consulta aparece na IU. Para voltar ao painel Resultados da consulta do job futuramente, encontre o job no painel Histórico de jobs e use o botão Abrir consulta no editor, conforme mostrado em Ver o job do Dataflow e a saída.

  6. Em Informações do job, clique no link ID do job. Isso abre uma nova guia do navegador com a página Detalhes do job do Dataflow na IU da Web do Dataflow.

Visualizar a saída e o job do Dataflow

O Dataflow transforma sua consulta SQL em um pipeline do Apache Beam. Na IU da Web do Dataflow que é aberta em uma nova guia do navegador, é possível ver uma representação gráfica do pipeline.

Gráficos de resumo e pipelines de jobs mostrando um pico simultâneo na latência do sistema de menos de 30 segundos e atraso na atualização dos dados de mais de 30 segundos.

Clique nas caixas para ver um detalhamento das transformações que estão ocorrendo no pipeline. Por exemplo, se você clicar na caixa superior na representação gráfica, com o rótulo Executar consulta SQL, aparecerá um gráfico que mostra as operações em segundo plano.

As duas primeiras caixas representam as duas entradas que você mesclou: o tópico do Pub/Sub, transactions e a tabela do BigQuery, us_state_salesregions.

A saída de gravação de uma junção de duas entradas será concluída em 25 segundos.

Para visualizar a tabela de saída que contém os resultados do job, volte para a guia do navegador com a IU do Dataflow SQL. No painel de navegação à esquerda, em seu projeto, clique no conjunto de dados dataflow_sql_dataset que você criou. Depois clique na tabela de saída, dfsqltable_sales. A guia Visualização exibe o conteúdo da tabela de saída.

A tabela de visualização dfsqltable_sales contém colunas para tr_time_str, first_name, last_name, city, state, product, amount e sales_region.

Ver jobs passados e editar suas consultas

A IU do Dataflow SQL armazena jobs e consultas anteriores no painel Histórico do job. Os jobs são listados pelo dia em que foram iniciados. A lista de jobs exibe primeiro os dias que contêm jobs em execução. Em seguida, a lista exibe dias sem nenhum job em execução.

É possível usar a lista do histórico de jobs para editar consultas SQL anteriores e executar novos jobs do Dataflow. Por exemplo, você quer modificar sua consulta para agregar vendas por região de vendas a cada 15 segundos. Use o painel Histórico do job para acessar o job em execução que você iniciou anteriormente no tutorial, altere a consulta SQL e execute outro job com a consulta modificada.

  1. No painel de navegação à esquerda, clique em Histórico do job.

  2. Em Histórico do job, clique em Cloud Dataflow. Todos os jobs antigos do seu projeto aparecem.

    Histórico de jobs listado com data e hora em que o job foi executado e um ícone de status sobre o job.
  3. Clique no job que você quer editar. Clique em Abrir no editor de consultas.

  4. Edite sua consulta SQL no Editor de consultas para adicionar janelas em cascata. Substitua project-id pelo ID do projeto se copiar a consulta a seguir.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  5. Abaixo do Editor de consultas, clique em Criar job do Cloud Dataflow para criar um novo job com a consulta modificada.

Limpar

Para evitar cobranças dos recursos usados neste tutorial na conta do Google Cloud, siga estas etapas:

  1. Interrompa o script de publicação transactions_injector.py se ele ainda estiver em execução.

  2. Interrompa os jobs do Dataflow em execução. Acesse a IU da Web do Dataflow no Console do Cloud.

    Acessar a IU da Web do Dataflow

    Em cada job criado seguindo este tutorial, siga estas etapas:

    1. Clique no nome do job.

    2. No painel Resumo do job, clique em Interromper job. A caixa de diálogo Interromper job será exibida, oferecendo opções para essa ação.

    3. Clique em Cancelar.

    4. Clique em Interromper job. O serviço suspende toda a ingestão de dados e processamento assim que possível. Como Cancelar interrompe imediatamente o processamento, talvez todos os dados "em trânsito" sejam perdidos. Interromper um job pode levar alguns minutos.

  3. Exclua seu conjunto de dados do BigQuery. Acesse a IU da Web do BigQuery no Console do Cloud.

    Acessar a IU da Web do BigQuery

    1. No painel de navegação, na seção Recursos, clique no conjunto de dados dataflow_sql_dataset criado.

    2. No painel de detalhes, à direita, clique em Excluir conjunto de dados. Esta ação exclui o conjunto de dados, a tabela e todos os dados.

    3. Na caixa de diálogo Excluir conjunto de dados, confirme o comando de exclusão digitando o nome do seu conjunto de dados (dataflow_sql_dataset) e clique em Excluir.

  4. Exclua seu tópico do Pub/Sub. Acesse a página de tópicos do Pub/Sub no Console do Cloud.

    Acesse a página de tópicos do Cloud Pub/Sub

    1. Marque a caixa de seleção ao lado do tópico transactions.

    2. Clique em Excluir para excluir permanentemente o tópico.

    3. Acesse a página de assinaturas do Pub/Sub.

    4. Marque a caixa de seleção ao lado das assinaturas restantes em transactions. Se seus jobs não estiverem mais em execução, talvez não haja mais nenhuma assinatura.

    5. Clique em Excluir para excluir permanentemente as assinaturas.

  5. Exclua o intervalo de preparo do Dataflow no Cloud Storage. Acesse o navegador do Cloud Storage no Console do Cloud.

    Acessar o navegador do Cloud Storage

    1. Marque a caixa de seleção ao lado do intervalo de preparo do Dataflow.

    2. Clique em Excluir para excluir permanentemente o bucket.

A seguir