Este tutorial mostra como usar o SQL do Dataflow para juntar um fluxo de dados do Pub/Sub com dados de uma tabela do BigQuery.
Objetivos
Neste tutorial:
- Escrever uma consulta SQL do Dataflow que junte dados de streaming do Pub/Sub com dados de tabelas do BigQuery.
- Implemente uma tarefa do Dataflow a partir da IU do Dataflow SQL.
Custos
Neste documento, usa os seguintes componentes faturáveis da Google Cloud Platform:
- Dataflow
- Cloud Storage
- Pub/Sub
- Data Catalog
Para gerar uma estimativa de custos com base na sua utilização projetada,
use a calculadora de preços.
Antes de começar
- Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles. -
Create a service account:
-
Ensure that you have the Create Service Accounts IAM role
(
roles/iam.serviceAccountCreator
). Learn how to grant roles. -
In the Google Cloud console, go to the Create service account page.
Go to Create service account - Select your project.
-
In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
In the Service account description field, enter a description. For example,
Service account for quickstart
. - Click Create and continue.
-
Grant the Project > Owner role to the service account.
To grant the role, find the Select a role list, then select Project > Owner.
- Click Continue.
-
Click Done to finish creating the service account.
Do not close your browser window. You will use it in the next step.
-
Ensure that you have the Create Service Accounts IAM role
(
-
Create a service account key:
- In the Google Cloud console, click the email address for the service account that you created.
- Click Keys.
- Click Add key, and then click Create new key.
- Click Create. A JSON key file is downloaded to your computer.
- Click Close.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. -
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles. -
Create a service account:
-
Ensure that you have the Create Service Accounts IAM role
(
roles/iam.serviceAccountCreator
). Learn how to grant roles. -
In the Google Cloud console, go to the Create service account page.
Go to Create service account - Select your project.
-
In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
In the Service account description field, enter a description. For example,
Service account for quickstart
. - Click Create and continue.
-
Grant the Project > Owner role to the service account.
To grant the role, find the Select a role list, then select Project > Owner.
- Click Continue.
-
Click Done to finish creating the service account.
Do not close your browser window. You will use it in the next step.
-
Ensure that you have the Create Service Accounts IAM role
(
-
Create a service account key:
- In the Google Cloud console, click the email address for the service account that you created.
- Click Keys.
- Click Add key, and then click Create new key.
- Click Create. A JSON key file is downloaded to your computer.
- Click Close.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. - Instale e inicialize a CLI gcloud. Escolha uma das
opções de instalação.
Pode ter de definir a propriedade
project
para o projeto que está a usar neste passo a passo. - Aceda à IU da Web do SQL do Dataflow na Google Cloud consola. Esta ação
abre o projeto ao qual acedeu mais recentemente. Para mudar para um projeto
diferente, clique no nome do projeto na parte superior da
interface Web do Dataflow SQL e pesquise o projeto que quer usar.
Aceda à IU Web do Dataflow SQL
- Um tópico do Pub/Sub denominado
transactions
: uma stream de dados de transações que chega através de uma subscrição ao tópico do Pub/Sub. Os dados de cada transação incluem informações como o produto comprado, o preço de venda e a cidade e o distrito em que a compra ocorreu. Depois de criar o tópico Pub/Sub, cria um script que publica mensagens no tópico. Vai executar este script numa secção posterior deste tutorial. - Uma tabela do BigQuery denominada
us_state_salesregions
: uma tabela que fornece um mapeamento de estados para regiões de vendas. Antes de criar esta tabela, tem de criar um conjunto de dados do BigQuery. Crie um ficheiro de texto e atribua-lhe o nome
transactions_schema.yaml
. Copie e cole o seguinte texto do esquema emtransactions_schema.yaml
.- column: event_timestamp description: Pub/Sub event timestamp mode: REQUIRED type: TIMESTAMP - column: tr_time_str description: Transaction time string mode: NULLABLE type: STRING - column: first_name description: First name mode: NULLABLE type: STRING - column: last_name description: Last name mode: NULLABLE type: STRING - column: city description: City mode: NULLABLE type: STRING - column: state description: State mode: NULLABLE type: STRING - column: product description: Product mode: NULLABLE type: STRING - column: amount description: Amount of transaction mode: NULLABLE type: FLOAT
Atribua o esquema através da CLI do Google Cloud.
a. Atualize a CLI gcloud com o seguinte comando. Certifique-se de que a versão da CLI gcloud é 242.0.0 ou superior.
gcloud components update
b. Execute o seguinte comando numa janela de linha de comandos. Substitua project-id pelo ID do projeto e path-to-file pelo caminho para o ficheiro
transactions_schema.yaml
.gcloud data-catalog entries update \ --lookup-entry='pubsub.topic.`project-id`.transactions' \ --schema-from-file=path-to-file/transactions_schema.yaml
Para mais informações sobre os parâmetros do comando e os formatos de ficheiros de esquema permitidos, consulte a página de documentação de gcloud data-catalog entries update.
c. Confirme se o esquema foi atribuído com êxito ao tópico do
transactions
Pub/Sub. Substitua project-id pelo ID do seu projeto.gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
Navegue para o SQL Workspace.
No painel Editor de SQL do Dataflow, na barra de pesquisa, pesquise
projectid=project-id transactions
. Substitua project-id pelo ID do seu projeto.- No painel Editor de SQL do Dataflow da IU de SQL do Dataflow, clique em
transactions ou pesquise um tópico do Pub/Sub escrevendo
projectid=project-id system=cloud_pubsub
e selecione o tópico. Em Esquema, pode ver o esquema que atribuiu ao tópico do Pub/Sub.
No editor de consultas, clique em Criar tarefa.
No painel Criar tarefa do Dataflow apresentado:
- Em Destino, selecione BigQuery.
- Para ID do conjunto de dados, selecione
dataflow_sql_tutorial
. - Em Nome da tabela, introduza
sales
.
Opcional: o Dataflow escolhe automaticamente as definições que são ideais para a sua tarefa SQL do Dataflow, mas pode expandir o menu Parâmetros opcionais para especificar manualmente as seguintes opções de pipeline:
- Número máximo de trabalhadores
- Zona
- Email da conta de serviço
- Tipo de máquina
- Experiências adicionais
- Configuração do endereço IP do trabalhador
- Rede
- Sub-rede
Clique em Criar. A tarefa do Dataflow demora alguns minutos a começar a ser executada.
Na página Tarefas do Dataflow, clique na tarefa que quer editar.
Na página Detalhes do trabalho, no painel Informações do trabalho, em Opções do pipeline, localize a consulta SQL. Localize a linha para queryString.
Copie e cole a seguinte consulta SQL no editor de SQL do Dataflow no espaço de trabalho de SQL para adicionar janelas deslizantes. Substitua project-id pelo ID do seu projeto.
SELECT sr.sales_region, TUMBLE_START("INTERVAL 15 SECOND") AS period_start, SUM(tr.amount) as amount FROM pubsub.topic.`project-id`.transactions AS tr INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr ON tr.state = sr.state_code GROUP BY sr.sales_region, TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
Clique em Criar tarefa para criar uma nova tarefa com a consulta modificada.
Pare o script de publicação
transactions_injector.py
se ainda estiver em execução.Pare os trabalhos do Dataflow em execução. Aceda à IU da Web do Dataflow na Google Cloud consola.
Para cada tarefa que criou seguindo este tutorial, siga os seguintes passos:
Clique no nome da tarefa.
Na página Detalhes da tarefa, clique em Parar. É apresentada a caixa de diálogo Parar tarefa com as opções de como parar a tarefa.
Selecione Cancelar.
Clique em Parar tarefa. O serviço interrompe todo o carregamento e processamento de dados assim que possível. Uma vez que Cancelar interrompe imediatamente o processamento, pode perder dados "em trânsito". A paragem de uma tarefa pode demorar alguns minutos.
Elimine o conjunto de dados do BigQuery. Aceda à IU da Web do BigQuery na Google Cloud consola.
No painel Explorador, na secção Recursos, clique no conjunto de dados dataflow_sql_tutorial que criou.
No painel de detalhes, clique em Eliminar. É aberta uma caixa de diálogo de confirmação.
Na caixa de diálogo Eliminar conjunto de dados, confirme o comando de eliminação escrevendo
delete
e, de seguida, clique em Eliminar.
Elimine o seu tópico Pub/Sub. Aceda à página de tópicos do Pub/Sub na Google Cloud consola.
Aceda à página de tópicos do Pub/Sub
Selecione o tópico
transactions
.Clique em Eliminar para eliminar o tópico permanentemente. É aberta uma caixa de diálogo de confirmação.
Na caixa de diálogo Eliminar tópico, confirme o comando de eliminação escrevendo
delete
e, de seguida, clique em Eliminar.Aceda à página de subscrições do Pub/Sub.
Selecione as subscrições restantes para o
transactions
. Se as suas tarefas já não estiverem a ser executadas, pode não haver subscrições.Clique em Eliminar para eliminar as subscrições permanentemente. Na caixa de diálogo de confirmação, clique em Eliminar.
Elimine o contentor de preparação do Dataflow no Cloud Storage. Aceda à página Recipientes do Cloud Storage na Google Cloud consola.
Selecione o contentor de preparação do Dataflow.
Clique em Eliminar para eliminar o contentor. É aberta uma caixa de diálogo de confirmação.
Na caixa de diálogo Eliminar contentor, confirme o comando de eliminação escrevendo
DELETE
e, de seguida, clique em Eliminar.
Crie origens de exemplo
Se quiser seguir o exemplo fornecido neste tutorial, crie as seguintes origens e use-as nos passos do tutorial.
Atribua um esquema ao seu tópico Pub/Sub
A atribuição de um esquema permite-lhe executar consultas SQL nos dados do tópico do Pub/Sub. Atualmente, o Dataflow SQL espera que as mensagens nos tópicos do Pub/Sub sejam serializadas no formato JSON.
Para atribuir um esquema ao
tópico Pub/Sub de exemplo transactions
:
Encontre origens do Pub/Sub
A IU do SQL do Dataflow permite encontrar objetos de origem de dados do Pub/Sub para qualquer projeto ao qual tenha acesso, para que não tenha de se lembrar dos respetivos nomes completos.
Para o exemplo neste tutorial, navegue para o editor de SQL do Dataflow e pesquise o tópico do transactions
Pub/Sub
que criou:
Veja o esquema
Crie uma consulta SQL
A IU do Dataflow SQL permite-lhe criar consultas SQL para executar as suas tarefas do Dataflow.
A seguinte consulta SQL é uma consulta de enriquecimento de dados. Adiciona um campo adicional, sales_region
, ao fluxo de eventos do Pub/Sub (transactions
),
usando uma tabela do BigQuery (us_state_salesregions
) que mapeia os estados
para as regiões de vendas.
Copie e cole a seguinte consulta SQL no editor de consultas. Substitua project-id pelo ID do seu projeto.
SELECT tr.*, sr.sales_region FROM pubsub.topic.`project-id`.transactions as tr INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr ON tr.state = sr.state_code
Quando introduz uma consulta na IU de SQL do Dataflow, o validador de consultas valida a sintaxe da consulta. É apresentado um ícone de marca de verificação verde se a consulta for válida. Se a consulta for inválida, é apresentado um ícone de ponto de exclamação vermelho. Se a sintaxe da consulta for inválida, clique no ícone do validador para ver informações sobre o que tem de corrigir.
A captura de ecrã seguinte mostra a consulta válida no editor de consultas. O validador apresenta uma marca de verificação verde.

Crie uma tarefa do Dataflow para executar a sua consulta SQL
Para executar a consulta SQL, crie uma tarefa do Dataflow a partir da IU do Dataflow SQL.
Veja a tarefa do Dataflow
O Dataflow transforma a sua consulta SQL num pipeline do Apache Beam. Clique em Ver tarefa para abrir a IU Web do Dataflow, onde pode ver uma representação gráfica do seu pipeline.

Para ver uma discriminação das transformações que ocorrem no pipeline, clique nas caixas. Por exemplo, se clicar na primeira caixa na representação gráfica, etiquetada como Executar consulta SQL, é apresentado um gráfico que mostra as operações que ocorrem nos bastidores.
As duas primeiras caixas representam as duas entradas que associou: o tópico do Pub/Sub, transactions
, e a tabela do BigQuery, us_state_salesregions
.

Para ver a tabela de resultados que contém os resultados da tarefa, aceda à
IU do BigQuery.
No painel Explorador, no seu projeto, clique no conjunto de dados dataflow_sql_tutorial
que criou. Em seguida, clique na tabela de saída,
sales
. O separador Pré-visualização apresenta o conteúdo da tabela de saída.

Veja trabalhos anteriores e edite as suas consultas
A IU do Dataflow armazena tarefas e consultas anteriores na página Tarefas do Dataflow.
Pode usar a lista do histórico de tarefas para ver consultas SQL anteriores. Por exemplo, quer modificar a consulta para agregar as vendas por região de vendas a cada 15 segundos. Use a página Tarefas para aceder à tarefa em execução que iniciou anteriormente no tutorial, copie a consulta SQL e execute outra tarefa com uma consulta modificada.
Limpar
Para evitar incorrer em cobranças na sua conta do Cloud Billing pelos recursos usados neste tutorial:
O que se segue?
- Veja uma introdução ao Dataflow SQL.
- Saiba mais sobre as noções básicas da pipeline de streaming.
- Explore a referência de SQL do Dataflow.
- Veja a demonstração de estatísticas de streaming apresentada no Cloud Next 2019.