En este tutorial se usa la plantilla Suscripción de Pub/Sub a BigQuery para crear y ejecutar una tarea de plantilla de Dataflow mediante la consola de Google Cloud Google Cloud o la CLI de Google Cloud. En este tutorial se explica un ejemplo de flujo de procesamiento en streaming que lee mensajes codificados en JSON de Pub/Sub y los escribe en una tabla de BigQuery.
Las analíticas en tiempo real y los flujos de procesamiento de integración de datos usan Pub/Sub para ingerir y distribuir datos. Pub/Sub te permite crear sistemas de productores y consumidores de eventos, llamados editores y suscriptores. Los editores envían eventos al servicio Pub/Sub de forma asíncrona, y Pub/Sub envía los eventos a todos los servicios que deben reaccionar a ellos.
Dataflow es un servicio totalmente gestionado que está diseñado para transformar y enriquecer datos tanto en streaming (en tiempo real) como por lotes. Proporciona un entorno de desarrollo de flujos de trabajo simplificado que usa el SDK de Apache Beam para transformar los datos entrantes y, a continuación, generar los datos transformados.
La ventaja de este flujo de trabajo es que puedes usar UDFs para transformar los datos de los mensajes antes de escribirlos en BigQuery.
Antes de ejecutar una canalización de Dataflow para este caso, plantéate si una suscripción de Pub/Sub a BigQuery con una UDF se ajusta a tus necesidades.
Objetivos
- Crea un tema de Pub/Sub.
- Crea un conjunto de datos de BigQuery con una tabla y un esquema.
- Usa una plantilla de streaming proporcionada por Google para transmitir datos de tu suscripción de Pub/Sub a BigQuery mediante Dataflow.
Costes
En este documento, se usan los siguientes componentes facturables de Google Cloud Platform:
- Dataflow
- Pub/Sub
- Cloud Storage
- BigQuery
Para generar una estimación de costes basada en el uso previsto,
utiliza la calculadora de precios.
Cuando termines las tareas que se describen en este documento, puedes evitar que se te siga facturando eliminando los recursos que has creado. Para obtener más información, consulta la sección Limpiar.
Antes de empezar
En esta sección se explica cómo seleccionar un proyecto, habilitar APIs y asignar los roles adecuados a tu cuenta de usuario y a la cuenta de servicio de trabajador.
Consola
- Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles. -
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles. Para completar los pasos de este tutorial, tu cuenta de usuario debe tener el rol Usuario de cuenta de servicio. La cuenta de servicio predeterminada de Compute Engine debe tener los siguientes roles: Trabajador de Dataflow, Administrador de Dataflow, Editor de Pub/Sub, Administrador de objetos de Storage y Editor de datos de BigQuery. Para añadir los roles necesarios en la Google Cloud consola, sigue estos pasos:
En la consola, ve a la página Gestión de identidades y accesos. Google Cloud
Ir a IAM- Selecciona el proyecto.
- En la fila que contiene tu cuenta de usuario, haz clic en Editar principal y, a continuación, en Añadir otro rol.
- En la lista desplegable, selecciona el rol Usuario de cuenta de servicio.
- En la fila que contiene la cuenta de servicio predeterminada de Compute Engine, haz clic en Editar principal y, a continuación, en Añadir otro rol.
- En la lista desplegable, selecciona el rol Dataflow Worker.
Repite el proceso con los roles Administrador de Dataflow, Editor de Pub/Sub, Administrador de objetos de Storage y Editor de datos de BigQuery. A continuación, haz clic en Guardar.
Para obtener más información sobre cómo conceder roles, consulta Conceder un rol de gestión de identidades y accesos mediante la consola.
gcloud
- Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles. -
Install the Google Cloud CLI.
-
Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles. -
Install the Google Cloud CLI.
-
Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init
-
Concede roles a tu cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez por cada uno de los siguientes roles de gestión de identidades y accesos:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Haz los cambios siguientes:
PROJECT_ID
: tu ID de proyecto.PROJECT_NUMBER
: tu número de proyecto. Para encontrar el número de tu proyecto, usa el comandogcloud projects describe
.SERVICE_ACCOUNT_ROLE
: cada rol individual.
Crea un segmento de Cloud Storage
Empieza creando un segmento de Cloud Storage con la Google Cloud consola o la CLI de Google Cloud. El flujo de procesamiento de Dataflow usa este segmento como ubicación de almacenamiento temporal.
Consola
En la Google Cloud consola, ve a la página Segmentos de Cloud Storage.
Haz clic en Crear.
En la página Crear un segmento, en Nombre del segmento, escribe un nombre que cumpla los requisitos de nombres de segmentos. Los nombres de los segmentos de Cloud Storage deben ser únicos de forma global. No selecciones las otras opciones.
Haz clic en Crear.
gcloud
Usa el comando gcloud storage buckets create
:
gcloud storage buckets create gs://BUCKET_NAME
Sustituye BUCKET_NAME
por el nombre que quieras darle a tu segmento de Cloud Storage, que debe cumplir los requisitos de nomenclatura de segmentos.
Los nombres de los segmentos de Cloud Storage deben ser únicos de forma global.
Crear un tema y una suscripción de Pub/Sub
Crea un tema de Pub/Sub y, a continuación, crea una suscripción a ese tema.
Consola
Para crear un tema, sigue estos pasos:
En la Google Cloud consola, ve a la página Temas de Pub/Sub.
Haz clic en Crear tema.
En el campo ID de tema, introduce un ID para el tema. Para obtener información sobre cómo nombrar un tema, consulta las directrices para nombrar un tema o una suscripción.
Mantén la opción Añadir una suscripción predeterminada. No selecciones las otras opciones.
Haz clic en Crear.
- En la página de detalles del tema, el nombre de la suscripción que se ha creado se muestra en ID de suscripción. Anota este valor para los pasos posteriores.
gcloud
Para crear un tema, ejecuta el comando
gcloud pubsub topics create
. Para obtener información sobre cómo poner nombre a una suscripción, consulta las directrices para nombrar un tema o una suscripción.
gcloud pubsub topics create TOPIC_ID
Sustituye TOPIC_ID
por el nombre del tema de Pub/Sub.
Para crear una suscripción a tu tema, ejecuta el comando
gcloud pubsub subscriptions create
:
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
Sustituye SUBSCRIPTION_ID
por el nombre de tu suscripción de Pub/Sub.
Crear una tabla de BigQuery
En este paso, creará una tabla de BigQuery con el siguiente esquema:
Nombre de la columna | Tipo de datos |
---|---|
name |
STRING |
customer_id |
INTEGER |
Si aún no tienes un conjunto de datos de BigQuery, crea uno. Para obtener más información, consulta Crear conjuntos de datos. A continuación, crea una tabla vacía:
Consola
Ve a la página BigQuery.
En el panel Explorador, expande tu proyecto y selecciona un conjunto de datos.
En la sección de información Conjunto de datos, haga clic en
Crear tabla.En la lista Crear tabla a partir de, selecciona Tabla vacía.
En el cuadro Tabla, introduce el nombre de la tabla.
En la sección Esquema, haz clic en Editar como texto.
Pega la siguiente definición de esquema:
name:STRING, customer_id:INTEGER
Haz clic en Crear tabla.
gcloud
Usa el comando bq mk
.
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
Haz los cambios siguientes:
PROJECT_ID
: tu ID de proyectoDATASET_NAME
: el nombre del conjunto de datosTABLE_NAME
: el nombre de la tabla que se va a crear
Ejecutar el flujo de procesamiento
Ejecuta un flujo de procesamiento en streaming con la plantilla de suscripción de Pub/Sub a BigQuery proporcionada por Google. La canalización obtiene los datos entrantes del tema de Pub/Sub y los envía a tu conjunto de datos de BigQuery.
Consola
En la Google Cloud consola, ve a la página Trabajos de Dataflow.
Haz clic en Crear tarea a partir de plantilla.
Escriba un nombre de tarea para su tarea de Dataflow.
En Punto de conexión regional, selecciona una región para tu tarea de Dataflow.
En Plantilla de Dataflow, selecciona la plantilla Suscripción de Pub/Sub a BigQuery.
En Tabla de salida de BigQuery, seleccione Buscar y elija la tabla de BigQuery.
En la lista Suscripción de entrada de Pub/Sub, selecciona la suscripción de Pub/Sub.
En Ubicación temporal, introduce lo siguiente:
gs://BUCKET_NAME/temp/
Sustituye
BUCKET_NAME
por el nombre de tu segmento de Cloud Storage. La carpetatemp
almacena archivos temporales de las tareas de Dataflow.Haz clic en Ejecutar trabajo.
gcloud
Para ejecutar la plantilla en tu shell o terminal, usa el comando
gcloud dataflow jobs run
.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
--region DATAFLOW_REGION \
--staging-location gs://BUCKET_NAME/temp \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME
Sustituye las siguientes variables:
JOB_NAME
. un nombre para el trabajoDATAFLOW_REGION
: una región para el trabajoPROJECT_ID
: el nombre de tu proyecto de Google Cloud PlatformSUBSCRIPTION_ID
: el nombre de tu suscripción de Pub/SubDATASET_NAME
: el nombre de tu conjunto de datos de BigQueryTABLE_NAME
: el nombre de tu tabla de BigQuery
Publicar mensajes en Pub/Sub
Una vez que se inicia la tarea de Dataflow, puedes publicar mensajes en Pub/Sub y la canalización los escribirá en BigQuery.
Consola
En la consola, ve a Pub/Sub > Temas. Google Cloud
En la lista de temas, haz clic en el nombre del tema.
Haz clic en Mensajes.
Haz clic en Publicar mensajes.
En Número de mensajes, introduce
10
.En Cuerpo del mensaje, escribe
{"name": "Alice", "customer_id": 1}
.Haz clic en Publicar.
gcloud
Para publicar mensajes en tu tema, usa el comando
gcloud pubsub topics publish
.
for run in {1..10}; do
gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done
Sustituye TOPIC_ID
por el nombre del tema.
Ver los resultados
Consulta los datos escritos en tu tabla de BigQuery. Los datos pueden tardar hasta un minuto en empezar a aparecer en la tabla.
Consola
En la Google Cloud consola, ve a la página BigQuery.
Ve a la página de BigQueryEn el editor de consultas, ejecuta la siguiente consulta:
SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME` LIMIT 1000
Sustituye las siguientes variables:
PROJECT_ID
: el nombre de tu proyecto de Google Cloud PlatformDATASET_NAME
: el nombre de tu conjunto de datos de BigQueryTABLE_NAME
: el nombre de tu tabla de BigQuery
gcloud
Para comprobar los resultados en BigQuery, ejecuta la siguiente consulta:
bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'
Sustituye las siguientes variables:
PROJECT_ID
: el nombre de tu proyecto de Google Cloud PlatformDATASET_NAME
: el nombre de tu conjunto de datos de BigQueryTABLE_NAME
: el nombre de tu tabla de BigQuery
Usar una función definida por el usuario para transformar los datos
En este tutorial se da por hecho que los mensajes de Pub/Sub tienen formato JSON y que el esquema de la tabla de BigQuery coincide con los datos JSON.
También puedes proporcionar una función definida por el usuario (UDF) de JavaScript que transforme los datos antes de escribirlos en BigQuery. La UDF puede realizar un procesamiento adicional, como filtrar, eliminar información personal identificable (IPI) o enriquecer los datos con campos adicionales.
Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.
Usar una tabla de mensajes fallidos
Mientras se ejecuta la tarea, es posible que la canalización no pueda escribir mensajes individuales en BigQuery. Entre los posibles errores se incluyen los siguientes:
- Errores de serialización, incluido JSON con formato incorrecto.
- Errores de conversión de tipos, causados por una discrepancia entre el esquema de la tabla y los datos JSON.
- Campos adicionales en los datos JSON que no están presentes en el esquema de la tabla.
La canalización escribe estos errores en una tabla de mensajes fallidos de BigQuery. De forma predeterminada, la canalización crea automáticamente una tabla de mensajes fallidos llamada TABLE_NAME_error_records
, donde TABLE_NAME
es el nombre de la tabla de salida.
Para usar otro nombre, define el parámetro de plantilla outputDeadletterTable
.
Limpieza
Para evitar que los recursos utilizados en este tutorial se cobren en tu cuenta de Google Cloud, elimina el proyecto que contiene los recursos o conserva el proyecto y elimina los recursos.
Eliminar el proyecto
La forma más fácil de evitar que te cobren es eliminar el Google Cloud proyecto que has creado para el tutorial.
Consola
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
gcloud
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Eliminar los recursos concretos
Si quieres reutilizar el proyecto más adelante, puedes conservarlo, pero elimina los recursos que hayas creado durante el tutorial.
Detener el flujo de procesamiento de Dataflow
Consola
En la Google Cloud consola, ve a la página Trabajos de Dataflow.
Haz clic en el trabajo que quieras detener.
Para detener una tarea, su estado debe ser En curso.
En la página de detalles del trabajo, haz clic en Detener.
Haz clic en Cancelar.
Para confirmar tu elección, haz clic en Detener trabajo.
gcloud
Para cancelar tu tarea de Dataflow, usa el comando gcloud dataflow jobs
.
gcloud dataflow jobs list \
--filter 'NAME=JOB_NAME AND STATE=Running' \
--format 'value(JOB_ID)' \
--region "DATAFLOW_REGION" \
| xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"
Limpiar los recursos del proyecto de Google Cloud Platform
Consola
Elimina el tema y la suscripción de Pub/Sub.
Ve a la página Temas de Pub/Sub en la consola de Google Cloud .
Selecciona el tema que has creado.
Haz clic en Eliminar para eliminar el tema de forma permanente.
Ve a la página Suscripciones de Pub/Sub en la Google Cloud consola.
Selecciona la suscripción creada con tu tema.
Haz clic en Eliminar para eliminar la suscripción de forma permanente.
Elimina la tabla y el conjunto de datos de BigQuery.
En la Google Cloud consola, ve a la página BigQuery.
En el panel Explorador, expande tu proyecto.
Junto al conjunto de datos que quieras eliminar, haz clic en
Ver acciones y, a continuación, en Eliminar.
Elimina el segmento de Cloud Storage.
En la Google Cloud consola, ve a la página Segmentos de Cloud Storage.
Seleccione el contenedor que quiera eliminar, haga clic en
Eliminar y, a continuación, siga las instrucciones.
gcloud
Para eliminar la suscripción y el tema de Pub/Sub, usa los comandos
gcloud pubsub subscriptions delete
ygcloud pubsub topics delete
.gcloud pubsub subscriptions delete SUBSCRIPTION_ID gcloud pubsub topics delete TOPIC_ID
Para eliminar la tabla de BigQuery, usa el comando
bq rm
.bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
Elimina el conjunto de datos de BigQuery. El conjunto de datos por sí solo no conlleva ningún cargo.
bq rm -r -f -d PROJECT_ID:tutorial_dataset
Para eliminar el segmento de Cloud Storage y sus objetos, usa el comando
gcloud storage rm
. El contenedor por sí solo no conlleva ningún cargo.gcloud storage rm gs://BUCKET_NAME --recursive
Revocar credenciales
Consola
Si conservas el proyecto, revoca los roles que hayas concedido a la cuenta de servicio predeterminada de Compute Engine.
- En la consola, ve a la página Gestión de identidades y accesos. Google Cloud
Selecciona un proyecto, una carpeta o una organización.
Busca la fila que contenga la cuenta principal cuyo acceso quieras revocar. En esa fila, haz clic en
Editar principal.Haga clic en el botón Eliminar
de cada rol que quiera revocar y, a continuación, haga clic en Guardar.
gcloud
- Si conservas el proyecto, revoca los roles que hayas concedido a la cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \ --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \ --role=<var>ROLE</var>
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
Siguientes pasos
- Amplía tu plantilla de Dataflow con UDFs.
- Consulta más información sobre cómo usar las plantillas de Dataflow.
- Consulta todas las plantillas proporcionadas por Google.
- Consulta información sobre cómo usar Pub/Sub para crear y usar temas y cómo crear una suscripción de extracción.
- Consulta información sobre cómo crear conjuntos de datos con BigQuery.
- Consulta información sobre las suscripciones de Pub/Sub.
- Consulta arquitecturas de referencia, diagramas y prácticas recomendadas sobre Google Cloud. Consulta nuestro Centro de arquitectura de Cloud.