Como unir dados de streaming com o Dataflow SQL


Neste tutorial, você verá como usar o Dataflow SQL para unir um fluxo de dados do Pub/Sub com dados de uma tabela do BigQuery.

Objetivos

Neste tutorial, você aprenderá a:

  • gravar uma consulta SQL do Dataflow que mescle dados de streaming do Pub/Sub com dados da tabela do BigQuery;
  • implantar um job do Dataflow com base na IU do Dataflow SQL.

Custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  4. Ative as APIs Cloud Dataflow, Compute Engine, Logging, Cloud Storage, JSON do Cloud Storage, BigQuery, Cloud Pub/Sub, Cloud Resource Manager e Data Catalog. .

    Ative as APIs

  5. Crie uma conta de serviço:

    1. No Console do Google Cloud, acesse a página Criar conta de serviço.

      Acesse "Criar conta de serviço"
    2. Selecione o projeto.
    3. No campo Nome da conta de serviço, insira um nome. O Console do Google Cloud preenche o campo ID da conta de serviço com base nesse nome.

      No campo Descrição da conta de serviço, insira uma descrição. Por exemplo, Service account for quickstart.

    4. Clique em Criar e continuar.
    5. Conceda o papel Project > Owner à conta de serviço do.

      Para conceder o papel, encontre a lista Selecionar um papel e clique em Project > Owner.

    6. Clique em Continuar.
    7. Clique em Concluído para terminar a criação da conta de serviço.

      Não feche a janela do navegador. Você vai usá-la na próxima etapa.

  6. Crie uma chave de conta de serviço:

    1. No console do Google Cloud, clique no endereço de e-mail da conta de serviço que você criou.
    2. Clique em Chaves.
    3. Clique em Adicionar chave e em Criar nova chave.
    4. Clique em Criar. O download de um arquivo de chave JSON é feito no seu computador.
    5. Clique em Fechar.
  7. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém suas credenciais. Essa variável só se aplica à sessão de shell atual. Assim, se você abrir uma nova sessão, precisará definir a variável novamente.

  8. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  9. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  10. Ative as APIs Cloud Dataflow, Compute Engine, Logging, Cloud Storage, JSON do Cloud Storage, BigQuery, Cloud Pub/Sub, Cloud Resource Manager e Data Catalog. .

    Ative as APIs

  11. Crie uma conta de serviço:

    1. No Console do Google Cloud, acesse a página Criar conta de serviço.

      Acesse "Criar conta de serviço"
    2. Selecione o projeto.
    3. No campo Nome da conta de serviço, insira um nome. O Console do Google Cloud preenche o campo ID da conta de serviço com base nesse nome.

      No campo Descrição da conta de serviço, insira uma descrição. Por exemplo, Service account for quickstart.

    4. Clique em Criar e continuar.
    5. Conceda o papel Project > Owner à conta de serviço do.

      Para conceder o papel, encontre a lista Selecionar um papel e clique em Project > Owner.

    6. Clique em Continuar.
    7. Clique em Concluído para terminar a criação da conta de serviço.

      Não feche a janela do navegador. Você vai usá-la na próxima etapa.

  12. Crie uma chave de conta de serviço:

    1. No console do Google Cloud, clique no endereço de e-mail da conta de serviço que você criou.
    2. Clique em Chaves.
    3. Clique em Adicionar chave e em Criar nova chave.
    4. Clique em Criar. O download de um arquivo de chave JSON é feito no seu computador.
    5. Clique em Fechar.
  13. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém suas credenciais. Essa variável só se aplica à sessão de shell atual. Assim, se você abrir uma nova sessão, precisará definir a variável novamente.

  14. Instale e inicialize a CLI gcloud. Escolha uma das opções de instalação. Talvez seja necessário definir a propriedade project como o projeto que você está usando neste tutorial.
  15. Acesse a IU da Web do SQL do Dataflow no console do Google Cloud. Isso abre seu projeto acessado mais recentemente. Se quiser alternar para um projeto diferente, clique no nome do projeto na parte superior da IU da Web do BigQuery e pesquise aquele que você quer usar.
    Acessar a IU da Web do Dataflow SQL

Criar exemplos de fonte

Se você quiser seguir o exemplo fornecido neste tutorial, crie as fontes a seguir e use-as nas etapas abaixo.

  • Um tópico do Pub/Sub chamado transactions: um stream de dados de transação que chega por meio de uma assinatura no tópico do Pub/Sub. Os dados para cada transação incluem informações como produto comprado, preço promocional e a cidade em que ocorreu a compra. Depois de criar o tópico do Pub/Sub, crie um script que publique mensagens no seu tópico. Esse script será executado em uma seção futura deste tutorial.
  • Uma tabela do BigQuery chamada us_state_salesregions: uma tabela que fornece um mapeamento de estados para regiões de vendas. Antes de criar esta tabela, é preciso criar um conjunto de dados do BigQuery.

Atribuir um esquema ao seu tópico do Pub/Sub

A atribuição de um esquema permite executar consultas SQL dos dados do seu tópico do Pub/Sub. Atualmente, o Dataflow SQL espera que as mensagens nos tópicos do Pub/Sub sejam serializadas no formato JSON.

Para atribuir um esquema ao tópico de exemplo do Pub/Sub transactions:

  1. Crie um arquivo de texto e nomeie-o como transactions_schema.yaml. Copie e cole o texto de esquema a seguir em transactions_schema.yaml.

      - column: event_timestamp
        description: Pub/Sub event timestamp
        mode: REQUIRED
        type: TIMESTAMP
      - column: tr_time_str
        description: Transaction time string
        mode: NULLABLE
        type: STRING
      - column: first_name
        description: First name
        mode: NULLABLE
        type: STRING
      - column: last_name
        description: Last name
        mode: NULLABLE
        type: STRING
      - column: city
        description: City
        mode: NULLABLE
        type: STRING
      - column: state
        description: State
        mode: NULLABLE
        type: STRING
      - column: product
        description: Product
        mode: NULLABLE
        type: STRING
      - column: amount
        description: Amount of transaction
        mode: NULLABLE
        type: FLOAT
    
  2. Atribua o esquema usando a Google Cloud CLI.

    a. Atualize a CLI gcloud com o comando a seguir. Garanta que a versão da CLI gcloud é 242.0.0 ou posterior.

      gcloud components update
    

    b. Execute o comando a seguir em uma janela de linha de comando. Substitua project-id pelo ID do projeto e path-to-file pelo caminho do arquivo transactions_schema.yaml.

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    Para mais informações sobre parâmetros de comando e formatos de arquivo de esquema permitidos, consulte a página de documentação de atualização de entradas de catálogo de dados do gcloud.

    c. Confirme se o esquema foi atribuído com êxito ao tópico transactions do Pub/Sub. Substitua project-id pelo ID do projeto.

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Encontrar fontes do Pub/Sub

A IU do SQL do Dataflow oferece uma forma de encontrar fontes de dados do Pub/Sub de qualquer projeto a que você tenha acesso, para que não precise lembrar os nomes completos deles.

No exemplo deste tutorial, navegue até o editor do Dataflow SQL e procure o tópico transactions do Pub/Sub que você criou:

  1. Navegue até o SQL Workspace.

  2. No painel Editor do SQL do Dataflow, na barra de pesquisa, procure por projectid=project-id transactions. Substitua project-id pelo ID do projeto.

    Painel de pesquisa do Data Catalog no espaço de trabalho do SQL do Dataflow.

Ver o esquema

  1. No painel Editor do SQL do Dataflow, clique em transações ou pesquise um tópico do Pub/Sub digitando projectid=project-id system=cloud_pubsub. selecione o tópico.
  2. Em Esquema, é possível visualizar o esquema atribuído ao tópico do Pub/Sub.

    Esquema atribuído ao tópico, incluindo uma lista de nomes de campos e suas descrições.

Criar uma consulta SQL

A IU do Dataflow SQL permite criar consultas SQL para executar jobs do Dataflow.

A consulta SQL a seguir é de enriquecimento de dados. Ele acrescenta mais um campo, sales_region, ao stream de eventos do Pub/Sub (transactions), usando uma tabela do BigQuery (us_state_salesregions) que mapeia estados nas regiões de vendas.

Copie e cole a consulta SQL abaixo no Editor de consultas. Substitua project-id pelo ID do projeto:

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Ao inserir uma consulta na IU do Dataflow SQL, o validador da consulta verifica a sintaxe dela. Se a consulta for válida, será exibido um ícone de marca de seleção verde. Se a consulta for inválida, será exibido um ícone de ponto de exclamação vermelho. Se a sintaxe for inválida, clicar no ícone do validador fornecerá informações sobre o que é necessário corrigir.

A captura de tela a seguir mostra a consulta válida no Editor de consulta. O validador exibe uma marca de seleção verde.

Espaço de trabalho do SQL do Dataflow com a consulta do tutorial visível no editor.

Criar um job do Dataflow para executar sua consulta SQL

Para executar sua consulta SQL, crie um job do Dataflow na IU do Dataflow SQL.

  1. Acima do Editor de consultas, clique em Criar job.

  2. No painel Criar job do Dataflow que é aberto:

    • Em Destino, selecione BigQuery.
    • Em ID do conjunto de dados, selecione dataflow_sql_tutorial.
    • Para Nome da tabela, digite sales.
    Formulário "Criar o job do SQL do Dataflow".
  3. (Opcional) O Dataflow escolhe automaticamente as configurações ideais para seu job do Dataflow SQL, mas é possível expandir o menu dos Parâmetros opcionais para especificar manualmente as seguintes opções de pipeline:

    • Número máximo de workers
    • Zona
    • E-mail da conta de serviço
    • Machine type
    • Outros experimentos
    • Configuração do endereço IP do worker
    • Rede
    • Sub-rede
  4. Clique em Criar. O job do Dataflow leva alguns minutos para começar a ser executado.

Ver o job do Dataflow

O Dataflow transforma sua consulta SQL em um pipeline do Apache Beam. Clique em Ver job para abrir a IU da Web do Dataflow, em que é possível ver uma representação gráfica do pipeline.

Pipeline da consulta SQL exibida na IU da Web do Dataflow.

Para ver um detalhamento das transformações que ocorrem no pipeline, clique nas caixas. Por exemplo, se você clicar na caixa superior na representação gráfica, com o rótulo Executar consulta SQL, aparecerá um gráfico que mostra as operações em segundo plano.

As duas primeiras caixas representam as duas entradas que você mesclou: o tópico do Pub/Sub, transactions e a tabela do BigQuery, us_state_salesregions.

A saída de gravação de uma junção de duas entradas será concluída em 25 segundos.

Para ver a tabela de saída que contém os resultados do job, acesse a IU do BigQuery. No painel Explorer, no projeto, clique no conjunto de dados dataflow_sql_tutorial que você criou. Em seguida, clique na tabela de saída sales. A guia Visualização exibe o conteúdo da tabela de saída.

A tabela de visualização contém colunas para tr_time_str, first_name, last_name, city, state, product, amount e sales_region.

Ver jobs passados e editar suas consultas

A IU do Dataflow armazena jobs e consultas anteriores na página Jobs.

É possível usar a lista do histórico de jobs para ver consultas SQL anteriores. Por exemplo, você quer modificar sua consulta para agregar vendas por região de vendas a cada 15 segundos. Use a página Jobs para acessar o job em execução que você iniciou anteriormente no tutorial, copie a consulta SQL e execute outro job com uma consulta modificada.

  1. Na página Jobs do Dataflow, clique no job que você quer editar.

  2. Na página Detalhes do job, no painel Informações do job, localize Opções de pipeline na consulta SQL. Encontre a linha de queryString.

    A opção do pipeline de job chamada queryString.
  3. Copie e cole a seguinte consulta SQL no Editor de SQL do Dataflow no SQL Workspace para adicionar janelas em cascata Substitua project-id pelo ID do projeto.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  4. Clique em Criar job para criar um novo job com a consulta modificada.

Limpeza

Para evitar cobranças gerados pelos recursos usados neste tutorial na conta de faturamento do Cloud:

  1. Interrompa o script de publicação transactions_injector.py se ele ainda estiver em execução.

  2. Interrompa os jobs do Dataflow em execução. Acesse a IU da Web do Dataflow no console do Google Cloud.

    Acessar a IU da Web do Dataflow

    Em cada job criado seguindo este tutorial, siga estas etapas:

    1. Clique no nome do job.

    2. Na página de detalhes do job, clique em Parar. A caixa de diálogo Interromper job será exibida, oferecendo opções para essa ação.

    3. Selecione Cancelar.

    4. Clique em Interromper job. O serviço suspende toda a ingestão de dados e processamento assim que possível. Como Cancelar interrompe imediatamente o processamento, talvez todos os dados "em trânsito" sejam perdidos. Interromper um job pode levar alguns minutos.

  3. Exclua seu conjunto de dados do BigQuery. Acesse a IU da Web do BigQuery no console do Google Cloud.

    Acessar a IU da Web do BigQuery

    1. No painel Explorador, na seção Recursos, clique no conjunto de dados dataflow_sql_tutorial criado.

    2. No painel de detalhes, clique em Excluir. Uma caixa de diálogo de confirmação será exibida.

    3. Na caixa de diálogo Excluir conjunto de dados, confirme o comando de exclusão digitando delete e clique em Excluir.

  4. Exclua seu tópico do Pub/Sub. Acesse a página tópicos do Pub/Sub no console do Google Cloud.

    Acesse a página de tópicos do Cloud Pub/Sub

    1. Selecione o tópico transactions.

    2. Clique em Excluir para excluir permanentemente o tópico. Uma caixa de diálogo de confirmação será exibida.

    3. Na caixa de diálogo Excluir tópico, confirme o comando de exclusão digitando delete e clique em Excluir.

    4. Acesse a página de assinaturas do Pub/Sub.

    5. Selecione as assinaturas restantes até transactions. Se seus jobs não estiverem mais em execução, pode não existir nenhuma assinatura.

    6. Clique em Excluir para excluir permanentemente as assinaturas. Na caixa de diálogo de confirmação, clique em Excluir.

  5. Exclua o intervalo de preparo do Dataflow no Cloud Storage. Acesse a página Buckets do Cloud Storage no console do Google Cloud.

    Acessar buckets

    1. Selecione o bucket de preparo do Dataflow.

    2. Clique em Excluir para excluir permanentemente o bucket. Uma caixa de diálogo de confirmação será exibida.

    3. Na caixa de diálogo Excluir bucket, confirme o comando de exclusão digitando DELETE e clique em Excluir.

A seguir