Transmite mensajes desde Pub/Sub con Dataflow
Dataflow es un servicio completamente administrado para transformar y enriquecer datos en modos de transmisión (tiempo real) y por lotes con la misma confiabilidad y expresividad. Proporciona un entorno de desarrollo de canalización simplificado con el SDK de Apache Beam, que tiene un conjunto amplio de primitivas de análisis de sesiones y sistemas de ventanas, además de un ecosistema de conectores fuente y receptores. En esta guía de inicio rápido, se muestra cómo usar Dataflow para realizar las siguientes acciones:
- Leer mensajes publicados en un tema de Pub/Sub
- Mostrar mensajes en ventanas, o agruparlos, por marca de tiempo
- Escribir mensajes a Cloud Storage
En esta guía de inicio rápido, se explica el uso de Dataflow en Java y Python. SQL también es compatible. Esta guía de inicio rápido también se ofrece como un instructivo de Google Cloud Skills Boost, que proporciona credenciales temporales para ayudarte a comenzar.
Si tu intención no es realizar un procesamiento de datos personalizado, puedes comenzar a usar las plantillas de Dataflow basadas en IU.
Antes de comenzar
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
Configura tu proyecto de Pub/Sub
-
Crea variables para tu bucket, tu proyecto y la región. Los nombres de depósitos de Cloud Storage deben ser únicos a nivel global. Selecciona una región de Dataflow cercana a donde ejecutas los comandos de esta guía de inicio rápido. El valor de la variable
REGION
debe ser un nombre de región válido. Para obtener más información acerca de las regiones y ubicaciones, consulta Ubicaciones de Dataflow.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
Crea un bucket de Cloud Storage que sea propiedad de este proyecto:
gcloud storage buckets create gs://$BUCKET_NAME
-
Crea un tema de Pub/Sub en este proyecto:
gcloud pubsub topics create $TOPIC_ID
-
Crea un trabajo de Cloud Scheduler en este proyecto. El trabajo publica un mensaje en un tema de Pub/Sub con intervalos de un minuto.
Si una app de App Engine no existe en el proyecto, crea una con el siguiente comando.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Inicia el trabajo.
gcloud scheduler jobs run publisher-job --location=$REGION
-
Usa los siguientes comandos para clonar el repositorio de la guía de inicio rápido y navega al directorio de código de muestra:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Transmite mensajes desde Pub/Sub a Cloud Storage
Muestra de código
En este código de muestra, se usa Dataflow para realizar las siguientes acciones:
- Leer mensajes de Pub/Sub
- Mostrar mensajes en ventanas, o agruparlos, en intervalos de tamaño fijo con marcas de tiempo públicas.
Escribir los mensajes en cada ventana en archivos en Cloud Storage
Java
Python
Comienza la canalización
Para iniciar la canalización, ejecuta el siguiente comando:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
El comando anterior se ejecuta de manera local y, luego, inicia un trabajo de Dataflow que se ejecuta en la nube. Cuando el comando muestre JOB_MESSAGE_DETAILED: Workers
have started successfully
, sal del programa local con Ctrl+C
.
Observa el progreso del trabajo y la canalización
Puedes observar el progreso del trabajo en la consola de Dataflow.
Abre la vista de detalles de trabajos para ver lo siguiente:
- Estructura del trabajo
- Registros del trabajo
- Métricas de etapas
Puede que debas esperar unos minutos para ver los archivos de salida en Cloud Storage.
También puedes usar la línea de comandos que se muestra a continuación para verificar qué archivos se escribieron.
gcloud storage ls gs://${BUCKET_NAME}/samples/
El resultado debe tener el siguiente aspecto:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Limpia
Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que se usaron en esta página, borra el proyecto de Cloud que tiene los recursos.
Borra el trabajo de Cloud Scheduler.
gcloud scheduler jobs delete publisher-job --location=$REGION
En la consola de Dataflow, detén el trabajo. Cancela la canalización sin desviarla.
Borra el tema.
gcloud pubsub topics delete $TOPIC_ID
Borra los archivos que se crearon con la canalización.
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
Quita el bucket de Cloud Storage.
gcloud storage rm gs://${BUCKET_NAME} --recursive
-
Delete the service account:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
¿Qué sigue?
Si deseas mostrar los mensajes de Pub/Sub en una marca de tiempo personalizada, puedes especificar la marca de tiempo como un atributo en el mensaje de Pub/Sub y, luego, usar la marca de tiempo personalizada con el comando
withTimestampAttribute
de PubsubIO.Observa las plantillas de código abierto de Dataflow diseñadas para la transmisión de Google.
Obtén más información sobre cómo Dataflow se integra com Pub/Sub.
Consulta este instructivo que lee desde Pub/Sub y escribe en BigQuery mediante las plantillas flexibles de Dataflow.
Para obtener más información sobre el sistema de ventanas, consulta el ejemplo en la página sobre canalización de videojuegos para dispositivos móviles de Apache Beam.