En este tutorial, crearás un flujo de procesamiento en streaming de Dataflow que transforma datos de comercio electrónico de temas y suscripciones de Pub/Sub y envía los datos a BigQuery y Bigtable. Para hacer este tutorial, necesitas Gradle.
En el tutorial se proporciona una aplicación de comercio electrónico de ejemplo integral que transmite datos de una tienda web a BigQuery y Bigtable. La aplicación de ejemplo ilustra casos prácticos habituales y prácticas recomendadas para implementar analíticas de datos de streaming e inteligencia artificial (IA) en tiempo real. En este tutorial se explica cómo responder de forma dinámica a las acciones de los clientes para analizar eventos y reaccionar a ellos en tiempo real. En este tutorial se describe cómo almacenar, analizar y visualizar datos de eventos para obtener más información valiosa sobre el comportamiento de los clientes.
La aplicación de ejemplo está disponible en GitHub. Para completar este tutorial con Terraform, sigue los pasos que se indican en la aplicación de ejemplo de GitHub.
Objetivos
- Validar los datos entrantes y aplicarles correcciones cuando sea posible.
- Analizar los datos de flujo de clics para llevar un recuento del número de vistas por producto en un periodo determinado. Almacena esta información en un almacén de baja latencia. La aplicación puede usar los datos para proporcionar mensajes a los clientes del sitio web, como el número de personas que han visto este producto.
Usar los datos de las transacciones para tomar decisiones sobre los pedidos de inventario:
- Analiza los datos de las transacciones para calcular el número total de ventas de cada artículo, tanto por tienda como a nivel global, durante un periodo determinado.
- Analice los datos de inventario para calcular el inventario entrante de cada artículo.
- Transfiere estos datos a los sistemas de inventario de forma continua para que se puedan usar en las decisiones de compra de inventario.
Validar los datos entrantes y aplicarles correcciones cuando sea posible. Escribe los datos que no se puedan corregir en una cola de mensajes fallidos para analizarlos y procesarlos más adelante. Crea una métrica que represente el porcentaje de datos entrantes que se envían a la cola de mensajes fallidos para que se puedan monitorizar y generar alertas.
Procesa todos los datos entrantes en un formato estándar y almacénalos en un almacén de datos para usarlos en futuros análisis y visualizaciones.
Desnormaliza los datos de las transacciones de las ventas en tienda para que puedan incluir información como la latitud y la longitud de la ubicación de la tienda. Proporcione la información de la tienda a través de una tabla de BigQuery que cambie lentamente, usando el ID de la tienda como clave.
Datos
La aplicación trata los siguientes tipos de datos:
- Datos de secuencias de clics que envían los sistemas online a Pub/Sub.
- Datos de transacciones que envían a Pub/Sub sistemas on‐premise o de software como servicio (SaaS).
- Datos de inventario que envían sistemas on-premise o SaaS a Pub/Sub.
Patrones de tareas
La aplicación contiene los siguientes patrones de tareas habituales en las canalizaciones creadas con el SDK de Apache Beam para Java:
- Esquemas de Apache Beam para trabajar con datos estructurados
JsonToRow
para convertir datos de JSON- El generador de código
AutoValue
para crear objetos Java antiguos (POJO) - Poner en cola datos que no se pueden procesar para su análisis
- Transformaciones de validación de datos en serie
DoFn.StartBundle
para realizar llamadas en microlotes a servicios externos- Patrones de entrada lateral
Costes
En este documento, se usan los siguientes componentes facturables de Google Cloud Platform:
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
Para generar una estimación de costes basada en el uso previsto,
utiliza la calculadora de precios.
Cuando termines las tareas que se describen en este documento, puedes evitar que se te siga facturando eliminando los recursos que has creado. Para obtener más información, consulta la sección Limpiar.
Antes de empezar
- Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles.gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Install the Google Cloud CLI.
-
Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles.gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
Crea una cuenta de servicio de trabajador gestionada por el usuario para tu nueva canalización y asigna los roles necesarios a la cuenta de servicio.
Para crear la cuenta de servicio, ejecuta el comando
gcloud iam service-accounts create
:gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
Asigna roles a la cuenta de servicio. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de gestión de identidades y accesos:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
roles/bigquery.jobUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Sustituye
SERVICE_ACCOUNT_ROLE
por cada rol individual.Asigna a tu cuenta de Google un rol que te permita crear tokens de acceso para la cuenta de servicio:
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- Si es necesario, descarga e instala Gradle.
Crear las fuentes y los sumideros de ejemplo
En esta sección se explica cómo crear lo siguiente:
- Un segmento de Cloud Storage que se usará como ubicación de almacenamiento temporal
- Fuentes de datos de streaming con Pub/Sub
- Conjuntos de datos para cargar los datos en BigQuery
- Una instancia de Bigtable
Crea un segmento de Cloud Storage
Empieza creando un segmento de Cloud Storage. Este segmento se usa como ubicación de almacenamiento temporal en la canalización de Dataflow.
Usa el comando gcloud storage buckets create
:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
Haz los cambios siguientes:
- BUCKET_NAME: nombre del segmento de Cloud Storage que cumpla los requisitos de nomenclatura de segmentos. Los nombres de los segmentos de Cloud Storage deben ser únicos de forma global.
- LOCATION: la ubicación del segmento.
Crear temas y suscripciones de Pub/Sub
Crea cuatro temas de Pub/Sub y, a continuación, tres suscripciones.
Para crear tus temas, ejecuta el comando
gcloud pubsub topics create
una vez por cada tema. Para obtener información sobre cómo poner nombre a una suscripción, consulta las directrices para nombrar un tema o una suscripción.
gcloud pubsub topics create TOPIC_NAME
Sustituye TOPIC_NAME por los siguientes valores y ejecuta el comando cuatro veces, una por cada tema:
Clickstream-inbound
Transactions-inbound
Inventory-inbound
Inventory-outbound
Para crear una suscripción a tu tema, ejecuta el comando
gcloud pubsub subscriptions create
una vez por cada suscripción:
Crea una
Clickstream-inbound-sub
suscripción:gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
Crea una
Transactions-inbound-sub
suscripción:gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
Crea una suscripción a
Inventory-inbound-sub
:gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
Crear conjuntos de datos y tablas de BigQuery
Crea un conjunto de datos de BigQuery y una tabla particionada con el esquema adecuado para tu tema de Pub/Sub.
Usa el comando
bq mk
para crear el primer conjunto de datos.bq --location=US mk \ PROJECT_ID:Retail_Store
Crea el segundo conjunto de datos.
bq --location=US mk \ PROJECT_ID:Retail_Store_Aggregations
Usa la instrucción SQL CREATE TABLE para crear una tabla con un esquema y datos de prueba. Los datos de prueba tienen una tienda con el valor de ID
1
. El patrón de entrada secundaria de actualización lenta usa esta tabla.bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
Crear una instancia y una tabla de Bigtable
Crea una instancia y una tabla de Bigtable. Para obtener más información sobre cómo crear instancias de Bigtable, consulta Crear una instancia.
Si es necesario, ejecuta el siguiente comando para instalar la CLI de
cbt
:gcloud components install cbt
Usa el comando
bigtable instances create
para crear una instancia:gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
Sustituye CLUSTER_ZONE por la zona en la que se ejecuta el clúster.
Usa el comando
cbt createtable
para crear una tabla:cbt -instance=aggregate-tables createtable PageView5MinAggregates
Usa el siguiente comando para añadir una familia de columnas a la tabla:
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
Ejecutar el flujo de procesamiento
Usa Gradle para ejecutar una canalización de streaming. Para ver el código de Java que usa la canalización, consulta RetailDataProcessingPipeline.java.
Usa el comando
git clone
para clonar el repositorio de GitHub:git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
Cambia al directorio de la aplicación:
cd dataflow-sample-applications/retail/retail-java-applications
Para probar la canalización, en tu shell o terminal, ejecuta el siguiente comando con Gradle:
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
Para ejecutar la canalización, ejecuta el siguiente comando con Gradle:
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID \ --serviceAccount=retailpipeline.PROJECT_ID.iam.gserviceaccount.com"
Consulta el código fuente de la canalización en GitHub.
Crear y ejecutar tareas de Cloud Scheduler
Crea y ejecuta tres tareas de Cloud Scheduler: una que publique datos de flujo de clics, otra que publique datos de inventario y otra que publique datos de transacciones. En este paso se generan datos de ejemplo para la canalización.
Para crear una tarea de Cloud Scheduler en este tutorial, usa el comando
gcloud scheduler jobs create
. En este paso se crea un editor para los datos de flujo de clics que publica un mensaje por minuto.gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
Para iniciar la tarea de Cloud Scheduler, usa el comando
gcloud scheduler jobs run
.gcloud scheduler jobs run --location=LOCATION clickstream
Crea y ejecuta otro editor similar para los datos de inventario que publique un mensaje cada dos minutos.
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
Inicia la segunda tarea de Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION inventory
Crea y ejecuta un tercer editor para los datos de transacciones que publique un mensaje cada dos minutos.
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
Inicia la tercera tarea de Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION transactions
Ver los resultados
Ver los datos escritos en las tablas de BigQuery. Para comprobar los resultados en BigQuery, ejecuta las siguientes consultas. Mientras se ejecuta esta canalización, puedes ver cómo se añaden nuevas filas a las tablas de BigQuery cada minuto.
Es posible que tengas que esperar a que las tablas se rellenen con datos.
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
Limpieza
Para evitar que los recursos utilizados en este tutorial se cobren en tu cuenta de Google Cloud, elimina el proyecto que contiene los recursos o conserva el proyecto y elimina los recursos.
Eliminar el proyecto
La forma más fácil de evitar que te cobren es eliminar el Google Cloud proyecto que has creado para el tutorial.
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Eliminar los recursos concretos
Si quieres reutilizar el proyecto, elimina los recursos que has creado para el tutorial.
Limpiar los recursos del proyecto de Google Cloud Platform
Para eliminar las tareas de Cloud Scheduler, usa el comando
gcloud scheduler jobs delete
.gcloud scheduler jobs delete transactions --location=LOCATION
gcloud scheduler jobs delete inventory --location=LOCATION
gcloud scheduler jobs delete clickstream --location=LOCATION
Para eliminar las suscripciones y los temas de Pub/Sub, usa los comandos
gcloud pubsub subscriptions delete
ygcloud pubsub topics delete
.gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAME
Para eliminar la tabla de BigQuery, usa el comando
bq rm
.bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
Elimina los conjuntos de datos de BigQuery. El conjunto de datos por sí solo no conlleva ningún cargo.
bq rm -r -f -d PROJECT_ID:Retail_Store
bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
Para eliminar la instancia de Bigtable, usa el comando
cbt deleteinstance
. El propio contenedor no genera ningún cargo.cbt deleteinstance aggregate-tables
Para eliminar el segmento de Cloud Storage y sus objetos, usa el comando
gcloud storage rm
. El propio contenedor no genera ningún cargo.gcloud storage rm gs://BUCKET_NAME --recursive
Revocar credenciales
Revoca los roles que hayas concedido a la cuenta de servicio de trabajador gestionada por el usuario. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de gestión de identidades y accesos:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
roles/bigquery.jobUser
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
Siguientes pasos
- Consulta la aplicación de ejemplo en GitHub.
- Lee la entrada de blog relacionada Descubre patrones de Beam con el procesamiento de Clickstream de los datos de Google Tag Manager.
- Consulta información sobre cómo usar Pub/Sub para crear y usar temas y para usar suscripciones.
- Consulta información sobre cómo crear conjuntos de datos con BigQuery.
- Consulta arquitecturas de referencia, diagramas y prácticas recomendadas sobre Google Cloud. Consulta nuestro Centro de arquitectura de Cloud.