Plantillas de transmisión proporcionadas por Google

Google proporciona un conjunto de plantillas de código abierto de Dataflow. Para obtener información general sobre las plantillas, consulta la página Descripción general. Para obtener una lista de todas las plantillas proporcionadas por Google, consulta la página sobre cómo comenzar con las plantillas proporcionadas por Google.

En esta página, se documentan las plantillas de transmisión:

Suscripción de Pub/Sub a BigQuery

La plantilla de suscripción de Pub/Sub a BigQuery es una canalización de transmisión que lee mensajes con formato JSON desde una suscripción de Pub/Sub y los escribe en una tabla de BigQuery. Puedes usar la plantilla como una solución rápida para mover datos de Pub/Sub a BigQuery. La plantilla lee los mensajes con formato JSON de Pub/Sub y los convierte en elementos de BigQuery.

Requisitos para esta canalización:

  • Los mensajes de Pub/Sub deben estar en formato JSON, como se describe aquí. Por ejemplo, los mensajes con formato {"k1":"v1", "k2":"v2"} pueden insertarse en una tabla de BigQuery con dos columnas, denominadas k1 y k2, con el tipo de datos de string.
  • La tabla de salida debe existir antes de ejecutar la canalización.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription Suscripción de entrada de Pub/Sub desde la que se va a leer, en el formato projects/<project>/subscriptions/<subscription>.
outputTableSpec La ubicación de la tabla de salida de BigQuery, en el formato <my-project>:<my-dataset>.<my-table>

Ejecuta la plantilla de suscripción de Pub/Sub a BigQuery

CONSOLE

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona the Pub/Sub Subscription to BigQuery template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Si quieres usar la herramienta de línea de comandos de gcloud para ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • REGION: Es el extremo regional (por ejemplo, us-west-1).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • REGION: Es el extremo regional (por ejemplo, us-west-1).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Tema de Pub/Sub a BigQuery

La plantilla de tema de Pub/Sub a BigQuery es una canalización de transmisión que lee mensajes con formato JSON de un tema de Pub/Sub y los escribe en una tabla de BigQuery. Puedes usar la plantilla como una solución rápida para mover datos de Pub/Sub a BigQuery. La plantilla lee los mensajes con formato JSON de Pub/Sub y los convierte en elementos de BigQuery.

Requisitos para esta canalización:

  • Los mensajes de Pub/Sub deben estar en formato JSON, como se describe aquí. Por ejemplo, los mensajes con formato {"k1":"v1", "k2":"v2"} pueden insertarse en una tabla de BigQuery con dos columnas, denominadas k1 y k2, con el tipo de datos de string.
  • La tabla de salida debe existir antes de que se ejecute la canalización.

Parámetros de la plantilla

Parámetro Descripción
inputTopic El tema de entrada de Pub/Sub desde el que se va a leer, en el formato projects/<project>/topics/<topic>.
outputTableSpec La ubicación de la tabla de salida de BigQuery, en el formato <my-project>:<my-dataset>.<my-table>

Ejecuta la plantilla del tema de Pub/Sub a BigQuery

CONSOLE

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona the Pub/Sub Topic to BigQuery template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Si quieres usar la herramienta de línea de comandos de gcloud para ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • REGION: Es el extremo regional (por ejemplo, us-west-1).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • REGION: Es el extremo regional (por ejemplo, us-west-1).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Pub/Sub Avro a BigQuery

La plantilla de Pub/Sub Avro a BigQuery es una canalización de transmisión que transfiere datos de Avro desde una suscripción de Pub/Sub a una tabla de BigQuery. Cualquier error que ocurra mientras se escribe en la tabla de BigQuery se transmite a un tema de mensajes no entregados de Pub/Sub.

Requisitos para esta canalización

  • La suscripción de entrada de Pub/Sub debe existir.
  • El archivo de esquema para los registros de Avro debe existir en Cloud Storage.
  • El tema de mensajes no entregados de Pub/Sub debe existir.
  • El conjunto de datos de salida de BigQuery debe existir.

Parámetros de la plantilla

Parámetro Descripción
schemaPath Ubicación de Cloud Storage del archivo de esquema de Avro. Por ejemplo, gs://path/to/my/schema.avsc.
inputSubscription Suscripción de entrada de Pub/Sub desde la que se desea leer. Por ejemplo, projects/<project>/subscriptions/<subscription>.
outputTopic Tema de Pub/Sub que se usa como mensajes no entregados para registros con errores. Por ejemplo, projects/<project-id>/topics/<topic-name>.
outputTableSpec Ubicación de la tabla de salida de BigQuery. Por ejemplo, <my-project>:<my-dataset>.<my-table>. Según la createDisposition especificada, la tabla de salida se puede crear de forma automática mediante el esquema de Avro proporcionado por el usuario.
writeDisposition La WriteDisposition de BigQuery (opcional). Por ejemplo, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Predeterminada: WRITE_APPEND.
createDisposition La CreateDisposition de BigQuery (opcional). Por ejemplo: CREATE_IF_NEEDED, CREATE_NEVER. Predeterminada: CREATE_IF_NEEDED.

Ejecuta la plantilla de Pub/Sub Avro a BigQuery

CONSOLE

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona the Pub/Sub Avro to BigQuery template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de la herramienta de gcloud a fin de ejecutar plantillas, debes tener la versión 284.0.0 o superior del SDK de Cloud.

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • REGION_NAME: Es el nombre de la región de Dataflow (por ejemplo, us-central1).
  • SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo, gs://MyBucket/file.avsc).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
  • BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
  • DEADLETTER_TOPIC: Es el tema de Pub/Sub que se usará para la cola de mensajes no entregados.
gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

API

Ejecuta desde la API de REST

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • LOCATION: Es el nombre de la región de Dataflow (por ejemplo, us-central1).
  • SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo, gs://MyBucket/file.avsc).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
  • BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
  • DEADLETTER_TOPIC: Es el tema de Pub/Sub que se usará para la cola de mensajes no entregados.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Pub/Sub a Pub/Sub

La plantilla de Pub/Sub a Pub/Sub es una canalización de transmisión que lee mensajes de una suscripción de Pub/Sub y los escribe en otro tema de Pub/Sub. La canalización también acepta una clave de atributo de mensaje opcional y un valor que se puede usar para filtrar los mensajes que se deben escribir en el tema de Pub/Sub. Puedes usar esta plantilla para copiar mensajes de una suscripción de Pub/Sub a otro tema de Pub/Sub con un filtro de mensajes opcional.

Requisitos para esta canalización:

  • La suscripción a Pub/Sub de origen debe existir antes de la ejecución.
  • El tema de Pub/Sub de destino debe existir antes de la ejecución.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription Suscripción a Pub/Sub desde la que se lee la entrada. Por ejemplo, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Tema de Cloud Pub/Sub en el que se escribe el resultado. Por ejemplo, projects/<project-id>/topics/<topic-name>.
filterKey Eventos de filtro basados en la clave de atributo (opcional). No se aplican filtros si no se especifica filterKey.
filterValue Valor del atributo del filtro para usar en caso de que se provea filterKey (opcional). De forma predeterminada, se usa un filterValue nulo.

Ejecuta la plantilla de Pub/Sub a Pub/Sub

CONSOLE

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona the Pub/Sub to Pub/Sub template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Si quieres usar la herramienta de línea de comandos de gcloud para ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • REGION: Es el extremo regional (por ejemplo, us-west-1).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • FILTER_KEY: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.
  • FILTER_VALUE: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento. Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. No se filtrarán las coincidencias parciales (p. ej., substring). De forma predeterminada, se usa un valor de filtro de evento nulo.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • REGION: Es el extremo regional (por ejemplo, us-west-1).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • FILTER_KEY: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.
  • FILTER_VALUE: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento. Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. No se filtrarán las coincidencias parciales (p. ej., substring). De forma predeterminada, se usa un valor de filtro de evento nulo.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Pub/Sub a Splunk

La plantilla de Pub/Sub a Splunk es una canalización de transmisión que lee mensajes de una suscripción a Pub/Sub y escribe la carga útil del mensaje en Splunk mediante el recopilador de eventos HTTP (HEC) de Splunk. Antes de escribir en Splunk, también puedes aplicar una función definida por el usuario de JavaScript a la carga útil del mensaje. Los mensajes con fallas de procesamiento se reenvían a un tema de mensajes no enviados de Pub/Sub para solucionar los problemas y volver a procesarlos.

Como una capa adicional de protección para tu token HEC, también puedes pasar una clave de Cloud KMS junto con el parámetro de token HEC codificado en base64 encriptado con la clave de Cloud KMS. Consulta el extremo de encriptación de la API de Cloud KMS para obtener detalles adicionales sobre la encriptación de tu parámetro de token HEC.

Requisitos para esta canalización:

  • La suscripción de Pub/Sub de origen debe existir antes de ejecutar la canalización.
  • El tema de mensajes no entregados de Pub/Sub debe existir antes de ejecutar la canalización.
  • Se debe poder acceder al extremo de HEC de Splunk desde la red de trabajadores de Dataflow.
  • El token HEC de Splunk se debe generar y estar disponible.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription La suscripción de Pub/Sub desde la que se lee la entrada. Por ejemplo, projects/<project-id>/subscriptions/<subscription-name>.
token El token de autenticación HEC de Splunk. Esta string codificada en base64 se puede encriptar con una clave de Cloud KMS para mayor seguridad.
url La URL de HEC de Splunk. Debe ser enrutable desde la VPC en la que se ejecuta la canalización. Por ejemplo, https://splunk-hec-host:8088.
outputDeadletterTopic El tema de Pub/Sub para reenviar mensajes que no se pueden entregar. Por ejemplo, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath La ruta de acceso de Cloud Storage que contiene todo el código de JavaScript (opcional). Por ejemplo, gs://mybucket/mytransforms/*.js.
javascriptTextTransformFunctionName El nombre de la función de JavaScript que se llamará (opcional). Por ejemplo, si tu función de JavaScript es function myTransform(inJson) { ...dostuff...}, el nombre de la función es myTransform.
batchCount El tamaño del lote para enviar varios eventos a Splunk (opcional). Predeterminado 1 (sin lotes).
parallelism La cantidad máxima de solicitudes paralelas (opcional). Predeterminado 1 (sin paralelismo).
disableCertificateValidation Inhabilita la validación del certificado SSL (opcional). El valor predeterminado es falso (validación habilitada).
includePubsubMessage Incluye el mensaje de Pub/Sub completo en la carga útil (opcional). El valor predeterminado es falso (solo se incluye el elemento de datos en la carga útil).
tokenKMSEncryptionKey La clave de Cloud KMS para desencriptar la string del token HEC (opcional). Si se proporciona la clave de Cloud KMS, la string del token HEC debe encriptarse.

Ejecuta la plantilla de Pub/Sub a Splunk

CONSOLE

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona the Pub/Sub to Splunk template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Si quieres usar la herramienta de línea de comandos de gcloud para ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • REGION: Es el extremo regional (por ejemplo, us-west-1).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • INPUT_SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOKEN: Es el token del recopilador de eventos HTTP de Splunk.
  • URL: Es la ruta de URL para el recopilador de eventos HTTP de Splunk (por ejemplo, https://splunk-hec-host:8088).
  • DEADLETTER_TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • JAVASCRIPT_FUNCTION: Es el nombre de la función de JavaScript.
  • PATH_TO_JAVASCRIPT_UDF_FILE: Es la ruta de acceso de Cloud Storage al archivo .js que contiene el código JavaScript (por ejemplo, gs://your-bucket/your-function.js).
  • BATCH_COUNT: Es el tamaño del lote que se debe usar para enviar varios eventos a Splunk.
  • PARALLELISM: Es la cantidad de solicitudes paralelas que se usarán para enviar eventos a Splunk.
  • DISABLE_VALIDATION: Es true si deseas inhabilitar la validación del certificado SSL.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • REGION: Es el extremo regional (por ejemplo, us-west-1).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • INPUT_SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOKEN: Es el token del recopilador de eventos HTTP de Splunk.
  • URL: Es la ruta de URL para el recopilador de eventos HTTP de Splunk (por ejemplo, https://splunk-hec-host:8088).
  • DEADLETTER_TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • JAVASCRIPT_FUNCTION: Es el nombre de la función de JavaScript.
  • PATH_TO_JAVASCRIPT_UDF_FILE: Es la ruta de acceso de Cloud Storage al archivo .js que contiene el código JavaScript (por ejemplo, gs://your-bucket/your-function.js).
  • BATCH_COUNT: Es el tamaño del lote que se debe usar para enviar varios eventos a Splunk.
  • PARALLELISM: Es la cantidad de solicitudes paralelas que se usarán para enviar eventos a Splunk.
  • DISABLE_VALIDATION: Es true si deseas inhabilitar la validación del certificado SSL.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   }
}

Pub/Sub a archivos de Avro en Cloud Storage

La plantilla de Pub/Sub a archivos de Avro en Cloud Storage es una canalización de transmisión que lee datos de un tema de Pub/Sub y escribe archivos de Avro en el bucket de Cloud Storage especificado.

Requisitos para esta canalización:

  • El tema de entrada de Pub/Sub debe existir antes de la ejecución de la canalización.

Parámetros de la plantilla

Parámetro Descripción
inputTopic Tema de Cloud Pub/Sub al cual suscribirse para el consumo de mensaje. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name>.
outputDirectory Directorio de salida en el que se archivarán los archivos de Avro de salida. Agrega una / al final. Por ejemplo: gs://example-bucket/example-directory/.
avroTempDirectory Directorio para los archivos de Avro temporales. Agrega una / al final. Por ejemplo: gs://example-bucket/example-directory/.
outputFilenamePrefix Prefijo de nombre de archivo de salida para los archivos Avro (opcional).
outputFilenameSuffix [Opcional] Sufijo de nombre de archivo de salida para los archivos Avro.
outputShardTemplate [Opcional] Plantilla de fragmentación del archivo de salida. Especificada como secuencias repetidas de letras “S” o “N” (por ejemplo: SSS-NNN). Estas se reemplazan con el número de fragmentación o la cantidad de fragmentaciones respectivamente. El formato de la plantilla predeterminada es “W-P-SS-of-NN” cuando no se especifica este parámetro.
numShards [Opcional] Cantidad máxima de fragmentos de salida que se produce con la escritura. La cantidad máxima predeterminada de fragmentos es 1.

Ejecuta la plantilla de Pub/Sub a Cloud Storage Avro

CONSOLE

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona the Pub/Sub to Cloud Storage Avro template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Si quieres usar la herramienta de línea de comandos de gcloud para ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • REGION: Es el extremo regional (por ejemplo, us-west-1).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
  • FILENAME_PREFIX: Es el prefijo del nombre de archivo de salida que prefieras.
  • FILENAME_SUFFIX: Es el sufijo del nombre de archivo de salida que prefieras.
  • SHARD_TEMPLATE: Es la plantilla de fragmentación de salida que prefieras.
  • NUM_SHARDS: Es la cantidad de fragmentos de salida.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
numShards=NUM_SHARDS,\
avroTempDirectory=gs://BUCKET_NAME/temp/

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • REGION: Es el extremo regional (por ejemplo, us-west-1).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
  • FILENAME_PREFIX: Es el prefijo del nombre de archivo de salida que prefieras.
  • FILENAME_SUFFIX: Es el sufijo del nombre de archivo de salida que prefieras.
  • SHARD_TEMPLATE: Es la plantilla de fragmentación de salida que prefieras.
  • NUM_SHARDS: Es la cantidad de fragmentos de salida.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE",
       "numShards": "NUM_SHARDS",
   }
}

Pub/Sub a archivos de texto en Cloud Storage

La plantilla de Pub/Sub a archivos de texto en Cloud Storage es una canalización de transmisión que lee registros de Pub/Sub y los guarda como una serie de archivos de Cloud Storage en formato de texto. La plantilla se puede usar como una forma rápida de guardar datos en Pub/Sub para su uso futuro. De forma predeterminada, la plantilla genera un archivo nuevo cada 5 minutos.

Requisitos para esta canalización:

  • El tema de Pub/Sub debe existir antes de la ejecución.
  • Los mensajes publicados en el tema deben tener formato de texto.
  • Los mensajes publicados en el tema no deben contener líneas nuevas. Ten en cuenta que cada mensaje de Pub/Sub se guarda como una sola línea en el archivo de salida.

Parámetros de la plantilla

Parámetro Descripción
inputTopic El tema de Pub/Sub desde el que se lee la entrada. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name>.
outputDirectory La ruta de acceso y el prefijo del nombre de archivo para escribir los archivos de salida. Por ejemplo, gs://bucket-name/path/. El valor debe terminar con una barra.
outputFilenamePrefix El prefijo para colocar en cada archivo con ventanas. Por ejemplo, output-
outputFilenameSuffix El sufijo para colocar en cada archivo con ventanas, por lo general, es una extensión de archivo como .txt o .csv.
outputShardTemplate La plantilla de fragmentación define la parte dinámica de cada archivo con ventanas. De forma predeterminada, la canalización utiliza una única fragmentación de salida para el sistema de archivo dentro de cada ventana. Esto significa que todos los datos llegarán a un único archivo por ventana. El valor predeterminado outputShardTemplate es W-P-SS-of-NN, en el que W es el período de la ventana, P es la información del panel, S es el número de fragmento y N es la cantidad de fragmentos. En el caso de un solo archivo, la parte SS-of-NN de outputShardTemplate será 00-of-01.

Ejecuta la plantilla de Pub/Sub a archivos de texto en Cloud Storage

CONSOLE

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona the Pub/Sub to Text Files on Cloud Storage template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Si quieres usar la herramienta de línea de comandos de gcloud para ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • REGION: Es el extremo regional (por ejemplo, us-west-1).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • REGION: Es el extremo regional (por ejemplo, us-west-1).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_IDlocations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Pub/Sub a MongoDB

La plantilla de Pub/Sub a MongoDB es una canalización de transmisión que lee mensajes con codificación JSON de una suscripción a Pub/Sub y los escribe en MongoDB como documentos. Si es necesario, esta canalización admite transformaciones adicionales que se pueden incluir mediante una función definida por el usuario (UDF) de JavaScript. Cualquier error que se produce mientras se ejecutan transformaciones, o debido a una falta de coincidencia del esquema o un JSON con formato incorrecto, se registra en una tabla de mensajes no entregados de BigQuery junto con el mensaje de entrada. Si la tabla no existe antes de la ejecución, la canalización crea de forma automática la tabla de mensajes no entregados.

Requisitos para esta canalización:

  • La suscripción a Pub/Sub debe existir y los mensajes deben estar codificados en un formato JSON válido.
  • El clúster de MongoDB debe existir y debe ser accesible desde las máquinas de trabajador de Dataflow.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription Nombre de la suscripción a Pub/Sub. Por ejemplo: projects/<project-id>/subscriptions/<subscription-name>.
mongoDBUri Una lista separada por comas de los servidores de MongoDB. Por ejemplo: 192.285.234.12:27017,192.287.123.11:27017.
database La base de datos en MongoDB en la que se debe almacenar la colección. Por ejemplo: my-db.
collection Nombre de la colección dentro de la base de datos de MongoDB. Por ejemplo: my-collection.
deadletterTable La tabla de BigQuery que almacena mensajes debido a fallas (esquemas no coincidentes, JSON con formato incorrecto, etcétera). Por ejemplo: project-id:dataset-name.table-name.
javascriptTextTransformGcsPath La ubicación de Cloud Storage del archivo JavaScript que contiene la transformación de la UDF (opcional). Por ejemplo: gs://mybucket/filename.json.
javascriptTextTransformFunctionName El nombre de la UDF de JavaScript (opcional). Por ejemplo: transform.
batchSize El tamaño de lote que se usa para la inserción por lotes de documentos en MongoDB (opcional). Valor predeterminado: 1000.
batchSizeBytes El tamaño del lote en bytes (opcional). Valor predeterminado: 5242880.
maxConnectionIdleTime El tiempo de inactividad máximo permitido en segundos antes de que se agote el tiempo de espera de la conexión (opcional). Valor predeterminado: 60000.
sslEnabled El valor booleano que indica si la conexión a MongoDB está habilitada con SSL (opcional). Valor predeterminado: true.
ignoreSSLCertificate El valor booleano que indica si se debe ignorar el certificado SSL (opcional). Valor predeterminado: true.
withOrdered El valor booleano que habilita las inserciones masivas ordenadas en MongoDB (opcional). Valor predeterminado: true.
withSSLInvalidHostNameAllowed El valor booleano que indica si se permite un nombre de host no válido para la conexión SSL (opcional). Valor predeterminado: true.

Ejecuta la plantilla de Pub/Sub a MongoDB

CONSOLE

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona Pub/Sub to MongoDB template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Si quieres usar la herramienta de línea de comandos de gcloud para ejecutar plantillas, debes tener la versión 284.0.0 o superior del SDK de Cloud.

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • REGION_NAME: Es el nombre de la región de Dataflow (por ejemplo, us-central1).
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • INPUT_SUBSCRIPTION: Es la suscripción a Pub/Sub (por ejemplo, projects/<project-id>/subscriptions/<subscription-name>).
  • MONGODB_URI: Son las direcciones del servidor de MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017).
  • DATABASE: Es el nombre de la base de datos de MongoDB (por ejemplo, users).
  • COLLECTION: Es el nombre de la colección de MongoDB (por ejemplo, profiles).
  • UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name).
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

API

Ejecuta desde la API de REST

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • LOCATION: Es el nombre de la región de Dataflow (por ejemplo, us-central1).
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • INPUT_SUBSCRIPTION: Es la suscripción a Pub/Sub (por ejemplo, projects/<project-id>/subscriptions/<subscription-name>).
  • MONGODB_URI: Son las direcciones del servidor de MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017).
  • DATABASE: Es el nombre de la base de datos de MongoDB (por ejemplo, users).
  • COLLECTION: Es el nombre de la colección de MongoDB (por ejemplo, profiles).
  • UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name).
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Archivos de texto en Cloud Storage a BigQuery (transmisión)

La canalización de archivos de texto en Cloud Storage a BigQuery es una canalización de transmisión que te permite transmitir archivos de texto almacenados en Cloud Storage, transformarlos con una función definida por el usuario (UDF) de JavaScript que proporciones y enviar el resultado a BigQuery.

Requisitos para esta canalización:

  • Crea un archivo de esquema de BigQuery con formato JSON que describa la tabla de salida.
    {
        'fields': [{
            'name': 'location',
            'type': 'STRING'
        }, {
            'name': 'name',
            'type': 'STRING'
        }, {
            'name': 'age',
            'type': 'STRING',
        }, {
            'name': 'color',
            'type': 'STRING'
        }, {
            'name': 'coffee',
            'type': 'STRING',
            'mode': 'REQUIRED'
        }, {
            'name': 'cost',
            'type': 'NUMERIC',
            'mode': 'REQUIRED'
        }]
    }
    
  • Crea un archivo JavaScript (.js) con tu función UDF que proporcione la lógica para transformar las líneas de texto. Ten en cuenta que tu función debe mostrar una string JSON.

    Por ejemplo, esta función divide cada línea de un archivo CSV y muestra una string JSON después de transformar los valores.

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

Parámetros de la plantilla

Parámetro Descripción
javascriptTextTransformGcsPath Ubicación de Cloud Storage de tu UDF de JavaScript. Por ejemplo: gs://my_bucket/my_function.js.
JSONPath Ubicación de Cloud Storage de tu archivo de esquema de BigQuery, descrito como un JSON. Por ejemplo: gs://path/to/my/schema.json.
javascriptTextTransformFunctionName El nombre de la función de JavaScript que quieres nombrar como tu UDF. Por ejemplo: transform.
outputTable La tabla de BigQuery completamente calificada. Por ejemplo: my-project:dataset.table
inputFilePattern Ubicación en Cloud Storage del texto que quieres procesar. Por ejemplo: gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Directorio temporal para el proceso de carga de BigQuery. Por ejemplo: gs://my-bucket/my-files/temp_dir.
outputDeadletterTable Tabla de mensajes que no llegaron a la tabla de resultados. Por ejemplo: my-project:dataset.my-unprocessed-table. Si no existe, se creará durante la ejecución de la canalización. Si no se especifica, se usa <outputTableSpec>_error_records en su lugar.

Ejecuta la plantilla de Cloud Storage Text a BigQuery (transmisión)

CONSOLE

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona the Cloud Storage Text to BigQuery template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Si quieres usar la herramienta de línea de comandos de gcloud para ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • REGION: Es el extremo regional (por ejemplo, us-west-1).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • JAVASCRIPT_FUNCTION: Es el nombre de la UDF.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • PATH_TO_JAVASCRIPT_UDF_FILE: Es la ruta de acceso de Cloud Storage al archivo .js que contiene el código JavaScript.
  • PATH_TO_TEXT_DATA: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery para los mensajes no procesados.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • REGION: Es el extremo regional (por ejemplo, us-west-1).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • JAVASCRIPT_FUNCTION: Es el nombre de la UDF.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • PATH_TO_JAVASCRIPT_UDF_FILE: Es la ruta de acceso de Cloud Storage al archivo .js que contiene el código JavaScript.
  • PATH_TO_TEXT_DATA: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery para los mensajes no procesados.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Cloud Storage Text a Pub/Sub (transmisión)

Esta plantilla crea una canalización de transmisión que sondea de forma continua los archivos de texto nuevos subidos a Cloud Storage, lee cada archivo línea por línea y publica strings en un tema de Pub/Sub. La plantilla publica registros en un archivo delimitado por saltos de línea que contiene registros JSON o archivos CSV en un tema de Pub/Sub para su procesamiento en tiempo real. Puedes usar esta plantilla para reproducir datos en Pub/Sub.

Por el momento, el intervalo de sondeo es fijo y está configurado en 10 segundos. Esta plantilla no establece una marca de tiempo en los registros individuales. Es por eso que la hora del evento será la misma que la hora de publicación durante la ejecución. Si tu canalización depende de la hora precisa del evento para el procesamiento, no deberías usar esta canalización.

Requisitos para esta canalización:

  • Los archivos de entrada deben tener el formato JSON delimitado por saltos de línea o CSV. Los registros que abarcan varias líneas en los archivos de origen pueden causar problemas de bajada, ya que cada línea dentro de los archivos se publicará como un mensaje en Pub/Sub.
  • El tema de Pub/Sub debe existir antes de la ejecución.
  • La canalización se ejecuta de forma indefinida, y deberás detenerla de forma manual.

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern El patrón del archivo de entrada para leer. Por ejemplo, gs://bucket-name/files/*.json.
outputTopic El tema de entrada de Pub/Sub en el que se desea escribir. El nombre debe tener el formato projects/<project-id>/topics/<topic-name>.

Ejecuta la plantilla de Cloud Storage Text a Pub/Sub (transmisión)

CONSOLE

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona the Cloud Storage Text to Pub/Sub (Stream) template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Si quieres usar la herramienta de línea de comandos de gcloud para ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

En este ejemplo, debes reemplazar los siguientes valores:

  • Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
  • Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Reemplaza YOUR_TOPIC_NAME por el nombre del tema de Pub/Sub.
  • Reemplaza YOUR_BUCKET_NAME por el nombre del depósito de Cloud Storage.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/files/*.json,\
outputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

En este ejemplo, debes reemplazar los siguientes valores:

  • Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
  • Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Reemplaza YOUR_TOPIC_NAME por el nombre del tema de Pub/Sub.
  • Reemplaza YOUR_BUCKET_NAME por el nombre del depósito de Cloud Storage.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://YOUR_BUCKET_NAME/files/*.json",
       "outputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

Enmascara datos y asigna tokens mediante Cloud DLP desde Cloud Storage a BigQuery (transmisión)

El enmascaramiento de datos y la asignación de tokens mediante Cloud DLP de Cloud Storage a BigQuery es una canalización de transmisión que lee archivos csv de un depósito de Cloud Storage, llama a la API de Cloud Data Loss Prevention (Cloud DLP) para la desidentificación de datos en la tabla de BigQuery especificada. Esta plantilla admite el uso de una plantilla de inspección de Cloud DLP y una plantilla de desidentificación de Cloud DLP. Esto permite que los usuarios inspeccionen información que puede ser sensible y desidentificar datos estructurados en los que las columnas están especificadas para ser desidentificadas y no se necesita la inspección.

Requisitos para esta canalización:

  • Los datos de entrada para la asignación de tokens deben existir
  • Las plantillas de Cloud DLP deben existir (por ejemplo, InspectTemplate y DeidentifyTemplate). Consulta Plantillas de Cloud DLP para obtener más detalles.
  • El conjunto de datos de BigQuery debe existir

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern Los archivos csv desde los que se leen los registros de datos de entrada. También se acepta el comodín. Por ejemplo, gs://mybucket/my_csv_filename.csv o gs://mybucket/file-*.csv.
dlpProjectId ID del proyecto de Cloud DLP que posee el recurso de la API de Cloud DLP. Este proyecto de Cloud DLP puede ser el mismo que posee las plantillas de Cloud DLP, o puede ser uno independiente. Por ejemplo, my_dlp_api_project.
deidentifyTemplateName Plantilla de desidentificación de Cloud DLP que se usa para las solicitudes a la API, especificada con el patrón projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId}. Por ejemplo, projects/my_project/deidentifyTemplates/100.
datasetName Conjunto de datos de BigQuery para enviar resultados con asignación de token.
batchSize Tamaño de fragmentación o del lote para enviar datos a fin de inspeccionar o quitar la asignación de token. En el caso de un archivo csv, batchSize es la cantidad de filas en un lote. Los usuarios deben determinar el tamaño del lote según el tamaño de los registros y del archivo. Ten en cuenta que la API de Cloud DLP tiene un límite de tamaño de carga útil de 524 KB por llamada a la API.
inspectTemplateName Plantilla de inspección de Cloud DLP que se usará para solicitudes a la API, especificada con el patrón projects/{template_project_id}/identifyTemplates/{idTemplateId} (opcional). Por ejemplo, projects/my_project/identifyTemplates/100.

Ejecuta la plantilla de Enmascara datos y asigna tokens mediante Cloud DLP desde Cloud Storage a BigQuery

CONSOLE

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona the Data Masking/Tokenization using Cloud DLP from Cloud Storage to BigQuery (Stream) template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Si quieres usar la herramienta de línea de comandos de gcloud para ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

En este ejemplo, debes reemplazar los siguientes valores:

  • Reemplaza YOUR_TEMPLATE_PROJECT_ID por el ID del proyecto de la plantilla.
  • Reemplaza YOUR_DLP_API_PROJECT_ID por el ID del proyecto de la API de Cloud DLP.
  • Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Reemplaza YOUR_INPUT_DATA por la ruta del archivo de entrada.
  • Reemplaza YOUR_DEIDENTIFY_TEMPLATE por el número de plantilla de Cloud DLPDeidentify.
  • Reemplaza YOUR_DATASET_NAME por el nombre del conjunto de datos de BigQuery.
  • Reemplaza YOUR_INSPECT_TEMPLATE por el número de plantilla de Cloud DLPInspect.
  • Reemplaza BATCH_SIZE_VALUE por el tamaño del lote (cantidad de filas por API para los archivos CSV).
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --parameters \
inputFilePattern=YOUR_INPUT_DATA,\
dlpProjectId=YOUR_DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/YOUR_TEMPLATE_PROJECT_ID/identifyTemplates/YOUR_IDENTIFY_TEMPLATE,\
datasetName=YOUR_DATASET,\
batchSize=BATCH_SIZE_VALUE

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

En este ejemplo, debes reemplazar los siguientes valores:

  • Reemplaza YOUR_TEMPLATE_PROJECT_ID por el ID del proyecto de la plantilla.
  • Reemplaza YOUR_DLP_API_PROJECT_ID por el ID del proyecto de la API de Cloud DLP.
  • Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Reemplaza YOUR_INPUT_DATA por la ruta del archivo de entrada.
  • Reemplaza YOUR_DEIDENTIFY_TEMPLATE por el número de plantilla de Cloud DLPDeidentify.
  • Reemplaza YOUR_DATASET_NAME por el nombre del conjunto de datos de BigQuery.
  • Reemplaza YOUR_INSPECT_TEMPLATE por el número de plantilla de Cloud DLPInspect.
  • Reemplaza BATCH_SIZE_VALUE por el tamaño del lote (cantidad de filas por API para los archivos CSV).
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
      "inputFilePattern":YOUR_INPUT_DATA,
      "dlpProjectId": "YOUR_DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/deidentifyTemplates/YOUR_DEIDENTIFY_TEMPLATE".
      "inspectTemplateName": "projects/YOUR_TEMPLATE_PROJECT_ID/identifyTemplates/YOUR_IDENTIFY_TEMPLATE",
      "datasetName": "YOUR_DATASET",
      "batchSize": "BATCH_SIZE_VALUE"
   },
   "environment": { "zone": "us-central1-f" }
}

Captura de datos modificados desde MySQL hasta BigQuery mediante Debezium y Pub/Sub (transmisión)

La plantilla Captura de datos modificados desde MySQL hasta BigQuery mediante Debezium y Pub/Sub es una canalización de transmisión que lee mensajes de Pub/Sub con datos de modificación desde una base de datos de MySQL y escribe los registros en BigQuery. Un conector Debezium captura las modificaciones en la base de datos de MySQL y publica los datos modificados en Pub/Sub. Luego, la plantilla lee los mensajes de Pub/Sub y los escribe en BigQuery.

Puedes usar esta plantilla para sincronizar las bases de datos de MySQL y las tablas de BigQuery. La canalización escribe los datos modificados en una tabla de etapa de pruebas de BigQuery y actualiza de forma intermitente una tabla de BigQuery que replica la base de datos de MySQL.

Requisitos para esta canalización:

Parámetros de la plantilla

Parámetro Descripción
inputSubscriptions La lista separada por comas de las suscripciones de entrada de Pub/Sub desde la que se va a leer, en el formato <subscription>,<subscription>, ...
changeLogDataset El conjunto de datos de BigQuery para almacenar las tablas de etapa de pruebas, en el formato <my-dataset>
replicaDataset La ubicación del conjunto de datos de BigQuery para almacenar las tablas de réplica, en el formato <my-dataset>
Opcional: updateFrequencySecs El intervalo en el que la canalización actualiza la tabla de BigQuery y replica la base de datos de MySQL.

Ejecuta la plantilla de Captura de datos modificados desde MySQL hasta BigQuery mediante Debezium y Pub/Sub

Para ejecutar esta plantilla, sigue estos pasos:

  1. En tu máquina local, clona el repositorio DataflowTemplates.
  2. Cambia al directorio v2/cdc-parent.
  3. Asegúrate de que el conector Debezium esté implementado.
  4. Mediante Maven, ejecuta la plantilla de Dataflow.

    En este ejemplo, debes reemplazar los siguientes valores:

    • Reemplaza PROJECT_ID por el ID del proyecto.
    • Reemplaza YOUR_SUBSCRIPTIONS por tu lista de nombres de suscripción de Pub/Sub separados por comas.
    • Reemplaza YOUR_CHANGELOG_DATASET por tu conjunto de datos de BigQuery para los datos del registro de cambios, y reemplaza YOUR_REPLICA_DATASET por el conjunto de datos de BigQuery para las tablas de réplica.
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
                --inputSubscriptions=YOUR_SUBSCRIPTIONS \
                --updateFrequencySecs=300 \
                --changeLogDataset=YOUR_CHANGELOG_DATASET \
                --replicaDataset=YOUR_REPLICA_DATASET \
                --project=PROJECT_ID"
      

Apache Kafka a BigQuery

La plantilla de Apache Kafka a BigQuery es una canalización de transmisión que transfiere datos de texto de Apache Kafka, ejecuta una función definida por el usuario (UDF) y envía los registros resultantes a BigQuery. Cualquier error que ocurra en la transformación de los datos, la ejecución de la UDF o la inserción en la tabla de resultados se inserta en una tabla de errores independiente en BigQuery. Si la tabla de errores no existe antes de la ejecución, se creará.

Requisitos para esta canalización

  • La tabla de salida de BigQuery debe existir.
  • El servidor del agente de Apache Kafka debe estar en ejecución y se debe poder acceder a él desde las máquinas de trabajador de Dataflow.
  • Los temas de Apache Kafka deben existir, y los mensajes deben estar codificados en un formato JSON válido.

Parámetros de la plantilla

Parámetro Descripción
outputTableSpec La ubicación de la tabla de salida de BigQuery en la que se deben escribir los mensajes de Apache Kafka, en el formato my-project:dataset.table.
inputTopics Los temas de entrada de Apache Kafka desde los que se debe leer en una lista separada por comas. Por ejemplo: messages.
bootstrapServers La dirección del host de los servidores del agente de Apache Kafka en ejecución en una lista separada por comas, cada dirección de host tiene el formato 35.70.252.199:9092.
javascriptTextTransformGcsPath La ruta de acceso de la ubicación de Cloud Storage a la UDF de JavaScript (opcional). Por ejemplo: gs://my_bucket/my_function.js.
javascriptTextTransformFunctionName El nombre del JavaScript que se llamará como tu UDF (opcional). Por ejemplo: transform.
outputDeadletterTable La ubicación de la tabla de salida de BigQuery en la que se escriben los registros de mensajes no entregados, en el formato my-project:dataset.my-deadletter-table (opcional). Si no existe, la tabla se crea durante la ejecución de la canalización. Si no se especifica, se usa <outputTableSpec>_error_records en su lugar.

Ejecuta la plantilla de Apache Kafka en BigQuery

CONSOLE

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona the Apache Kafka to BigQuery template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de la herramienta de gcloud a fin de ejecutar plantillas, debes tener la versión 284.0.0 o superior del SDK de Cloud.

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

En este ejemplo, debes reemplazar los siguientes valores:

  • Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
  • Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Reemplaza YOUR_JAVASCRIPT_FUNCTION por el nombre de tu UDF.
  • Reemplaza REGION_NAME por el nombre de la región de Dataflow. Por ejemplo: us-central1..
  • Reemplaza BIGQUERY_TABLE por el nombre de la tabla de BigQuery.
  • Reemplaza KAFKA_TOPICS por la lista de temas de Apache Kafka. Si se proporcionan varios temas, sigue las instrucciones para escapar las comas.
  • Reemplaza PATH_TO_JAVASCRIPT_UDF_FILE por la ruta de acceso de Cloud Storage al archivo .js que contiene tu código de JavaScript.
  • Reemplaza YOUR_JAVASCRIPT_FUNCTION por el nombre de tu UDF.
  • Reemplaza KAFKA_SERVER_ADDRESSES por la lista de direcciones IP del servidor del agente de Apache Kafka. Cada dirección IP debe tener el número de puerto desde el que se puede acceder al servidor. Por ejemplo: 35.70.252.199:9092. Si se proporcionan varias direcciones, sigue las instrucciones para escapar las comas.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

API

Ejecuta desde la API de REST

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

En este ejemplo, debes reemplazar los siguientes valores:

  • Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
  • Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Reemplaza YOUR_JAVASCRIPT_FUNCTION por el nombre de tu UDF.
  • Reemplaza LOCATION por el nombre de la región de Dataflow. Por ejemplo: us-central1..
  • Reemplaza BIGQUERY_TABLE por el nombre de la tabla de BigQuery.
  • Reemplaza KAFKA_TOPICS por la lista de temas de Apache Kafka. Si se proporcionan varios temas, sigue las instrucciones para escapar las comas.
  • Reemplaza PATH_TO_JAVASCRIPT_UDF_FILE por la ruta de acceso de Cloud Storage al archivo .js que contiene tu código de JavaScript.
  • Reemplaza YOUR_JAVASCRIPT_FUNCTION por el nombre de tu UDF.
  • Reemplaza KAFKA_SERVER_ADDRESSES por la lista de direcciones IP del servidor del agente de Apache Kafka. Cada dirección IP debe tener el número de puerto desde el que se puede acceder al servidor. Por ejemplo: 35.70.252.199:9092. Si se proporcionan varias direcciones, sigue las instrucciones para escapar las comas.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Kafka_to_BigQuery",
   }
}