Como unir dados de streaming com o Dataflow SQL


Neste tutorial, você verá como usar o Dataflow SQL para unir um fluxo de dados do Pub/Sub com dados de uma tabela do BigQuery.

Objetivos

Neste tutorial, você aprenderá a:

  • gravar uma consulta SQL do Dataflow que mescle dados de streaming do Pub/Sub com dados da tabela do BigQuery;
  • implantar um job do Dataflow com base na IU do Dataflow SQL.

Custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. APIs.

    Enable the APIs

  5. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Project > Owner role to the service account.

      To grant the role, find the Select a role list, then select Project > Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  6. Create a service account key:

    1. In the Google Cloud console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, and then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  9. Make sure that billing is enabled for your Google Cloud project.

  10. Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. APIs.

    Enable the APIs

  11. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Project > Owner role to the service account.

      To grant the role, find the Select a role list, then select Project > Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  12. Create a service account key:

    1. In the Google Cloud console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, and then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  13. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  14. Instale e inicialize a CLI gcloud. Escolha uma das opções de instalação. Talvez seja necessário definir a propriedade project como o projeto que você está usando neste tutorial.
  15. Acesse a IU da Web do SQL do Dataflow no console do Google Cloud. Isso abre seu projeto acessado mais recentemente. Se quiser alternar para um projeto diferente, clique no nome do projeto na parte superior da IU da Web do BigQuery e pesquise aquele que você quer usar.
    Acessar a IU da Web do Dataflow SQL

Criar exemplos de fonte

Se você quiser seguir o exemplo fornecido neste tutorial, crie as fontes a seguir e use-as nas etapas abaixo.

  • Um tópico do Pub/Sub chamado transactions: um stream de dados de transação que chega por meio de uma assinatura no tópico do Pub/Sub. Os dados para cada transação incluem informações como produto comprado, preço promocional e a cidade em que ocorreu a compra. Depois de criar o tópico do Pub/Sub, crie um script que publique mensagens no seu tópico. Esse script será executado em uma seção futura deste tutorial.
  • Uma tabela do BigQuery chamada us_state_salesregions: uma tabela que fornece um mapeamento de estados para regiões de vendas. Antes de criar esta tabela, é preciso criar um conjunto de dados do BigQuery.

Atribuir um esquema ao seu tópico do Pub/Sub

A atribuição de um esquema permite executar consultas SQL dos dados do seu tópico do Pub/Sub. Atualmente, o Dataflow SQL espera que as mensagens nos tópicos do Pub/Sub sejam serializadas no formato JSON.

Para atribuir um esquema ao tópico de exemplo do Pub/Sub transactions:

  1. Crie um arquivo de texto e nomeie-o como transactions_schema.yaml. Copie e cole o texto de esquema a seguir em transactions_schema.yaml.

      - column: event_timestamp
        description: Pub/Sub event timestamp
        mode: REQUIRED
        type: TIMESTAMP
      - column: tr_time_str
        description: Transaction time string
        mode: NULLABLE
        type: STRING
      - column: first_name
        description: First name
        mode: NULLABLE
        type: STRING
      - column: last_name
        description: Last name
        mode: NULLABLE
        type: STRING
      - column: city
        description: City
        mode: NULLABLE
        type: STRING
      - column: state
        description: State
        mode: NULLABLE
        type: STRING
      - column: product
        description: Product
        mode: NULLABLE
        type: STRING
      - column: amount
        description: Amount of transaction
        mode: NULLABLE
        type: FLOAT
    
  2. Atribua o esquema usando a Google Cloud CLI.

    a. Atualize a CLI gcloud com o comando a seguir. Garanta que a versão da CLI gcloud é 242.0.0 ou posterior.

      gcloud components update
    

    b. Execute o comando a seguir em uma janela de linha de comando. Substitua project-id pelo ID do projeto e path-to-file pelo caminho do arquivo transactions_schema.yaml.

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    Para mais informações sobre parâmetros de comando e formatos de arquivo de esquema permitidos, consulte a página de documentação de atualização de entradas de catálogo de dados do gcloud.

    c. Confirme se o esquema foi atribuído com êxito ao tópico transactions do Pub/Sub. Substitua project-id pelo ID do projeto.

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Encontrar fontes do Pub/Sub

A IU do SQL do Dataflow oferece uma forma de encontrar fontes de dados do Pub/Sub de qualquer projeto a que você tenha acesso, para que não precise lembrar os nomes completos deles.

No exemplo deste tutorial, navegue até o editor do Dataflow SQL e procure o tópico transactions do Pub/Sub que você criou:

  1. Navegue até o SQL Workspace.

  2. No painel Editor do SQL do Dataflow, na barra de pesquisa, procure por projectid=project-id transactions. Substitua project-id pelo ID do projeto.

    Painel de pesquisa do Data Catalog no espaço de trabalho do SQL do Dataflow.

Ver o esquema

  1. No painel Editor do SQL do Dataflow, clique em transações ou pesquise um tópico do Pub/Sub digitando projectid=project-id system=cloud_pubsub. selecione o tópico.
  2. Em Esquema, é possível visualizar o esquema atribuído ao tópico do Pub/Sub.

    Esquema atribuído ao tópico, incluindo uma lista de nomes de campos e suas descrições.

Criar uma consulta SQL

A IU do Dataflow SQL permite criar consultas SQL para executar jobs do Dataflow.

A consulta SQL a seguir é de enriquecimento de dados. Ele acrescenta mais um campo, sales_region, ao stream de eventos do Pub/Sub (transactions), usando uma tabela do BigQuery (us_state_salesregions) que mapeia estados nas regiões de vendas.

Copie e cole a consulta SQL abaixo no Editor de consultas. Substitua project-id pelo ID do projeto:

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Ao inserir uma consulta na IU do Dataflow SQL, o validador da consulta verifica a sintaxe dela. Se a consulta for válida, será exibido um ícone de marca de seleção verde. Se a consulta for inválida, será exibido um ícone de ponto de exclamação vermelho. Se a sintaxe for inválida, clicar no ícone do validador fornecerá informações sobre o que é necessário corrigir.

A captura de tela a seguir mostra a consulta válida no Editor de consulta. O validador exibe uma marca de seleção verde.

Espaço de trabalho do SQL do Dataflow com a consulta do tutorial visível no editor.

Criar um job do Dataflow para executar sua consulta SQL

Para executar sua consulta SQL, crie um job do Dataflow na IU do Dataflow SQL.

  1. Acima do Editor de consultas, clique em Criar job.

  2. No painel Criar job do Dataflow que é aberto:

    • Em Destino, selecione BigQuery.
    • Em ID do conjunto de dados, selecione dataflow_sql_tutorial.
    • Para Nome da tabela, digite sales.
    Formulário "Criar o job do SQL do Dataflow".
  3. (Opcional) O Dataflow escolhe automaticamente as configurações ideais para seu job do Dataflow SQL, mas é possível expandir o menu dos Parâmetros opcionais para especificar manualmente as seguintes opções de pipeline:

    • Número máximo de workers
    • Zona
    • E-mail da conta de serviço
    • Machine type
    • Outros experimentos
    • Configuração do endereço IP do worker
    • Rede
    • Sub-rede
  4. Clique em Criar. O job do Dataflow leva alguns minutos para começar a ser executado.

Ver o job do Dataflow

O Dataflow transforma sua consulta SQL em um pipeline do Apache Beam. Clique em Ver job para abrir a IU da Web do Dataflow, em que é possível ver uma representação gráfica do pipeline.

Pipeline da consulta SQL exibida na IU da Web do Dataflow.

Para ver um detalhamento das transformações que ocorrem no pipeline, clique nas caixas. Por exemplo, se você clicar na caixa superior na representação gráfica, com o rótulo Executar consulta SQL, aparecerá um gráfico que mostra as operações em segundo plano.

As duas primeiras caixas representam as duas entradas que você mesclou: o tópico do Pub/Sub, transactions e a tabela do BigQuery, us_state_salesregions.

A saída de gravação de uma junção de duas entradas será concluída em 25 segundos.

Para ver a tabela de saída que contém os resultados do job, acesse a IU do BigQuery. No painel Explorer, no projeto, clique no conjunto de dados dataflow_sql_tutorial que você criou. Em seguida, clique na tabela de saída sales. A guia Visualização exibe o conteúdo da tabela de saída.

A tabela de visualização contém colunas para tr_time_str, first_name, last_name, city, state, product, amount e sales_region.

Ver jobs passados e editar suas consultas

A IU do Dataflow armazena jobs e consultas anteriores na página Jobs.

É possível usar a lista do histórico de jobs para ver consultas SQL anteriores. Por exemplo, você quer modificar sua consulta para agregar vendas por região de vendas a cada 15 segundos. Use a página Jobs para acessar o job em execução que você iniciou anteriormente no tutorial, copie a consulta SQL e execute outro job com uma consulta modificada.

  1. Na página Jobs do Dataflow, clique no job que você quer editar.

  2. Na página Detalhes do job, no painel Informações do job, localize Opções de pipeline na consulta SQL. Encontre a linha de queryString.

    A opção do pipeline de job chamada queryString.
  3. Copie e cole a seguinte consulta SQL no Editor de SQL do Dataflow no SQL Workspace para adicionar janelas em cascata Substitua project-id pelo ID do projeto.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  4. Clique em Criar job para criar um novo job com a consulta modificada.

Limpar

Para evitar cobranças gerados pelos recursos usados neste tutorial na conta de faturamento do Cloud:

  1. Interrompa o script de publicação transactions_injector.py se ele ainda estiver em execução.

  2. Interrompa os jobs do Dataflow em execução. Acesse a IU da Web do Dataflow no console do Google Cloud.

    Acessar a IU da Web do Dataflow

    Em cada job criado seguindo este tutorial, siga estas etapas:

    1. Clique no nome do job.

    2. Na página de detalhes do job, clique em Parar. A caixa de diálogo Interromper job será exibida, oferecendo opções para essa ação.

    3. Selecione Cancelar.

    4. Clique em Interromper job. O serviço suspende toda a ingestão de dados e processamento assim que possível. Como Cancelar interrompe imediatamente o processamento, talvez todos os dados "em trânsito" sejam perdidos. Interromper um job pode levar alguns minutos.

  3. Exclua seu conjunto de dados do BigQuery. Acesse a IU da Web do BigQuery no console do Google Cloud.

    Acessar a IU da Web do BigQuery

    1. No painel Explorer, na seção Explorer, clique no conjunto de dados Explorer criado.

    2. No painel de detalhes, clique em Excluir. Uma caixa de diálogo de confirmação será exibida.

    3. Na caixa de diálogo Excluir conjunto de dados, confirme o comando de exclusão digitando delete e clique em Excluir.

  4. Exclua seu tópico do Pub/Sub. Acesse a página tópicos do Pub/Sub no console do Google Cloud.

    Acesse a página de tópicos do Cloud Pub/Sub

    1. Selecione o tópico transactions.

    2. Clique em Excluir para excluir permanentemente o tópico. Uma caixa de diálogo de confirmação será exibida.

    3. Na caixa de diálogo Excluir tópico, confirme o comando de exclusão digitando delete e clique em Excluir.

    4. Acesse a página de assinaturas do Pub/Sub.

    5. Selecione as assinaturas restantes até transactions. Se seus jobs não estiverem mais em execução, pode não existir nenhuma assinatura.

    6. Clique em Excluir para excluir permanentemente as assinaturas. Na caixa de diálogo de confirmação, clique em Excluir.

  5. Exclua o intervalo de preparo do Dataflow no Cloud Storage. Acesse a página Buckets do Cloud Storage no console do Google Cloud.

    Acessar buckets

    1. Selecione o bucket de preparo do Dataflow.

    2. Clique em Excluir para excluir permanentemente o bucket. Uma caixa de diálogo de confirmação será exibida.

    3. Na caixa de diálogo Excluir bucket, confirme o comando de exclusão digitando DELETE e clique em Excluir.

A seguir