Neste tutorial, você cria um pipeline de streaming do Dataflow que transforma dados de e-commerce de tópicos e assinaturas do Pub/Sub e gera dados para o BigQuery e o Bigtable. Este tutorial requer o Gradle.
Neste tutorial, fornecemos um aplicativo completo de amostra de e-commerce que faz streaming de dados de uma loja on-line para o BigQuery e o Bigtable. O aplicativo de amostra ilustra casos comuns e práticas recomendadas para implementar a análise de dados de streaming e a inteligência artificial (IA) em tempo real. Use este tutorial para saber como responder dinamicamente às ações do cliente a fim de analisar e reagir a eventos em tempo real. Neste tutorial, descrevemos como armazenar, analisar e visualizar dados de eventos para ter mais insights sobre o comportamento do cliente.
O aplicativo de exemplo está disponível no GitHub. Para executar este tutorial usando o Terraform, siga as etapas fornecidas com o aplicativo de exemplo no GitHub.
Objetivos
- Valide os dados de entrada e aplique correções sempre que possível.
- Analise os dados da sequência de cliques para manter uma contagem do número de visualizações por produto em um determinado período. Armazene essas informações em um armazenamento de baixa latência. O aplicativo pode então usar os dados para fornecer número de pessoas que visualizaram este produto aos clientes no site.
Use os dados da transação para informar a ordem do inventário:
- Analise os dados da transação para calcular o número total de vendas de cada item, por loja ou globalmente, para um determinado período.
- Analise os dados de inventário para calcular o inventário de entrada de cada item.
- Transmita esses dados aos sistemas de inventário de modo contínuo para que eles possam ser usados para a tomada de decisões de compra do inventário.
Valide os dados de entrada e aplique correções sempre que possível. Grave todos os dados que não podem ser corrigidos em uma fila de mensagens inativas para análise e processamento adicionais. Crie uma métrica que represente a porcentagem de dados de entrada que são enviados para a fila de mensagens inativas para monitoramento e alerta.
Processe todos os dados de entrada em um formato padrão e os armazene em um local para usá-los em análises e visualizações futuras.
Desnormalize os dados de transações para vendas na loja para que eles possam incluir informações como a latitude e a longitude do local da loja. Forneça as informações do armazenamento por meio de uma tabela com alterações lentas no BigQuery, que usa o ID do armazenamento como uma chave.
Dados
O aplicativo processa os tipos de dados a seguir:
- Dados de sequência de cliques sendo enviados por sistemas on-line para o Pub/Sub.
- Dados de transações enviados por sistemas locais ou de software como serviço (SaaS) para o Pub/Sub.
- Dados de estoque sendo enviados por sistemas locais ou SaaS para o Pub/Sub.
Padrões de tarefas
O aplicativo contém os seguintes padrões de tarefa comuns aos pipelines criados com o SDK do Apache Beam para Java:
- Esquemas do Apache Beam para trabalhar com dados estruturados
JsonToRow
para converter dados JSON- O gerador de código
AutoValue
para gerar objetos Java antigos e simples (POJOs, na sigla em inglês) - Como enfileirar para análise extra os dados que não podem ser processados
- Transformações de validação de dados seriais
DoFn.StartBundle
para fazer chamadas de microlote para serviços externos- Padrões de entrada secundária
Custos
Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.
Ao concluir as tarefas descritas neste documento, é possível evitar o faturamento contínuo excluindo os recursos criados. Saiba mais em Limpeza.
Antes de começar
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Criar uma conta de serviço do worker gerenciada pelo usuário para o novo pipeline e conceder os papéis necessários à conta de serviço.
Para criar a conta de serviço, execute o comando
gcloud iam service-accounts create
.gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
Conceda papéis à conta de serviço. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Substitua
SERVICE_ACCOUNT_ROLE
por cada papel individual.Conceda à sua Conta do Google uma função que permita criar tokens de acesso para a conta de serviço:
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- Se necessário, faça o download e instale o Gradle.
Criar as origens e os coletores de exemplo
Esta seção explica como criar o seguinte:
- Um bucket do Cloud Storage para usar como local de armazenamento temporário
- Como fazer streaming de fontes de dados usando o Pub/Sub
- Conjuntos de dados para carregar os dados no BigQuery
- Uma instância do Bigtable
crie um bucket do Cloud Storage
Comece criando um bucket do Cloud Storage. Esse bucket é usado como um local de armazenamento temporário pelo pipeline do Dataflow.
Use o comando gcloud storage buckets create
(em inglês).
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
Substitua:
- BUCKET_NAME: um nome para o bucket do Cloud Storage que atende aos requisitos de nomenclatura de bucket. Os nomes dos intervalos do Cloud Storage precisam ser globalmente exclusivos.
- LOCATION: o local do bucket.
Criar tópicos e assinaturas do Pub/Sub
Crie quatro tópicos do Pub/Sub e três assinaturas.
Para criar seus tópicos, execute o comando
gcloud pubsub topics create
uma vez para cada tópico. Para ver informações sobre como nomear uma assinatura, consulte
Diretrizes para nomear um tópico ou uma assinatura.
gcloud pubsub topics create TOPIC_NAME
Substitua TOPIC_NAME pelos seguintes valores, executando o comando quatro vezes, uma vez para cada tópico:
Clickstream-inbound
Transactions-inbound
Inventory-inbound
Inventory-outbound
Para criar uma assinatura para o tópico, execute o comando
gcloud pubsub subscriptions create
uma vez para cada assinatura:
Crie uma assinatura
Clickstream-inbound-sub
:gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
Crie uma assinatura
Transactions-inbound-sub
:gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
Crie uma assinatura
Inventory-inbound-sub
:gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
Criar conjunto de dados e tabela do BigQuery
Crie um conjunto de dados do BigQuery e uma tabela particionada com o esquema apropriado para seu tópico do Pub/Sub.
Use o comando
bq mk
para criar o primeiro conjunto de dados.bq --location=US mk \ PROJECT_ID:Retail_Store
Crie o segundo conjunto de dados.
bq --location=US mk \ PROJECT_ID:Retail_Store_Aggregations
Use a instrução SQL CREATE TABLE para criar uma tabela com um esquema e dados de teste. Os dados de teste têm uma loja com valor de ID de
1
. O padrão de entrada secundária de atualização lenta usa essa tabela.bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
Criar uma instância e uma tabela do Bigtable
Crie uma instância e uma tabela do Bigtable. Para mais informações sobre como criar instâncias do Bigtable, consulte Criar uma instância.
Se necessário, execute o seguinte comando para instalar a CLI do
cbt
:gcloud components install cbt
Use o comando
bigtable instances create
para criar uma instância:gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
Substitua CLUSTER_ZONE pela zona em que o cluster é executado.
Use o comando
cbt createtable
para criar uma tabela:cbt -instance=aggregate-tables createtable PageView5MinAggregates
Use o comando a seguir para adicionar um grupo de colunas à tabela:
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
Executar o pipeline
Use o Gradle para executar um pipeline de streaming. Para ver o código Java que o pipeline está usando, consulte RetailDataProcessingPipeline.java.
Use o comando
git clone
para clonar o repositório do GitHub:git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
Alterne para o diretório do aplicativo:
cd dataflow-sample-applications/retail/retail-java-applications
Para testar o pipeline, no shell ou terminal, execute o comando a seguir usando o Gradle:
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
Para executar o pipeline, execute o seguinte comando usando o Gradle:
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID"
Consulte o código-fonte do pipeline no GitHub.
Criar e executar jobs do Cloud Scheduler
Criar e executar três jobs do Cloud Scheduler, um que publica dados de stream de clique, um para dados de inventário e outro para dados de transação. Essa etapa gera dados de amostra para o pipeline.
Para criar um job do Cloud Scheduler para este tutorial, use o comando
gcloud scheduler jobs create
. Esta etapa cria um editor para dados de sequência de cliques que publicam uma mensagem por minuto.gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
Para iniciar o job do Cloud Scheduler, use o comando
gcloud scheduler jobs run
.gcloud scheduler jobs run --location=LOCATION clickstream
Criar e executar outro editor semelhante para dados de inventário que publicam uma mensagem a cada dois minutos.
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
Inicie o segundo job do Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION inventory
Criar e executar um terceiro editor para dados de transações que publicam uma mensagem a cada dois minutos.
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
Inicie o terceiro job do Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION transactions
Ver os resultados
Veja os dados gravados nas tabelas do BigQuery. Verifique os resultados no BigQuery executando estas consultas: Durante a execução do pipeline, é possível ver novas linhas anexadas às tabelas do BigQuery a cada minuto.
Talvez seja necessário aguardar até que as tabelas sejam preenchidas com dados.
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
Limpar
Para evitar cobranças na sua conta do Google Cloud pelos recursos usados no tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.
Excluir o projeto
A maneira mais fácil de eliminar o faturamento é excluir o projeto do Google Cloud que você criou para o tutorial.
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Excluir recursos individuais
Se você quiser reutilizar o projeto, exclua os recursos criados para o tutorial.
Limpar recursos do projeto do Google Cloud
Para excluir os jobs do Cloud Scheduler, use o comando
gcloud scheduler jobs delete
.gcloud scheduler jobs delete transactions --location=LOCATION
gcloud scheduler jobs delete inventory --location=LOCATION
gcloud scheduler jobs delete clickstream --location=LOCATION
Para excluir assinaturas e tópicos do Pub/Sub, use os comandos
gcloud pubsub subscriptions delete
egcloud pubsub topics delete
.gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAME
Para excluir a tabela do BigQuery, use o comando
bq rm
.bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
Exclua os conjuntos de dados do BigQuery O conjunto de dados em si não gera cobranças.
bq rm -r -f -d PROJECT_ID:Retail_Store
bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
Para excluir a instância do Bigtable, use o comando
cbt deleteinstance
. O bucket sozinho não gera cobranças.cbt deleteinstance aggregate-tables
Para excluir o bucket do Cloud Storage, use o comando
gcloud storage rm
. O bucket sozinho não gera cobranças.gcloud storage rm gs://BUCKET_NAME --recursive
Revogar credenciais
Revogue os papéis concedidos à conta de serviço do worker gerenciada pelo usuário. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
A seguir
- Veja o aplicativo de exemplo no GitHub.
- Leia a postagem do blog Conheça os padrões do Beam com o processamento de sequência de cliques de dados do Gerenciador de tags do Google
- Leia sobre como usar o Pub/Sub para criar e usar tópicos e usar assinaturas.
- Leia sobre o uso do BigQuery para criar conjuntos de dados.
- Confira arquiteturas de referência, diagramas, tutoriais e práticas recomendadas do Google Cloud. Confira o Centro de arquitetura do Cloud.