Transmitir mensajes de Pub/Sub mediante Dataflow y Cloud Storage
Dataflow es un servicio totalmente gestionado que está diseñado para transformar y enriquecer datos tanto en streaming (en tiempo real) como por lotes con la misma fiabilidad y expresividad. Proporciona un entorno de desarrollo de canalizaciones simplificado que usa el SDK de Apache Beam, que tiene un amplio conjunto de tipos primitivos de análisis de ventanas y sesiones, así como un ecosistema de conectores de origen y de receptor. En esta guía de inicio rápido se muestra cómo usar Dataflow para hacer lo siguiente:
- Leer mensajes publicados en un tema de Pub/Sub
- Ventana (o agrupa) los mensajes por marca de tiempo.
- Escribir los mensajes en Cloud Storage
En esta guía de inicio rápido se explica cómo usar Dataflow en Java y Python. También se admite SQL. Esta guía de inicio rápido también se ofrece como un tutorial de Acelerador de conocimientos de Google Cloud, que proporciona credenciales temporales para que puedas empezar.
También puedes empezar usando plantillas de Dataflow basadas en la interfaz de usuario si no tienes intención de hacer un tratamiento de datos personalizado.
Antes de empezar
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles.gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Ensure that you have the Create Service Accounts IAM role
(
roles/iam.serviceAccountCreator
). Learn how to grant roles. -
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
Ensure that you have the Create Service Accounts IAM role
(
-
Install the Google Cloud CLI.
-
Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles.gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Ensure that you have the Create Service Accounts IAM role
(
roles/iam.serviceAccountCreator
). Learn how to grant roles. -
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
Ensure that you have the Create Service Accounts IAM role
(
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
Configurar el proyecto de Pub/Sub
-
Crea variables para tu segmento, proyecto y región. Los nombres de los segmentos de Cloud Storage deben ser únicos de forma global. Selecciona una región de Dataflow cercana a la ubicación en la que ejecutas los comandos de esta guía de inicio rápido. El valor de la variable
REGION
debe ser un nombre de región válido. Para obtener más información sobre las regiones y las ubicaciones, consulta el artículo sobre las ubicaciones de Dataflow.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
Crea un segmento de Cloud Storage propiedad de este proyecto:
gcloud storage buckets create gs://$BUCKET_NAME
-
Crea un tema de Pub/Sub en este proyecto:
gcloud pubsub topics create $TOPIC_ID
-
Crea una tarea de Cloud Scheduler en este proyecto. La tarea publica un mensaje en un tema de Pub/Sub a intervalos de un minuto.
Si no existe ninguna aplicación de App Engine para el proyecto, en este paso se creará una.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Inicia el trabajo.
gcloud scheduler jobs run publisher-job --location=$REGION
-
Usa los siguientes comandos para clonar el repositorio de inicio rápido y ve al directorio del código de ejemplo:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Enviar mensajes de Pub/Sub a Cloud Storage
Código de ejemplo
Este código de ejemplo usa Dataflow para hacer lo siguiente:
- Leer mensajes de Pub/Sub.
- Ventana (o agrupa) los mensajes en intervalos de tamaño fijo por marcas de tiempo de publicación.
Escribe los mensajes de cada ventana en archivos de Cloud Storage.
Java
Python
Iniciar el flujo de procesamiento
Para iniciar la canalización, ejecuta el siguiente comando:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
El comando anterior se ejecuta de forma local e inicia una tarea de Dataflow que se ejecuta en la nube. Cuando el comando devuelva JOB_MESSAGE_DETAILED: Workers
have started successfully
, sal del programa local con Ctrl+C
.
Observar el progreso de las tareas y las canalizaciones
Puedes observar el progreso de la tarea en la consola de Dataflow.
Abre la vista de detalles del trabajo para ver lo siguiente:
- Estructura de la tarea
- Registros de tareas
- Métricas de etapa
Es posible que tengas que esperar unos minutos para ver los archivos de salida en Cloud Storage.
También puedes usar la línea de comandos que aparece a continuación para comprobar qué archivos se han escrito.
gcloud storage ls gs://${BUCKET_NAME}/samples/
La salida debería tener este aspecto:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Limpieza
Para evitar que se apliquen cargos en tu Google Cloud cuenta por los recursos utilizados en esta página, elimina el Google Cloud proyecto con los recursos.
Elimina la tarea de Cloud Scheduler.
gcloud scheduler jobs delete publisher-job --location=$REGION
En la consola de Dataflow, detén el trabajo. Cancela la canalización sin vaciarla.
Elimina el tema.
gcloud pubsub topics delete $TOPIC_ID
Elimina los archivos creados por la canalización.
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
Elimina el segmento de Cloud Storage.
gcloud storage rm gs://${BUCKET_NAME} --recursive
-
Elimina la cuenta de servicio:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
Siguientes pasos
Si quieres visualizar los mensajes de Pub/Sub en función de una marca de tiempo personalizada, puedes especificarla como atributo en el mensaje de Pub/Sub y, a continuación, usarla con
withTimestampAttribute
de PubsubIO.Consulta las plantillas de Dataflow de código abierto de Google diseñadas para el streaming.
Consulta más información sobre cómo se integra Dataflow con Pub/Sub.
Consulta este tutorial para leer datos de Pub/Sub y escribirlos en BigQuery con plantillas flexibles de Dataflow.
Para obtener más información sobre las ventanas, consulta el ejemplo de pipeline de juegos para móviles de Apache Beam.