Criar um pipeline de streaming de e-commerce


Neste tutorial, você cria um pipeline de streaming do Dataflow que transforma dados de e-commerce de tópicos e assinaturas do Pub/Sub e gera dados para o BigQuery e o Bigtable. Este tutorial requer o Gradle.

Neste tutorial, fornecemos um aplicativo completo de amostra de e-commerce que faz streaming de dados de uma loja on-line para o BigQuery e o Bigtable. O aplicativo de amostra ilustra casos comuns e práticas recomendadas para implementar a análise de dados de streaming e a inteligência artificial (IA) em tempo real. Use este tutorial para saber como responder dinamicamente às ações do cliente a fim de analisar e reagir a eventos em tempo real. Neste tutorial, descrevemos como armazenar, analisar e visualizar dados de eventos para ter mais insights sobre o comportamento do cliente.

O aplicativo de exemplo está disponível no GitHub. Para executar este tutorial usando o Terraform, siga as etapas fornecidas com o aplicativo de amostra no GitHub.

Objetivos

  • Valide os dados de entrada e aplique correções sempre que possível.
  • Analise os dados da sequência de cliques para manter uma contagem do número de visualizações por produto em um determinado período. Armazene essas informações em um armazenamento de baixa latência. O aplicativo pode então usar os dados para fornecer número de pessoas que visualizaram este produto aos clientes no site.
  • Use os dados da transação para informar a ordem do inventário:

    • Analise os dados da transação para calcular o número total de vendas de cada item, por loja ou globalmente, para um determinado período.
    • Analise os dados de inventário para calcular o inventário de entrada de cada item.
    • Transmita esses dados aos sistemas de inventário de modo contínuo para que eles possam ser usados para a tomada de decisões de compra do inventário.
  • Valide os dados de entrada e aplique correções sempre que possível. Grave todos os dados que não podem ser corrigidos em uma fila de mensagens inativas para análise e processamento adicionais. Crie uma métrica que represente a porcentagem de dados de entrada que são enviados para a fila de mensagens inativas para monitoramento e alerta.

  • Processe todos os dados de entrada em um formato padrão e os armazene em um local para usá-los em análises e visualizações futuras.

  • Desnormalize os dados de transações para vendas na loja para que eles possam incluir informações como a latitude e a longitude do local da loja. Forneça as informações do armazenamento por meio de uma tabela com alterações lentas no BigQuery, que usa o ID do armazenamento como uma chave.

Dados

O aplicativo processa os tipos de dados a seguir:

  • Dados de sequência de cliques sendo enviados por sistemas on-line para o Pub/Sub.
  • Dados de transações enviados por sistemas locais ou de software como serviço (SaaS) para o Pub/Sub.
  • Dados de estoque sendo enviados por sistemas locais ou SaaS para o Pub/Sub.

Padrões de tarefas

O aplicativo contém os seguintes padrões de tarefa comuns aos pipelines criados com o SDK do Apache Beam para Java:

Custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

  • O BigQuery
  • Bigtable
  • Cloud Scheduler
  • Compute Engine
  • Dataflow
  • Pub/Sub

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Ao concluir as tarefas descritas neste documento, é possível evitar o faturamento contínuo excluindo os recursos criados. Saiba mais em Limpeza.

Antes de começar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  6. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  13. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Criar uma conta de serviço do worker gerenciada pelo usuário para o novo pipeline e conceder os papéis necessários à conta de serviço.

    1. Para criar a conta de serviço, execute o comando gcloud iam service-accounts create.

      gcloud iam service-accounts create retailpipeline \
          --description="Retail app data pipeline worker service account" \
          --display-name="Retail app data pipeline access"
    2. Conceda papéis à conta de serviço. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.editor
      • roles/bigquery.dataEditor
      • roles/bigtable.admin
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Substitua SERVICE_ACCOUNT_ROLE por cada papel individual.

    3. Conceda à sua Conta do Google uma função que permita criar tokens de acesso para a conta de serviço:

      gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
  17. Se necessário, faça o download e instale o Gradle.

Criar as origens e os coletores de exemplo

Esta seção explica como criar o seguinte:

  • Um bucket do Cloud Storage para usar como local de armazenamento temporário
  • Como fazer streaming de fontes de dados usando o Pub/Sub
  • Conjuntos de dados para carregar os dados no BigQuery
  • Uma instância do Bigtable

crie um bucket do Cloud Storage

Comece criando um bucket do Cloud Storage. Esse bucket é usado como um local de armazenamento temporário pelo pipeline do Dataflow.

Use o comando gcloud storage buckets create (em inglês).

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Substitua:

  • BUCKET_NAME: um nome para o bucket do Cloud Storage que atende aos requisitos de nomenclatura de bucket. Os nomes dos intervalos do Cloud Storage precisam ser globalmente exclusivos.
  • LOCATION: o local do bucket.

Criar tópicos e assinaturas do Pub/Sub

Crie quatro tópicos do Pub/Sub e três assinaturas.

Para criar seus tópicos, execute o comando gcloud pubsub topics create uma vez para cada tópico. Para ver informações sobre como nomear uma assinatura, consulte Diretrizes para nomear um tópico ou uma assinatura.

gcloud pubsub topics create TOPIC_NAME

Substitua TOPIC_NAME pelos seguintes valores, executando o comando quatro vezes, uma vez para cada tópico:

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound

Para criar uma assinatura para o tópico, execute o comando gcloud pubsub subscriptions create uma vez para cada assinatura:

  1. Crie uma assinatura Clickstream-inbound-sub:

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. Crie uma assinatura Transactions-inbound-sub:

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. Crie uma assinatura Inventory-inbound-sub:

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
    

Criar conjunto de dados e tabela do BigQuery

Crie um conjunto de dados do BigQuery e uma tabela particionada com o esquema apropriado para seu tópico do Pub/Sub.

  1. Use o comando bq mk para criar o primeiro conjunto de dados.

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. Crie o segundo conjunto de dados.

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. Use a instrução SQL CREATE TABLE para criar uma tabela com um esquema e dados de teste. Os dados de teste têm uma loja com valor de ID de 1. O padrão de entrada secundária de atualização lenta usa essa tabela.

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state",00000);'
    

Criar uma instância e uma tabela do Bigtable

Crie uma instância e uma tabela do Bigtable. Para mais informações sobre como criar instâncias do Bigtable, consulte Criar uma instância.

  1. Se necessário, execute o seguinte comando para instalar a CLI do cbt:

    gcloud components install cbt
    
  2. Use o comando bigtable instances create para criar uma instância:

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    Substitua CLUSTER_ZONE pela zona em que o cluster é executado.

  3. Use o comando cbt createtable para criar uma tabela:

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. Use o comando a seguir para adicionar um grupo de colunas à tabela:

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
    

Executar o pipeline

Use o Gradle para executar um pipeline de streaming. Para ver o código Java que o pipeline está usando, consulte RetailDataProcessingPipeline.java.

  1. Use o comando git clone para clonar o repositório do GitHub:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Alterne para o diretório do aplicativo:

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. Para testar o pipeline, no shell ou terminal, execute o comando a seguir usando o Gradle:

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. Para executar o pipeline, execute o seguinte comando usando o Gradle:

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID"
    

Consulte o código-fonte do pipeline no GitHub.

Criar e executar jobs do Cloud Scheduler

Criar e executar três jobs do Cloud Scheduler, um que publica dados de stream de clique, um para dados de inventário e outro para dados de transação. Essa etapa gera dados de amostra para o pipeline.

  1. Para criar um job do Cloud Scheduler para este tutorial, use o comando gcloud scheduler jobs create. Esta etapa cria um editor para dados de sequência de cliques que publicam uma mensagem por minuto.

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. Para iniciar o job do Cloud Scheduler, use o comando gcloud scheduler jobs run.

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. Criar e executar outro editor semelhante para dados de inventário que publicam uma mensagem a cada dois minutos.

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. Inicie o segundo job do Cloud Scheduler.

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. Criar e executar um terceiro editor para dados de transações que publicam uma mensagem a cada dois minutos.

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. Inicie o terceiro job do Cloud Scheduler.

    gcloud scheduler jobs run --location=LOCATION transactions
    

Ver os resultados

Veja os dados gravados nas tabelas do BigQuery. Verifique os resultados no BigQuery executando estas consultas: Durante a execução do pipeline, é possível ver novas linhas anexadas às tabelas do BigQuery a cada minuto.

Talvez seja necessário aguardar até que as tabelas sejam preenchidas com dados.

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'

Limpeza

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados no tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.

Excluir o projeto

A maneira mais fácil de eliminar o faturamento é excluir o projeto do Google Cloud que você criou para o tutorial.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Excluir recursos individuais

Se você quiser reutilizar o projeto, exclua os recursos criados para o tutorial.

Limpar recursos do projeto do Google Cloud

  1. Para excluir os jobs do Cloud Scheduler, use o comando gcloud scheduler jobs delete.

     gcloud scheduler jobs delete transactions --location=LOCATION
    
     gcloud scheduler jobs delete inventory --location=LOCATION
    
     gcloud scheduler jobs delete clickstream --location=LOCATION
    
  2. Para excluir assinaturas e tópicos do Pub/Sub, use os comandos gcloud pubsub subscriptions delete e gcloud pubsub topics delete.

    gcloud pubsub subscriptions delete SUBSCRIPTION_NAME
    gcloud pubsub topics delete TOPIC_NAME
    
  3. Para excluir a tabela do BigQuery, use o comando bq rm.

    bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
    
  4. Exclua os conjuntos de dados do BigQuery O conjunto de dados em si não gera cobranças.

    bq rm -r -f -d PROJECT_ID:Retail_Store
    
    bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
    
  5. Para excluir a instância do Bigtable, use o comando cbt deleteinstance. O bucket sozinho não gera cobranças.

    cbt deleteinstance aggregate-tables
    
  6. Para excluir o bucket do Cloud Storage, use o comando gcloud storage rm. O bucket sozinho não gera cobranças.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revogar credenciais

  1. Revogue os papéis concedidos à conta de serviço do worker gerenciada pelo usuário. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    • roles/bigtable.admin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

A seguir