Crea una canalización de transmisión de comercio electrónico


En este instructivo, crearás una canalización de transmisión de Dataflow que transforma los datos de comercio electrónico de los temas y las suscripciones de Pub/Sub y envía los datos a BigQuery y Bigtable. Este instructivo requiere Gradle.

En el instructivo, se proporciona una aplicación de ejemplo de comercio electrónico de extremo a extremo que transmite datos de una tienda web a BigQuery y Bigtable. En la aplicación de muestra, se ilustran casos de uso comunes y prácticas recomendadas para implementar las estadísticas de datos de transmisión y la inteligencia artificial (IA) en tiempo real. Usa este instructivo para aprender a responder de forma dinámica a las acciones de los clientes a fin de analizar eventos y reaccionar ante ellos en tiempo real. En este instructivo, se describe cómo almacenar, analizar y visualizar datos de eventos para obtener más estadísticas sobre el comportamiento de los clientes.

La aplicación de muestra está disponible en GitHub. Para ejecutar este instructivo mediante Terraform, sigue los pasos proporcionados con la aplicación de muestra en GitHub.

Objetivos

  • Validar los datos de entrada y aplicar correcciones cuando sea posible
  • Analizar los datos de flujo de clics para mantener un recuento de la cantidad de vistas por producto en un período determinado. Almacena esta información en un almacén de latencia baja. La aplicación puede usar los datos para proporcionar mensajes sobre la cantidad de personas que vieron este producto a los clientes en el sitio web.
  • Usar los datos de transacciones para informar los pedidos de inventario:

    • Analizar los datos de las transacciones para calcular la cantidad total de ventas de cada artículo, por tienda y a nivel global, durante un período determinado
    • Analizar los datos de inventario a fin de calcular el inventario entrante para cada elemento
    • Pasar estos datos a los sistemas de inventario de forma continua para poder usarlos en la toma de decisiones de compra de inventario.
  • Validar los datos de entrada y aplicar correcciones cuando sea posible Escribir los datos que no se puedan corregir en una cola de mensajes no entregados para su análisis y procesamiento adicionales. Haz una métrica que represente el porcentaje de datos entrante que se envía a la cola de mensajes no entregados disponibles para supervisar y crear alertas

  • Procesar todos los datos de entrada en un formato estándar y almacenarlos en un almacén de datos para usarlos en próximos análisis y visualizaciones

  • Desnormalizar los datos de transacciones para las ventas en la tienda a fin de que puedan incluir información como la latitud y la longitud de la ubicación de la tienda. Para proporcionar la información de la tienda mediante una tabla que cambia con lentitud en BigQuery, usa el ID de tienda como clave.

Datos

La aplicación procesa los siguientes tipos de datos:

  • Datos de flujo de clics que se envían mediante sistemas en línea a Pub/Sub.
  • Datos de transacciones que se enviarán mediante sistemas locales o de software como servicio (SaaS) a Pub/Sub.
  • Datos de archivo que se envían mediante sistemas locales o SaaS a Pub/Sub.

Patrones de la tarea

La aplicación contiene los siguientes patrones de tareas comunes a las canalizaciones compiladas con el SDK de Apache Beam para Java:

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

  • BigQuery
  • Bigtable
  • Cloud Scheduler
  • Compute Engine
  • Dataflow
  • Pub/Sub

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Cuando finalices las tareas que se describen en este documento, puedes borrar los recursos que creaste para evitar que continúe la facturación. Para obtener más información, consulta Cómo realizar una limpieza.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  6. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  13. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Crea una cuenta de servicio de trabajador administrado por el usuario para tu canalización nueva y otórgale los roles necesarios.

    1. Para crear la cuenta de servicio, ejecuta el comando gcloud iam service-accounts create:

      gcloud iam service-accounts create retailpipeline \
          --description="Retail app data pipeline worker service account" \
          --display-name="Retail app data pipeline access"
    2. Otorga roles a la cuenta de servicio. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.editor
      • roles/bigquery.dataEditor
      • roles/bigtable.admin
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Reemplaza SERVICE_ACCOUNT_ROLE por cada rol individual.

    3. Otorga a tu Cuenta de Google un rol que te permita crear tokens de acceso para la cuenta de servicio:

      gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
  17. Si es necesario, descarga e instala Gradle.

Crea las fuentes y los receptores de ejemplo

En esta sección, se explica cómo crear lo siguiente:

  • Un bucket de Cloud Storage para usar como ubicación de almacenamiento temporal
  • Fuentes de datos de transmisión con Pub/Sub
  • Conjuntos de datos para cargar los datos en BigQuery
  • Una instancia de Bigtable

Crea un bucket de Cloud Storage

Primero, crea un bucket de Cloud Storage. La canalización de Dataflow usa este bucket como ubicación de almacenamiento temporal.

Usa el comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Reemplaza lo siguiente:

  • BUCKET_NAME: Es un nombre para tu bucket de Cloud Storage que cumple con los requisitos de nombres de buckets. Los nombres de buckets de Cloud Storage deben ser únicos a nivel global.
  • LOCATION: la ubicación del bucket.

Crea temas y suscripciones de Pub/Sub

Crea cuatro temas de Pub/Sub y, a continuación, crea tres suscripciones.

Para crear tus temas, ejecuta el comando gcloud pubsub topics create una vez por cada tema. Para obtener información sobre cómo asignar un nombre a una suscripción, consulta los Lineamientos para asignar un nombre a un tema o una suscripción.

gcloud pubsub topics create TOPIC_NAME

Reemplaza TOPIC_NAME por los siguientes valores y ejecuta el comando cuatro veces, una vez por cada tema:

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound

Para crear una suscripción a un tema, ejecuta el comando gcloud pubsub subscriptions create una vez por cada suscripción:

  1. Crea una suscripción Clickstream-inbound-sub:

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. Crea una suscripción Transactions-inbound-sub:

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. Crea una suscripción Inventory-inbound-sub:

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
    

Crea conjuntos de datos y una tabla de BigQuery

Crea un conjunto de datos de BigQuery y una tabla particionada con el esquema adecuado para tu tema de Pub/Sub.

  1. Usa el comando bq mk para crear el primer conjunto de datos.

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. Crea el segundo conjunto de datos.

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. Usa la instrucción de SQL CREATE TABLE para crear una tabla con un esquema y datos de prueba. Los datos de prueba tienen un almacén con un valor de ID de 1. El patrón de entrada complementaria de actualización lenta usa esta tabla.

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state",00000);'
    

Crea una instancia y una tabla de Bigtable

Crea una instancia y una tabla de Bigtable. Para obtener más información sobre cómo crear instancias de Bigtable, consulta Crea una instancia.

  1. Si es necesario, ejecuta el siguiente comando para instalar la CLI de cbt:

    gcloud components install cbt
    
  2. Usa el comando bigtable instances create para crear una instancia:

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    Reemplaza CLUSTER_ZONE por la zona donde se ejecuta el clúster.

  3. Usa el comando cbt createtable para crear una tabla permanente.

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. Usa el siguiente comando para agregar una familia de columnas a la tabla:

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
    

Ejecute la canalización

Usa Gradle para ejecutar una canalización de transmisión. Para ver el código Java que usa la canalización, consulta RetailDataProcessingPipeline.java.

  1. Usa el comando git clone para clonar el repositorio de GitHub:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Cambia al directorio de la aplicación:

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. Para probar la canalización, en tu shell o terminal, ejecuta el siguiente comando con Gradle:

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. Para ejecutar la canalización, ejecuta el siguiente comando con Gradle:

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID"
    

Consulta el código fuente de la canalización en GitHub.

Crea y ejecuta trabajos de Cloud Scheduler

Crea y ejecuta tres trabajos de Cloud Scheduler: uno que publique datos de flujo de clics, uno para datos de inventario y otro para datos de transacciones. En este paso, se generan datos de muestra para la canalización.

  1. Si quieres crear un trabajo de Cloud Scheduler para este instructivo, usa el comando gcloud scheduler jobs create. En este paso, se crea un publicador para los datos de flujo de clics que publica un mensaje por minuto.

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. Para iniciar el trabajo de Cloud Scheduler, usa el comando gcloud scheduler jobs run.

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. Crea y ejecuta otro publicador similar para los datos de inventario que publique un mensaje cada dos minutos.

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. Inicia el segundo trabajo de Cloud Scheduler.

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. Crea y ejecuta un tercer publicador para datos de transacciones que publique un mensaje cada dos minutos.

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. Inicia el tercer trabajo de Cloud Scheduler.

    gcloud scheduler jobs run --location=LOCATION transactions
    

Ve los resultados

Visualiza los datos escritos en tus tablas de BigQuery. Ejecuta las siguientes consultas para verificar los resultados en BigQuery: Mientras esta canalización se está ejecutando, puedes ver filas nuevas agregadas a las tablas de BigQuery cada minuto.

Es posible que debas esperar a que las tablas se propaguen con los datos.

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'

Limpia

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto de Google Cloud que creaste para el instructivo.

  1. En la consola de Google Cloud, ve a la página Administrar recursos.

    Ir a Administrar recursos

  2. En la lista de proyectos, elige el proyecto que quieres borrar y haz clic en Borrar.
  3. En el diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Borra los recursos individuales

Si deseas volver a usar el proyecto, borra los recursos que creaste para el instructivo.

Borra los recursos del proyecto de Google Cloud

  1. Para borrar los trabajos de Cloud Scheduler, usa el comando gcloud scheduler jobs delete.

     gcloud scheduler jobs delete transactions --location=LOCATION
    
     gcloud scheduler jobs delete inventory --location=LOCATION
    
     gcloud scheduler jobs delete clickstream --location=LOCATION
    
  2. Para borrar las suscripciones y los temas de Pub/Sub, usa los comandos gcloud pubsub subscriptions delete y gcloud pubsub topics delete.

    gcloud pubsub subscriptions delete SUBSCRIPTION_NAME
    gcloud pubsub topics delete TOPIC_NAME
    
  3. Para borrar la tabla de BigQuery, usa el comando bq rm.

    bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
    
  4. Borra los conjuntos de datos de BigQuery. El conjunto de datos por sí solo no genera cargos.

    bq rm -r -f -d PROJECT_ID:Retail_Store
    
    bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
    
  5. Para borrar la instancia de Bigtable, usa el comando cbt deleteinstance. El bucket por sí solo no genera cargos.

    cbt deleteinstance aggregate-tables
    
  6. Para borrar el bucket de Cloud Storage, usa el comando gcloud storage rm. El bucket por sí solo no genera cargos.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revoca credenciales

  1. Revoca los roles que otorgaste a la cuenta de servicio de trabajador administrada por el usuario. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    • roles/bigtable.admin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

¿Qué sigue?