Transmitir datos de Pub/Sub a BigQuery


En este tutorial se usa la plantilla Suscripción de Pub/Sub a BigQuery para crear y ejecutar una tarea de plantilla de Dataflow mediante la consola de Google Cloud Google Cloud o la CLI de Google Cloud. En este tutorial se explica un ejemplo de flujo de procesamiento en streaming que lee mensajes codificados en JSON de Pub/Sub y los escribe en una tabla de BigQuery.

Las analíticas en tiempo real y los flujos de procesamiento de integración de datos usan Pub/Sub para ingerir y distribuir datos. Pub/Sub te permite crear sistemas de productores y consumidores de eventos, llamados editores y suscriptores. Los editores envían eventos al servicio Pub/Sub de forma asíncrona, y Pub/Sub envía los eventos a todos los servicios que deben reaccionar a ellos.

Dataflow es un servicio totalmente gestionado que está diseñado para transformar y enriquecer datos tanto en streaming (en tiempo real) como por lotes. Proporciona un entorno de desarrollo de flujos de trabajo simplificado que usa el SDK de Apache Beam para transformar los datos entrantes y, a continuación, generar los datos transformados.

La ventaja de este flujo de trabajo es que puedes usar UDFs para transformar los datos de los mensajes antes de escribirlos en BigQuery.

Antes de ejecutar una canalización de Dataflow para este caso, plantéate si una suscripción de Pub/Sub a BigQuery con una UDF se ajusta a tus necesidades.

Objetivos

  • Crea un tema de Pub/Sub.
  • Crea un conjunto de datos de BigQuery con una tabla y un esquema.
  • Usa una plantilla de streaming proporcionada por Google para transmitir datos de tu suscripción de Pub/Sub a BigQuery mediante Dataflow.

Costes

En este documento, se usan los siguientes componentes facturables de Google Cloud Platform:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • BigQuery

Para generar una estimación de costes basada en el uso previsto, utiliza la calculadora de precios.

Los usuarios nuevos Google Cloud pueden disfrutar de una prueba gratuita.

Cuando termines las tareas que se describen en este documento, puedes evitar que se te siga facturando eliminando los recursos que has creado. Para obtener más información, consulta la sección Limpiar.

Antes de empezar

En esta sección se explica cómo seleccionar un proyecto, habilitar APIs y asignar los roles adecuados a tu cuenta de usuario y a la cuenta de servicio de trabajador.

Consola

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  8. Para completar los pasos de este tutorial, tu cuenta de usuario debe tener el rol Usuario de cuenta de servicio. La cuenta de servicio predeterminada de Compute Engine debe tener los siguientes roles: Trabajador de Dataflow, Administrador de Dataflow, Editor de Pub/Sub, Administrador de objetos de Storage y Editor de datos de BigQuery. Para añadir los roles necesarios en la Google Cloud consola, sigue estos pasos:

    1. En la consola, ve a la página Gestión de identidades y accesos. Google Cloud

      Ir a IAM
    2. Selecciona el proyecto.
    3. En la fila que contiene tu cuenta de usuario, haz clic en Editar principal y, a continuación, en Añadir otro rol.
    4. En la lista desplegable, selecciona el rol Usuario de cuenta de servicio.
    5. En la fila que contiene la cuenta de servicio predeterminada de Compute Engine, haz clic en Editar principal y, a continuación, en Añadir otro rol.
    6. En la lista desplegable, selecciona el rol Dataflow Worker.
    7. Repite el proceso con los roles Administrador de Dataflow, Editor de Pub/Sub, Administrador de objetos de Storage y Editor de datos de BigQuery. A continuación, haz clic en Guardar.

      Para obtener más información sobre cómo conceder roles, consulta Conceder un rol de gestión de identidades y accesos mediante la consola.

gcloud

  1. Sign in to your Google Cloud Platform account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. Install the Google Cloud CLI.

  6. Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

  7. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  11. Install the Google Cloud CLI.

  12. Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

  13. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  14. Concede roles a tu cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez por cada uno de los siguientes roles de gestión de identidades y accesos:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Haz los cambios siguientes:

    • PROJECT_ID: tu ID de proyecto.
    • PROJECT_NUMBER: tu número de proyecto. Para encontrar el número de tu proyecto, usa el comando gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE: cada rol individual.

Crea un segmento de Cloud Storage

Empieza creando un segmento de Cloud Storage con la Google Cloud consola o la CLI de Google Cloud. El flujo de procesamiento de Dataflow usa este segmento como ubicación de almacenamiento temporal.

Consola

  1. En la Google Cloud consola, ve a la página Segmentos de Cloud Storage.

    Ir a Contenedores

  2. Haz clic en Crear.

  3. En la página Crear un segmento, en Nombre del segmento, escribe un nombre que cumpla los requisitos de nombres de segmentos. Los nombres de los segmentos de Cloud Storage deben ser únicos de forma global. No selecciones las otras opciones.

  4. Haz clic en Crear.

gcloud

Usa el comando gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME

Sustituye BUCKET_NAME por el nombre que quieras darle a tu segmento de Cloud Storage, que debe cumplir los requisitos de nomenclatura de segmentos. Los nombres de los segmentos de Cloud Storage deben ser únicos de forma global.

Crear un tema y una suscripción de Pub/Sub

Crea un tema de Pub/Sub y, a continuación, crea una suscripción a ese tema.

Consola

Para crear un tema, sigue estos pasos:

  1. En la Google Cloud consola, ve a la página Temas de Pub/Sub.

    Ir a Temas

  2. Haz clic en Crear tema.

  3. En el campo ID de tema, introduce un ID para el tema. Para obtener información sobre cómo nombrar un tema, consulta las directrices para nombrar un tema o una suscripción.

  4. Mantén la opción Añadir una suscripción predeterminada. No selecciones las otras opciones.

  5. Haz clic en Crear.

  6. En la página de detalles del tema, el nombre de la suscripción que se ha creado se muestra en ID de suscripción. Anota este valor para los pasos posteriores.

gcloud

Para crear un tema, ejecuta el comando gcloud pubsub topics create. Para obtener información sobre cómo poner nombre a una suscripción, consulta las directrices para nombrar un tema o una suscripción.

gcloud pubsub topics create TOPIC_ID

Sustituye TOPIC_ID por el nombre del tema de Pub/Sub.

Para crear una suscripción a tu tema, ejecuta el comando gcloud pubsub subscriptions create:

gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID

Sustituye SUBSCRIPTION_ID por el nombre de tu suscripción de Pub/Sub.

Crear una tabla de BigQuery

En este paso, creará una tabla de BigQuery con el siguiente esquema:

Nombre de la columna Tipo de datos
name STRING
customer_id INTEGER

Si aún no tienes un conjunto de datos de BigQuery, crea uno. Para obtener más información, consulta Crear conjuntos de datos. A continuación, crea una tabla vacía:

Consola

  1. Ve a la página BigQuery.

    Ir a BigQuery

  2. En el panel Explorador, expande tu proyecto y selecciona un conjunto de datos.

  3. En la sección de información Conjunto de datos, haga clic en Crear tabla.

  4. En la lista Crear tabla a partir de, selecciona Tabla vacía.

  5. En el cuadro Tabla, introduce el nombre de la tabla.

  6. En la sección Esquema, haz clic en Editar como texto.

  7. Pega la siguiente definición de esquema:

    name:STRING,
    customer_id:INTEGER
    
  8. Haz clic en Crear tabla.

gcloud

Usa el comando bq mk.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Haz los cambios siguientes:

  • PROJECT_ID: tu ID de proyecto
  • DATASET_NAME: el nombre del conjunto de datos
  • TABLE_NAME: el nombre de la tabla que se va a crear

Ejecutar el flujo de procesamiento

Ejecuta un flujo de procesamiento en streaming con la plantilla de suscripción de Pub/Sub a BigQuery proporcionada por Google. La canalización obtiene los datos entrantes del tema de Pub/Sub y los envía a tu conjunto de datos de BigQuery.

Consola

  1. En la Google Cloud consola, ve a la página Trabajos de Dataflow.

    Ir a Tareas

  2. Haz clic en Crear tarea a partir de plantilla.

  3. Escriba un nombre de tarea para su tarea de Dataflow.

  4. En Punto de conexión regional, selecciona una región para tu tarea de Dataflow.

  5. En Plantilla de Dataflow, selecciona la plantilla Suscripción de Pub/Sub a BigQuery.

  6. En Tabla de salida de BigQuery, seleccione Buscar y elija la tabla de BigQuery.

  7. En la lista Suscripción de entrada de Pub/Sub, selecciona la suscripción de Pub/Sub.

  8. En Ubicación temporal, introduce lo siguiente:

    gs://BUCKET_NAME/temp/
    

    Sustituye BUCKET_NAME por el nombre de tu segmento de Cloud Storage. La carpeta temp almacena archivos temporales de las tareas de Dataflow.

  9. Haz clic en Ejecutar trabajo.

gcloud

Para ejecutar la plantilla en tu shell o terminal, usa el comando gcloud dataflow jobs run.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
    --region DATAFLOW_REGION \
    --staging-location gs://BUCKET_NAME/temp \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME

Sustituye las siguientes variables:

  • JOB_NAME. un nombre para el trabajo
  • DATAFLOW_REGION: una región para el trabajo
  • PROJECT_ID: el nombre de tu proyecto de Google Cloud Platform
  • SUBSCRIPTION_ID: el nombre de tu suscripción de Pub/Sub
  • DATASET_NAME: el nombre de tu conjunto de datos de BigQuery
  • TABLE_NAME: el nombre de tu tabla de BigQuery

Publicar mensajes en Pub/Sub

Una vez que se inicia la tarea de Dataflow, puedes publicar mensajes en Pub/Sub y la canalización los escribirá en BigQuery.

Consola

  1. En la consola, ve a Pub/Sub > Temas. Google Cloud

    Ir a Temas

  2. En la lista de temas, haz clic en el nombre del tema.

  3. Haz clic en Mensajes.

  4. Haz clic en Publicar mensajes.

  5. En Número de mensajes, introduce 10.

  6. En Cuerpo del mensaje, escribe {"name": "Alice", "customer_id": 1}.

  7. Haz clic en Publicar.

gcloud

Para publicar mensajes en tu tema, usa el comando gcloud pubsub topics publish.

for run in {1..10}; do
  gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done

Sustituye TOPIC_ID por el nombre del tema.

Ver los resultados

Consulta los datos escritos en tu tabla de BigQuery. Los datos pueden tardar hasta un minuto en empezar a aparecer en la tabla.

Consola

  1. En la Google Cloud consola, ve a la página BigQuery.
    Ve a la página de BigQuery

  2. En el editor de consultas, ejecuta la siguiente consulta:

    SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`
    LIMIT 1000
    

    Sustituye las siguientes variables:

    • PROJECT_ID: el nombre de tu proyecto de Google Cloud Platform
    • DATASET_NAME: el nombre de tu conjunto de datos de BigQuery
    • TABLE_NAME: el nombre de tu tabla de BigQuery

gcloud

Para comprobar los resultados en BigQuery, ejecuta la siguiente consulta:

bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'

Sustituye las siguientes variables:

  • PROJECT_ID: el nombre de tu proyecto de Google Cloud Platform
  • DATASET_NAME: el nombre de tu conjunto de datos de BigQuery
  • TABLE_NAME: el nombre de tu tabla de BigQuery

Usar una función definida por el usuario para transformar los datos

En este tutorial se da por hecho que los mensajes de Pub/Sub tienen formato JSON y que el esquema de la tabla de BigQuery coincide con los datos JSON.

También puedes proporcionar una función definida por el usuario (UDF) de JavaScript que transforme los datos antes de escribirlos en BigQuery. La UDF puede realizar un procesamiento adicional, como filtrar, eliminar información personal identificable (IPI) o enriquecer los datos con campos adicionales.

Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.

Usar una tabla de mensajes fallidos

Mientras se ejecuta la tarea, es posible que la canalización no pueda escribir mensajes individuales en BigQuery. Entre los posibles errores se incluyen los siguientes:

  • Errores de serialización, incluido JSON con formato incorrecto.
  • Errores de conversión de tipos, causados por una discrepancia entre el esquema de la tabla y los datos JSON.
  • Campos adicionales en los datos JSON que no están presentes en el esquema de la tabla.

La canalización escribe estos errores en una tabla de mensajes fallidos de BigQuery. De forma predeterminada, la canalización crea automáticamente una tabla de mensajes fallidos llamada TABLE_NAME_error_records, donde TABLE_NAME es el nombre de la tabla de salida. Para usar otro nombre, define el parámetro de plantilla outputDeadletterTable.

Limpieza

Para evitar que los recursos utilizados en este tutorial se cobren en tu cuenta de Google Cloud, elimina el proyecto que contiene los recursos o conserva el proyecto y elimina los recursos.

Eliminar el proyecto

La forma más fácil de evitar que te cobren es eliminar el Google Cloud proyecto que has creado para el tutorial.

Consola

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

gcloud

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Eliminar los recursos concretos

Si quieres reutilizar el proyecto más adelante, puedes conservarlo, pero elimina los recursos que hayas creado durante el tutorial.

Detener el flujo de procesamiento de Dataflow

Consola

  1. En la Google Cloud consola, ve a la página Trabajos de Dataflow.

    Ir a Tareas

  2. Haz clic en el trabajo que quieras detener.

    Para detener una tarea, su estado debe ser En curso.

  3. En la página de detalles del trabajo, haz clic en Detener.

  4. Haz clic en Cancelar.

  5. Para confirmar tu elección, haz clic en Detener trabajo.

gcloud

Para cancelar tu tarea de Dataflow, usa el comando gcloud dataflow jobs.

gcloud dataflow jobs list \
  --filter 'NAME=JOB_NAME AND STATE=Running' \
  --format 'value(JOB_ID)' \
  --region "DATAFLOW_REGION" \
  | xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"

Limpiar los recursos del proyecto de Google Cloud Platform

Consola

  1. Elimina el tema y la suscripción de Pub/Sub.

    1. Ve a la página Temas de Pub/Sub en la consola de Google Cloud .

      Ir a Temas

    2. Selecciona el tema que has creado.

    3. Haz clic en Eliminar para eliminar el tema de forma permanente.

    4. Ve a la página Suscripciones de Pub/Sub en la Google Cloud consola.

      Ir a Suscripciones

    5. Selecciona la suscripción creada con tu tema.

    6. Haz clic en Eliminar para eliminar la suscripción de forma permanente.

  2. Elimina la tabla y el conjunto de datos de BigQuery.

    1. En la Google Cloud consola, ve a la página BigQuery.

      Ir a BigQuery

    2. En el panel Explorador, expande tu proyecto.

    3. Junto al conjunto de datos que quieras eliminar, haz clic en Ver acciones y, a continuación, en Eliminar.

  3. Elimina el segmento de Cloud Storage.

    1. En la Google Cloud consola, ve a la página Segmentos de Cloud Storage.

      Ir a Contenedores

    2. Seleccione el contenedor que quiera eliminar, haga clic en Eliminar y, a continuación, siga las instrucciones.

gcloud

  1. Para eliminar la suscripción y el tema de Pub/Sub, usa los comandos gcloud pubsub subscriptions delete y gcloud pubsub topics delete.

    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  2. Para eliminar la tabla de BigQuery, usa el comando bq rm.

    bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
    
  3. Elimina el conjunto de datos de BigQuery. El conjunto de datos por sí solo no conlleva ningún cargo.

    bq rm -r -f -d PROJECT_ID:tutorial_dataset
    
  4. Para eliminar el segmento de Cloud Storage y sus objetos, usa el comando gcloud storage rm. El contenedor por sí solo no conlleva ningún cargo.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revocar credenciales

Consola

Si conservas el proyecto, revoca los roles que hayas concedido a la cuenta de servicio predeterminada de Compute Engine.

  1. En la consola, ve a la página Gestión de identidades y accesos. Google Cloud

Ir a IAM

  1. Selecciona un proyecto, una carpeta o una organización.

  2. Busca la fila que contenga la cuenta principal cuyo acceso quieras revocar. En esa fila, haz clic en Editar principal.

  3. Haga clic en el botón Eliminar de cada rol que quiera revocar y, a continuación, haga clic en Guardar.

gcloud

  • Si conservas el proyecto, revoca los roles que hayas concedido a la cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
      gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \
      --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \
      --role=<var>ROLE</var>
    

  • Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  • Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Siguientes pasos