En este instructivo, crearás una canalización de transmisión de Dataflow que transforma los datos de comercio electrónico de los temas y las suscripciones de Pub/Sub y envía los datos a BigQuery y Bigtable. Este instructivo requiere Gradle.
En el instructivo, se proporciona una aplicación de ejemplo de comercio electrónico de extremo a extremo que transmite datos de una tienda web a BigQuery y Bigtable. En la aplicación de muestra, se ilustran casos de uso comunes y prácticas recomendadas para implementar las estadísticas de datos de transmisión y la inteligencia artificial (IA) en tiempo real. Usa este instructivo para aprender a responder de forma dinámica a las acciones de los clientes a fin de analizar eventos y reaccionar ante ellos en tiempo real. En este instructivo, se describe cómo almacenar, analizar y visualizar datos de eventos para obtener más estadísticas sobre el comportamiento de los clientes.
La aplicación de muestra está disponible en GitHub. Para ejecutar este instructivo mediante Terraform, sigue los pasos proporcionados con la aplicación de muestra en GitHub.
Objetivos
- Validar los datos de entrada y aplicar correcciones cuando sea posible
- Analizar los datos de flujo de clics para mantener un recuento de la cantidad de vistas por producto en un período determinado. Almacena esta información en un almacén de latencia baja. La aplicación puede usar los datos para proporcionar mensajes sobre la cantidad de personas que vieron este producto a los clientes en el sitio web.
Usar los datos de transacciones para informar los pedidos de inventario:
- Analizar los datos de las transacciones para calcular la cantidad total de ventas de cada artículo, por tienda y a nivel global, durante un período determinado
- Analizar los datos de inventario a fin de calcular el inventario entrante para cada elemento
- Pasar estos datos a los sistemas de inventario de forma continua para poder usarlos en la toma de decisiones de compra de inventario.
Validar los datos de entrada y aplicar correcciones cuando sea posible Escribir los datos que no se puedan corregir en una cola de mensajes no entregados para su análisis y procesamiento adicionales. Haz una métrica que represente el porcentaje de datos entrante que se envía a la cola de mensajes no entregados disponibles para supervisar y crear alertas
Procesar todos los datos de entrada en un formato estándar y almacenarlos en un almacén de datos para usarlos en próximos análisis y visualizaciones
Desnormalizar los datos de transacciones para las ventas en la tienda a fin de que puedan incluir información como la latitud y la longitud de la ubicación de la tienda. Para proporcionar la información de la tienda mediante una tabla que cambia con lentitud en BigQuery, usa el ID de tienda como clave.
Datos
La aplicación procesa los siguientes tipos de datos:
- Datos de flujo de clics que se envían mediante sistemas en línea a Pub/Sub.
- Datos de transacciones que se enviarán mediante sistemas locales o de software como servicio (SaaS) a Pub/Sub.
- Datos de archivo que se envían mediante sistemas locales o SaaS a Pub/Sub.
Patrones de la tarea
La aplicación contiene los siguientes patrones de tareas comunes a las canalizaciones compiladas con el SDK de Apache Beam para Java:
- Esquemas de Apache Beam para trabajar con datos estructurados
JsonToRow
para convertir datos JSON- El generador de código
AutoValue
para generar objetos antiguos y sin formato basados en Java (POJOs) - Pon en cola datos que no se pueden procesar para su análisis posterior
- Transformaciones de validación de datos en serie
DoFn.StartBundle
a llamadas por microlotes a servicios externos- Patrones de entradas complementarias
Costos
En este documento, usarás los siguientes componentes facturables de Google Cloud:
- BigQuery
- Bigtable
- Cloud Scheduler
- Compute Engine
- Dataflow
- Pub/Sub
Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.
Cuando finalices las tareas que se describen en este documento, puedes borrar los recursos que creaste para evitar que continúe la facturación. Para obtener más información, consulta Cómo realizar una limpieza.
Antes de comenzar
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com cloudscheduler.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Crea una cuenta de servicio de trabajador administrado por el usuario para tu canalización nueva y otórgale los roles necesarios.
Para crear la cuenta de servicio, ejecuta el comando
gcloud iam service-accounts create
:gcloud iam service-accounts create retailpipeline \ --description="Retail app data pipeline worker service account" \ --display-name="Retail app data pipeline access"
Otorga roles a la cuenta de servicio. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Reemplaza
SERVICE_ACCOUNT_ROLE
por cada rol individual.Otorga a tu Cuenta de Google un rol que te permita crear tokens de acceso para la cuenta de servicio:
gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
- Si es necesario, descarga e instala Gradle.
Crea las fuentes y los receptores de ejemplo
En esta sección, se explica cómo crear lo siguiente:
- Un bucket de Cloud Storage para usar como ubicación de almacenamiento temporal
- Fuentes de datos de transmisión con Pub/Sub
- Conjuntos de datos para cargar los datos en BigQuery
- Una instancia de Bigtable
Crea un bucket de Cloud Storage
Primero, crea un bucket de Cloud Storage. La canalización de Dataflow usa este bucket como ubicación de almacenamiento temporal.
Usa el comando gcloud storage buckets create
:
gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION
Reemplaza lo siguiente:
- BUCKET_NAME: Es un nombre para tu bucket de Cloud Storage que cumple con los requisitos de nombres de buckets. Los nombres de buckets de Cloud Storage deben ser únicos a nivel global.
- LOCATION: la ubicación del bucket.
Crea temas y suscripciones de Pub/Sub
Crea cuatro temas de Pub/Sub y, a continuación, crea tres suscripciones.
Para crear tus temas, ejecuta el comando
gcloud pubsub topics create
una vez por cada tema. Para obtener información sobre cómo asignar un nombre a una suscripción, consulta los Lineamientos para asignar un nombre a un tema o una suscripción.
gcloud pubsub topics create TOPIC_NAME
Reemplaza TOPIC_NAME por los siguientes valores y ejecuta el comando cuatro veces, una vez por cada tema:
Clickstream-inbound
Transactions-inbound
Inventory-inbound
Inventory-outbound
Para crear una suscripción a un tema, ejecuta el comando
gcloud pubsub subscriptions create
una vez por cada suscripción:
Crea una suscripción
Clickstream-inbound-sub
:gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
Crea una suscripción
Transactions-inbound-sub
:gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
Crea una suscripción
Inventory-inbound-sub
:gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
Crea conjuntos de datos y una tabla de BigQuery
Crea un conjunto de datos de BigQuery y una tabla particionada con el esquema adecuado para tu tema de Pub/Sub.
Usa el comando
bq mk
para crear el primer conjunto de datos.bq --location=US mk \ PROJECT_ID:Retail_Store
Crea el segundo conjunto de datos.
bq --location=US mk \ PROJECT_ID:Retail_Store_Aggregations
Usa la instrucción de SQL CREATE TABLE para crear una tabla con un esquema y datos de prueba. Los datos de prueba tienen un almacén con un valor de ID de
1
. El patrón de entrada complementaria de actualización lenta usa esta tabla.bq query --use_legacy_sql=false \ 'CREATE TABLE Retail_Store.Store_Locations ( id INT64, city STRING, state STRING, zip INT64 ); INSERT INTO Retail_Store.Store_Locations VALUES (1, "a_city", "a_state",00000);'
Crea una instancia y una tabla de Bigtable
Crea una instancia y una tabla de Bigtable. Para obtener más información sobre cómo crear instancias de Bigtable, consulta Crea una instancia.
Si es necesario, ejecuta el siguiente comando para instalar la CLI de
cbt
:gcloud components install cbt
Usa el comando
bigtable instances create
para crear una instancia:gcloud bigtable instances create aggregate-tables \ --display-name=aggregate-tables \ --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
Reemplaza CLUSTER_ZONE por la zona donde se ejecuta el clúster.
Usa el comando
cbt createtable
para crear una tabla permanente.cbt -instance=aggregate-tables createtable PageView5MinAggregates
Usa el siguiente comando para agregar una familia de columnas a la tabla:
cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
Ejecute la canalización
Usa Gradle para ejecutar una canalización de transmisión. Para ver el código Java que usa la canalización, consulta RetailDataProcessingPipeline.java.
Usa el comando
git clone
para clonar el repositorio de GitHub:git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
Cambia al directorio de la aplicación:
cd dataflow-sample-applications/retail/retail-java-applications
Para probar la canalización, en tu shell o terminal, ejecuta el siguiente comando con Gradle:
./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
Para ejecutar la canalización, ejecuta el siguiente comando con Gradle:
./gradlew tasks executeOnDataflow -Dexec.args=" \ --project=PROJECT_ID \ --tempLocation=gs://BUCKET_NAME/temp/ \ --runner=DataflowRunner \ --region=REGION \ --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \ --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \ --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \ --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \ --dataWarehouseOutputProject=PROJECT_ID"
Consulta el código fuente de la canalización en GitHub.
Crea y ejecuta trabajos de Cloud Scheduler
Crea y ejecuta tres trabajos de Cloud Scheduler: uno que publique datos de flujo de clics, uno para datos de inventario y otro para datos de transacciones. En este paso, se generan datos de muestra para la canalización.
Si quieres crear un trabajo de Cloud Scheduler para este instructivo, usa el comando
gcloud scheduler jobs create
. En este paso, se crea un publicador para los datos de flujo de clics que publica un mensaje por minuto.gcloud scheduler jobs create pubsub clickstream \ --schedule="* * * * *" \ --location=LOCATION \ --topic="Clickstream-inbound" \ --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
Para iniciar el trabajo de Cloud Scheduler, usa el comando
gcloud scheduler jobs run
.gcloud scheduler jobs run --location=LOCATION clickstream
Crea y ejecuta otro publicador similar para los datos de inventario que publique un mensaje cada dos minutos.
gcloud scheduler jobs create pubsub inventory \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Inventory-inbound" \ --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
Inicia el segundo trabajo de Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION inventory
Crea y ejecuta un tercer publicador para datos de transacciones que publique un mensaje cada dos minutos.
gcloud scheduler jobs create pubsub transactions \ --schedule="*/2 * * * *" \ --location=LOCATION \ --topic="Transactions-inbound" \ --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
Inicia el tercer trabajo de Cloud Scheduler.
gcloud scheduler jobs run --location=LOCATION transactions
Ve los resultados
Visualiza los datos escritos en tus tablas de BigQuery. Ejecuta las siguientes consultas para verificar los resultados en BigQuery: Mientras esta canalización se está ejecutando, puedes ver filas nuevas agregadas a las tablas de BigQuery cada minuto.
Es posible que debas esperar a que las tablas se propaguen con los datos.
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'
Libera espacio
Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.
Borra el proyecto
La manera más fácil de eliminar la facturación es borrar el proyecto de Google Cloud que creaste para el instructivo.
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Borra los recursos individuales
Si deseas volver a usar el proyecto, borra los recursos que creaste para el instructivo.
Borra los recursos del proyecto de Google Cloud
Para borrar los trabajos de Cloud Scheduler, usa el comando
gcloud scheduler jobs delete
.gcloud scheduler jobs delete transactions --location=LOCATION
gcloud scheduler jobs delete inventory --location=LOCATION
gcloud scheduler jobs delete clickstream --location=LOCATION
Para borrar las suscripciones y los temas de Pub/Sub, usa los comandos
gcloud pubsub subscriptions delete
ygcloud pubsub topics delete
.gcloud pubsub subscriptions delete SUBSCRIPTION_NAME gcloud pubsub topics delete TOPIC_NAME
Para borrar la tabla de BigQuery, usa el comando
bq rm
.bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
Borra los conjuntos de datos de BigQuery. El conjunto de datos por sí solo no genera cargos.
bq rm -r -f -d PROJECT_ID:Retail_Store
bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
Para borrar la instancia de Bigtable, usa el comando
cbt deleteinstance
. El bucket por sí solo no genera cargos.cbt deleteinstance aggregate-tables
Para borrar el bucket de Cloud Storage, usa el comando
gcloud storage rm
. El bucket por sí solo no genera cargos.gcloud storage rm gs://BUCKET_NAME --recursive
Revoca credenciales
Revoca los roles que otorgaste a la cuenta de servicio de trabajador administrada por el usuario. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/pubsub.editor
roles/bigquery.dataEditor
roles/bigtable.admin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \ --role=ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
¿Qué sigue?
- Consulta la aplicación de muestra en GitHub.
- Lee la entrada de blog relacionada Obtén información sobre los patrones de Beam con el procesamiento de Clickstream de los datos de Google Tag Manager.
- Lee sobre el uso de Pub/Sub para crear y usar temas y cómo Usar suscripciones.
- Lee sobre el uso de BigQuery para crear conjuntos de datos.
- Explora arquitecturas de referencia, diagramas y prácticas recomendadas sobre Google Cloud. Consulta nuestro Cloud Architecture Center.