Transmite mensajes de Pub/Sub Lite con Dataflow
Como alternativa a escribir y ejecutar tus propios programas de procesamiento de datos, puedes usar Dataflow con el conector de E/S de Pub/Sub Lite para Apache Beam. Dataflow es un servicio completamente administrado para transformar y enriquecer datos en modos de transmisión (en tiempo real) y por lotes con la misma confiabilidad y expresividad. Ejecuta de manera confiable programas desarrollados con el SDK de Apache Beam, que tiene un conjunto extensible de abstracciones de procesamiento con estado potentes y conectores de E/S con otros sistemas de transmisión y por lotes.
En esta guía de inicio rápido, se muestra cómo escribir una canalización de Apache Beam que realizará las siguientes acciones:
- Lee mensajes de Pub/Sub Lite
- Mostrar mensajes en ventanas, o agruparlos, por marca de tiempo de publicación
- Escribir mensajes a Cloud Storage
Además, te muestra cómo hacer lo siguiente:
- Enviar tu canalización para que se ejecute en Dataflow
- Crear una plantilla flexible de Dataflow desde tu canalización
Para este instructivo, se requiere Maven, pero también es posible convertir el proyecto de ejemplo de Maven a Gradle. Para obtener más información, consulta Opcional: convierte de Maven a Gradle.
Antes de comenzar
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
Configura tu proyecto de Pub/Sub Lite
Crea variables para tu bucket de Cloud Storage, tu proyecto y la región de Dataflow. Los nombres de buckets de Cloud Storage deben ser únicos a nivel global. La región de Dataflow debe ser una región válida en la que puedas ejecutar el trabajo. Para obtener más información acerca de las regiones y ubicaciones, consulta Ubicaciones de Dataflow.
export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
export BUCKET=BUCKET_NAME
export DATAFLOW_REGION=DATAFLOW_REGION
Crea un bucket de Cloud Storage que sea propiedad de este proyecto:
gcloud storage buckets create gs://$BUCKET
Crea un tema y una suscripción de zona de Pub/Sub Lite
Crea un tema y una suscripción Lite de Pub/Sub Lite zonales.
Para la ubicación de Lite, elige una ubicación de Pub/Sub Lite compatible. También debes especificar una zona para la región. Por ejemplo, us-central1-a
export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \ --location=$LITE_LOCATION \ --partitions=1 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$LITE_LOCATION \ --topic=$TOPIC \ --starting-offset=beginning
Transmite mensajes a Dataflow
Descarga el código de muestra del inicio rápido
Clona el repositorio de la guía de inicio rápido y navega al directorio de código de muestra.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics
Código de muestra
En este código de muestra, se usa Dataflow para realizar las siguientes acciones:
- Leer mensajes de una suscripción a Pub/Sub Lite como una fuente no delimitada.
- Agrupar mensajes según sus marcas de tiempo de publicación, con ventanas de tiempo fijo y el activador predeterminado
Escribir los mensajes agrupados en archivos en Cloud Storage.
Java
Antes de ejecutar esta muestra, sigue las instrucciones de configuración de Java en las bibliotecas cliente de Pub/Sub Lite.
Inicia la canalización de Dataflow
Para iniciar la canalización en Dataflow, ejecuta el siguiente comando:
mvn compile exec:java \
-Dexec.mainClass=examples.PubsubliteToGcs \
-Dexec.args=" \
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
--output=gs://$BUCKET/samples/output \
--windowSize=1 \
--project=$PROJECT_ID \
--region=$DATAFLOW_REGION \
--tempLocation=gs://$BUCKET/temp \
--runner=DataflowRunner \
--serviceAccount=$SERVICE_ACCOUNT"
El comando anterior inicia un trabajo de Dataflow. Sigue el vínculo que se muestra en el resultado de la consola para acceder al trabajo en la consola de supervisión de Dataflow.
Observa el progreso del trabajo
Observa el progreso del trabajo en la consola de Dataflow.
Abre la vista de detalles de trabajos para ver lo siguiente:
- Grafo del trabajo
- Detalles de la ejecución
- Métricas del trabajo
Publica algunos mensajes en tu tema de Lite.
gcloud pubsub lite-topics publish $TOPIC \
--location=$LITE_LOCATION \
--message="Hello World!"
Es posible que debas esperar unos minutos para ver los mensajes en tus registros de trabajador.
Usa el siguiente comando para verificar qué archivos se escribieron en Cloud Storage.
gcloud storage ls "gs://$BUCKET/samples/"
El resultado debe tener el siguiente aspecto:
gs://$BUCKET/samples/output-19:41-19:42-0-of-1
gs://$BUCKET/samples/output-19:47-19:48-0-of-1
gs://$BUCKET/samples/output-19:48-19:49-0-of-1
Usa el siguiente comando para ver el contenido de un archivo:
gcloud storage cat "gs://$BUCKET/samples/your-filename"
Opcional: Crea una plantilla de Dataflow
De manera opcional, puedes crear una plantilla flexible de Dataflow basada en la canalización. Las plantillas de Dataflow te permiten ejecutar trabajos con diferentes parámetros de entrada de la consola de Google Cloud o de la línea de comandos sin la necesidad de configurar un entorno de desarrollo de Java completo.
Crea un JAR grande que incluya todas las dependencias de tu canalización. Deberías ver
target/pubsublite-streaming-bundled-1.0.jar
después de que se ejecute el comando.mvn clean package -DskipTests=true
Proporciona nombres y ubicaciones para tu archivo de plantilla y tu imagen de contenedor de plantilla.
export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
Crea una plantilla flexible personalizada. Se proporcionó un archivo
metadata.json
obligatorio, que contiene las especificaciones necesarias para ejecutar el trabajo con el ejemplo.gcloud dataflow flex-template build $TEMPLATE_PATH \ --image-gcr-path $TEMPLATE_IMAGE \ --sdk-language "JAVA" \ --flex-template-base-image "JAVA11" \ --metadata-file "metadata.json" \ --jar "target/pubsublite-streaming-bundled-1.0.jar" \ --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
Ejecuta un trabajo con la plantilla flexible personalizada.
Console
Ingresa un nombre de trabajo.
Ingresa tu región de Dataflow.
Elige tu Plantilla personalizada.
Ingresa la ruta de acceso de la plantilla.
Ingresa los parámetros obligatorios.
Haga clic en Ejecutar trabajo.
gcloud
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
--template-file-gcs-location $TEMPLATE_PATH \
--parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
--parameters output="gs://$BUCKET/samples/template-output" \
--parameters windowSize=1 \
--region $DATAFLOW_REGION \
--serviceAccount=$SERVICE_ACCOUNT
Limpia
Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que se usaron en esta página, borra el proyecto de Cloud que tiene los recursos.
En la consola de Dataflow, detén el trabajo. Cancela la canalización en lugar de desviarla.
Borra el tema y la suscripción.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Borra los archivos que se crearon con la canalización.
gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
Borra la imagen de la plantilla y el archivo de la plantilla, si existen.
gcloud container images delete $TEMPLATE_IMAGE
gcloud storage rm $TEMPLATE_PATH
Quita el bucket de Cloud Storage.
gcloud storage rm gs://$BUCKET --recursive
-
Borra la cuenta de servicio:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
¿Qué sigue?
Obtén más información para configurar plantillas de Flex de Dataflow.
Comprende las canalizaciones de transmisión de Dataflow.