Plantillas de transmisión que proporciona Google

Google proporciona un conjunto de plantillas de código abierto de Dataflow. Para obtener información general sobre las plantillas, consulta la página Descripción general. Para obtener una lista de todas las plantillas proporcionadas por Google, consulta la página Plantillas que proporciona Google.

En esta página, se documentan las plantillas de transmisión.

Suscripción de Pub/Sub a BigQuery

La plantilla de suscripción de Pub/Sub a BigQuery es una canalización de transmisión que lee mensajes con formato JSON desde una suscripción de Pub/Sub y los escribe en una tabla de BigQuery. Puedes usar la plantilla como una solución rápida para mover datos de Pub/Sub a BigQuery. La plantilla lee los mensajes con formato JSON de Pub/Sub y los convierte en elementos de BigQuery.

Requisitos para esta canalización:

  • Los mensajes de Pub/Sub deben estar en formato JSON, como se describe aquí. Por ejemplo, los mensajes con formato {"k1":"v1", "k2":"v2"} pueden insertarse en una tabla de BigQuery con dos columnas, denominadas k1 y k2, con el tipo de datos de string.
  • La tabla de salida debe existir antes de ejecutar la canalización. El esquema de la tabla debe coincidir con los objetos JSON de entrada.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription Suscripción de entrada de Pub/Sub desde la que se va a leer, en el formato projects/<project>/subscriptions/<subscription>.
outputTableSpec Ubicación de la tabla de salida de BigQuery, en el formato <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable La tabla de BigQuery para los mensajes que no llegaron a la tabla de salida, en el formato <my-project>:<my-dataset>.<my-table>. Si no existe, se crea durante la ejecución de la canalización. Si no se especifica, se usa <outputTableSpec>_error_records en su lugar.

Ejecuta la plantilla de suscripción de Pub/Sub a BigQuery

Console

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla
  4. Botón Crear trabajo a partir de una plantilla de Cloud Console
  5. Selecciona the Pub/Sub Subscription to BigQuery template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Ejecutar trabajo.

gcloud

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de gcloud a fin de ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas.
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • JOB_NAME: Es el nombre del trabajo que elijas.
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Tema de Pub/Sub a BigQuery

La plantilla de tema de Pub/Sub a BigQuery es una canalización de transmisión que lee mensajes con formato JSON de un tema de Pub/Sub y los escribe en una tabla de BigQuery. Puedes usar la plantilla como una solución rápida para mover datos de Pub/Sub a BigQuery. La plantilla lee los mensajes con formato JSON de Pub/Sub y los convierte en elementos de BigQuery.

Requisitos para esta canalización:

  • Los mensajes de Pub/Sub deben estar en formato JSON, como se describe aquí. Por ejemplo, los mensajes con formato {"k1":"v1", "k2":"v2"} pueden insertarse en una tabla de BigQuery con dos columnas, denominadas k1 y k2, con el tipo de datos de string.
  • La tabla de salida debe existir antes de ejecutar la canalización. El esquema de la tabla debe coincidir con los objetos JSON de entrada.

Parámetros de la plantilla

Parámetro Descripción
inputTopic El tema de entrada de Pub/Sub desde el que se va a leer, en el formato projects/<project>/topics/<topic>.
outputTableSpec Ubicación de la tabla de salida de BigQuery, en el formato <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable La tabla de BigQuery para los mensajes que no llegaron a la tabla de resultados. Debe estar en formato <my-project>:<my-dataset>.<my-table>. Si no existe, se crea durante la ejecución de la canalización. Si no se especifica, se usa <outputTableSpec>_error_records en su lugar.

Ejecuta la plantilla del tema de Pub/Sub a BigQuery

Console

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla
  4. Botón Crear trabajo a partir de una plantilla de Cloud Console
  5. Selecciona the Pub/Sub Topic to BigQuery template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Ejecutar trabajo.

gcloud

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de gcloud a fin de ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Pub/Sub Avro a BigQuery

La plantilla de Pub/Sub Avro a BigQuery es una canalización de transmisión que transfiere datos de Avro desde una suscripción de Pub/Sub a una tabla de BigQuery. Cualquier error que ocurra mientras se escribe en la tabla de BigQuery se transmite a un tema de Pub/Sub sin procesar.

Requisitos para esta canalización

  • La suscripción de entrada de Pub/Sub debe existir.
  • El archivo de esquema para los registros de Avro debe existir en Cloud Storage.
  • El tema de Pub/Sub sin procesar debe existir. ¿
  • El conjunto de datos de salida de BigQuery debe existir.

Parámetros de la plantilla

Parámetro Descripción
schemaPath Ubicación de Cloud Storage del archivo de esquema de Avro. Por ejemplo, gs://path/to/my/schema.avsc.
inputSubscription Suscripción de entrada de Pub/Sub desde la que se desea leer. Por ejemplo, projects/<project>/subscriptions/<subscription>
outputTopic El tema de Pub/Sub que se usará para registros no procesados. Por ejemplo, projects/<project-id>/topics/<topic-name>.
outputTableSpec Ubicación de la tabla de salida de BigQuery. Por ejemplo, <my-project>:<my-dataset>.<my-table>. Según la createDisposition especificada, la tabla de salida se puede crear de forma automática mediante el esquema de Avro proporcionado por el usuario.
writeDisposition La WriteDisposition de BigQuery (opcional). Por ejemplo, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Predeterminada: WRITE_APPEND.
createDisposition La CreateDisposition de BigQuery (opcional). Por ejemplo: CREATE_IF_NEEDED, CREATE_NEVER. Predeterminada: CREATE_IF_NEEDED.

Ejecuta la plantilla de Pub/Sub Avro a BigQuery

Console

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla
  4. Botón Crear trabajo a partir de una plantilla de Cloud Console
  5. Selecciona the Pub/Sub Avro to BigQuery template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Ejecutar trabajo.

gcloud

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de la herramienta de gcloud a fin de ejecutar plantillas, debes tener la versión 284.0.0 o superior del SDK de Cloud.

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: Es el nombre de la región de Dataflow (por ejemplo, us-central1)
  • SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo, gs://MyBucket/file.avsc).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
  • BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
  • DEADLETTER_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.
gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

API

Ejecuta desde la API de REST

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: Es el nombre de la región de Dataflow (por ejemplo, us-central1)
  • SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo, gs://MyBucket/file.avsc).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
  • BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
  • DEADLETTER_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Pub/Sub a Pub/Sub

La plantilla de Pub/Sub a Pub/Sub es una canalización de transmisión que lee mensajes de una suscripción de Pub/Sub y los escribe en otro tema de Pub/Sub. La canalización también acepta una clave de atributo de mensaje opcional y un valor que se puede usar para filtrar los mensajes que se deben escribir en el tema de Pub/Sub. Puedes usar esta plantilla para copiar mensajes de una suscripción de Pub/Sub a otro tema de Pub/Sub con un filtro de mensajes opcional.

Requisitos para esta canalización:

  • La suscripción a Pub/Sub de origen debe existir antes de la ejecución.
  • El tema de Pub/Sub de destino debe existir antes de la ejecución.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription Suscripción a Pub/Sub desde la que se lee la entrada. Por ejemplo, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Tema de Cloud Pub/Sub en el que se escribe el resultado. Por ejemplo, projects/<project-id>/topics/<topic-name>.
filterKey Eventos de filtro basados en la clave de atributo (opcional). No se aplican filtros si no se especifica filterKey.
filterValue Valor del atributo del filtro para usar en caso de que se provea filterKey (opcional). De forma predeterminada, se usa un filterValue nulo.

Ejecuta la plantilla de Pub/Sub a Pub/Sub

Console

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla
  4. Botón Crear trabajo a partir de una plantilla de Cloud Console
  5. Selecciona the Pub/Sub to Pub/Sub template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Ejecutar trabajo.

gcloud

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de gcloud a fin de ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • FILTER_KEY: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.
  • FILTER_VALUE: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento. Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. No se filtrarán las coincidencias parciales (p. ej., substring). De forma predeterminada, se usa un valor de filtro de evento nulo.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • FILTER_KEY: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.
  • FILTER_VALUE: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento. Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. No se filtrarán las coincidencias parciales (p. ej., substring). De forma predeterminada, se usa un valor de filtro de evento nulo.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Pub/Sub a Splunk

La plantilla de Pub/Sub a Splunk es una canalización de transmisión que lee mensajes de una suscripción a Pub/Sub y escribe la carga útil del mensaje en Splunk mediante el recopilador de eventos HTTP (HEC) de Splunk. El caso de uso más común de esta plantilla es exportar registros a Splunk. Para ver un ejemplo del flujo de trabajo subyacente, consulta Implementa exportaciones de registros listas para la producción a Splunk mediante Dataflow.

Antes de escribir en Splunk, también puedes aplicar una función definida por el usuario de JavaScript a la carga útil del mensaje. Los mensajes con fallas de procesamiento se reenvían a un tema de mensajes no enviados de Pub/Sub para solucionar los problemas y volver a procesarlos.

Como una capa adicional de protección para tu token HEC, también puedes pasar una clave de Cloud KMS junto con el parámetro de token HEC codificado en base64 encriptado con la clave de Cloud KMS. Consulta el extremo de encriptación de la API de Cloud KMS para obtener detalles adicionales sobre la encriptación de tu parámetro de token HEC.

Requisitos para esta canalización:

  • La suscripción de Pub/Sub de origen debe existir antes de ejecutar la canalización.
  • El tema sin procesar de Pub/Sub debe existir antes de ejecutar la canalización.
  • Se debe poder acceder al extremo de HEC de Splunk desde la red de trabajadores de Dataflow.
  • El token HEC de Splunk se debe generar y estar disponible.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription La suscripción de Pub/Sub desde la que se lee la entrada. Por ejemplo, projects/<project-id>/subscriptions/<subscription-name>.
token El token de autenticación HEC de Splunk. Esta string codificada en base64 se puede encriptar con una clave de Cloud KMS para mayor seguridad.
url La URL de HEC de Splunk. Debe ser enrutable desde la VPC en la que se ejecuta la canalización. Por ejemplo, https://splunk-hec-host:8088.
outputDeadletterTopic El tema de Pub/Sub para reenviar mensajes que no se pueden entregar. Por ejemplo, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath La ruta de acceso de Cloud Storage que contiene todo el código de JavaScript (opcional). Por ejemplo, gs://mybucket/mytransforms/*.js.
javascriptTextTransformFunctionName El nombre de la función de JavaScript que se llamará (opcional). Por ejemplo, si tu función de JavaScript es function myTransform(inJson) { ...dostuff...}, el nombre de la función es myTransform.
batchCount El tamaño del lote para enviar varios eventos a Splunk (opcional). Predeterminado 1 (sin lotes).
parallelism La cantidad máxima de solicitudes paralelas (opcional). Predeterminado 1 (sin paralelismo).
disableCertificateValidation Inhabilita la validación del certificado SSL (opcional). El valor predeterminado es falso (validación habilitada).
includePubsubMessage Incluye el mensaje de Pub/Sub completo en la carga útil (opcional). El valor predeterminado es falso (solo se incluye el elemento de datos en la carga útil).
tokenKMSEncryptionKey La clave de Cloud KMS para desencriptar la string del token HEC (opcional). Si se proporciona la clave de Cloud KMS, la string del token HEC debe encriptarse.

Ejecuta la plantilla de Pub/Sub a Splunk

Console

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla
  4. Botón Crear trabajo a partir de una plantilla de Cloud Console
  5. Selecciona the Pub/Sub to Splunk template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Ejecutar trabajo.

gcloud

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de gcloud a fin de ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • INPUT_SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOKEN: Es el token del recopilador de eventos HTTP de Splunk.
  • URL: Es la ruta de URL para el recopilador de eventos HTTP de Splunk (por ejemplo, https://splunk-hec-host:8088).
  • DEADLETTER_TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • JAVASCRIPT_FUNCTION: Es el nombre de la función de JavaScript.
  • PATH_TO_JAVASCRIPT_UDF_FILE: Es la ruta de acceso de Cloud Storage al archivo .js que contiene el código JavaScript (por ejemplo, gs://your-bucket/your-function.js).
  • BATCH_COUNT: Es el tamaño del lote que se debe usar para enviar varios eventos a Splunk.
  • PARALLELISM: Es la cantidad de solicitudes paralelas que se usarán para enviar eventos a Splunk.
  • DISABLE_VALIDATION: Es true si deseas inhabilitar la validación del certificado SSL.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • INPUT_SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOKEN: Es el token del recopilador de eventos HTTP de Splunk.
  • URL: Es la ruta de URL para el recopilador de eventos HTTP de Splunk (por ejemplo, https://splunk-hec-host:8088).
  • DEADLETTER_TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • JAVASCRIPT_FUNCTION: Es el nombre de la función de JavaScript.
  • PATH_TO_JAVASCRIPT_UDF_FILE: Es la ruta de acceso de Cloud Storage al archivo .js que contiene el código JavaScript (por ejemplo, gs://your-bucket/your-function.js).
  • BATCH_COUNT: Es el tamaño del lote que se debe usar para enviar varios eventos a Splunk.
  • PARALLELISM: Es la cantidad de solicitudes paralelas que se usarán para enviar eventos a Splunk.
  • DISABLE_VALIDATION: Es true si deseas inhabilitar la validación del certificado SSL.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   }
}

Pub/Sub a archivos de Avro en Cloud Storage

La plantilla de Pub/Sub a archivos de Avro en Cloud Storage es una canalización de transmisión que lee datos de un tema de Pub/Sub y escribe archivos de Avro en el bucket de Cloud Storage especificado.

Requisitos para esta canalización:

  • El tema de entrada de Pub/Sub debe existir antes de la ejecución de la canalización.

Parámetros de la plantilla

Parámetro Descripción
inputTopic Tema de Cloud Pub/Sub al cual suscribirse para el consumo de mensaje. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name>.
outputDirectory Directorio de salida en el que se archivarán los archivos de Avro de salida. Agrega una / al final. Por ejemplo: gs://example-bucket/example-directory/.
avroTempDirectory Directorio para los archivos de Avro temporales. Agrega una / al final. Por ejemplo: gs://example-bucket/example-directory/.
outputFilenamePrefix Prefijo de nombre de archivo de salida para los archivos Avro (opcional).
outputFilenameSuffix [Opcional] Sufijo de nombre de archivo de salida para los archivos Avro.
outputShardTemplate [Opcional] Plantilla de fragmentación del archivo de salida. Especificada como secuencias repetidas de letras “S” o “N” (por ejemplo: SSS-NNN). Estas se reemplazan con el número de fragmentación o la cantidad de fragmentaciones respectivamente. El formato de la plantilla predeterminada es “W-P-SS-of-NN” cuando no se especifica este parámetro.
numShards [Opcional] Cantidad máxima de fragmentos de salida que se produce con la escritura. La cantidad máxima predeterminada de fragmentos es 1.

Ejecuta la plantilla de Pub/Sub a Cloud Storage Avro

Console

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla
  4. Botón Crear trabajo a partir de una plantilla de Cloud Console
  5. Selecciona the Pub/Sub to Cloud Storage Avro template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Ejecutar trabajo.

gcloud

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de gcloud a fin de ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
  • FILENAME_PREFIX: Es el prefijo del nombre de archivo de salida que prefieras.
  • FILENAME_SUFFIX: Es el sufijo del nombre de archivo de salida que prefieras.
  • SHARD_TEMPLATE: Es la plantilla de fragmentación de salida que prefieras.
  • NUM_SHARDS: Es la cantidad de fragmentos de salida.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
numShards=NUM_SHARDS,\
avroTempDirectory=gs://BUCKET_NAME/temp/

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
  • FILENAME_PREFIX: Es el prefijo del nombre de archivo de salida que prefieras.
  • FILENAME_SUFFIX: Es el sufijo del nombre de archivo de salida que prefieras.
  • SHARD_TEMPLATE: Es la plantilla de fragmentación de salida que prefieras.
  • NUM_SHARDS: Es la cantidad de fragmentos de salida.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE",
       "numShards": "NUM_SHARDS",
   }
}

Pub/Sub a archivos de texto en Cloud Storage

La plantilla de Pub/Sub a archivos de texto en Cloud Storage es una canalización de transmisión que lee registros de Pub/Sub y los guarda como una serie de archivos de Cloud Storage en formato de texto. La plantilla se puede usar como una forma rápida de guardar datos en Pub/Sub para su uso futuro. De forma predeterminada, la plantilla genera un archivo nuevo cada 5 minutos.

Requisitos para esta canalización:

  • El tema de Pub/Sub debe existir antes de la ejecución.
  • Los mensajes publicados en el tema deben tener formato de texto.
  • Los mensajes publicados en el tema no deben contener líneas nuevas. Ten en cuenta que cada mensaje de Pub/Sub se guarda como una sola línea en el archivo de salida.

Parámetros de la plantilla

Parámetro Descripción
inputTopic El tema de Pub/Sub desde el que se lee la entrada. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name>.
outputDirectory La ruta de acceso y el prefijo del nombre de archivo para escribir los archivos de salida. Por ejemplo, gs://bucket-name/path/. El valor debe terminar con una barra.
outputFilenamePrefix El prefijo para colocar en cada archivo con ventanas. Por ejemplo, output-
outputFilenameSuffix El sufijo para colocar en cada archivo con ventanas, por lo general, es una extensión de archivo como .txt o .csv.
outputShardTemplate La plantilla de fragmentación define la parte dinámica de cada archivo con ventanas. De forma predeterminada, la canalización utiliza una única fragmentación de salida para el sistema de archivo dentro de cada ventana. Esto significa que todos los datos llegarán a un único archivo por ventana. El valor predeterminado outputShardTemplate es W-P-SS-of-NN, en el que W es el período de la ventana, P es la información del panel, S es el número de fragmento y N es la cantidad de fragmentos. En el caso de un solo archivo, la parte SS-of-NN de outputShardTemplate será 00-of-01.

Ejecuta la plantilla de Pub/Sub a archivos de texto en Cloud Storage

Console

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla
  4. Botón Crear trabajo a partir de una plantilla de Cloud Console
  5. Selecciona the Pub/Sub to Text Files on Cloud Storage template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Ejecutar trabajo.

gcloud

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de gcloud a fin de ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_IDlocations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Pub/Sub a MongoDB

La plantilla de Pub/Sub a MongoDB es una canalización de transmisión que lee mensajes con codificación JSON de una suscripción a Pub/Sub y los escribe en MongoDB como documentos. Si es necesario, esta canalización admite transformaciones adicionales que se pueden incluir mediante una función definida por el usuario (UDF) de JavaScript. Cualquier error que se produjo debido a una falta de coincidencia del esquema, JSON con errores o mientras se ejecutaban transformaciones se registra en una tabla de BigQuery para mensajes no procesados junto con un mensaje de entrada. Si no existe una tabla para los registros no procesados antes de la ejecución, la canalización crea esta tabla de forma automática.

Requisitos para esta canalización:

  • La suscripción a Pub/Sub debe existir y los mensajes deben estar codificados en un formato JSON válido.
  • El clúster de MongoDB debe existir y debe ser accesible desde las máquinas de trabajador de Dataflow.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription Nombre de la suscripción a Pub/Sub. Por ejemplo: projects/<project-id>/subscriptions/<subscription-name>.
mongoDBUri Una lista separada por comas de los servidores de MongoDB. Por ejemplo: 192.285.234.12:27017,192.287.123.11:27017.
database La base de datos en MongoDB en la que se debe almacenar la colección. Por ejemplo: my-db.
collection Nombre de la colección dentro de la base de datos de MongoDB. Por ejemplo: my-collection.
deadletterTable La tabla de BigQuery que almacena mensajes debido a fallas (esquemas no coincidentes, JSON con formato incorrecto, etcétera). Por ejemplo: project-id:dataset-name.table-name.
javascriptTextTransformGcsPath La ubicación de Cloud Storage del archivo JavaScript que contiene la transformación de la UDF (opcional). Por ejemplo: gs://mybucket/filename.json.
javascriptTextTransformFunctionName El nombre de la UDF de JavaScript (opcional). Por ejemplo: transform.
batchSize El tamaño de lote que se usa para la inserción por lotes de documentos en MongoDB (opcional). Valor predeterminado: 1000.
batchSizeBytes El tamaño del lote en bytes (opcional). Valor predeterminado: 5242880.
maxConnectionIdleTime El tiempo de inactividad máximo permitido en segundos antes de que se agote el tiempo de espera de la conexión (opcional). Valor predeterminado: 60000.
sslEnabled El valor booleano que indica si la conexión a MongoDB está habilitada con SSL (opcional). Valor predeterminado: true.
ignoreSSLCertificate El valor booleano que indica si se debe ignorar el certificado SSL (opcional). Valor predeterminado: true.
withOrdered El valor booleano que habilita las inserciones masivas ordenadas en MongoDB (opcional). Valor predeterminado: true.
withSSLInvalidHostNameAllowed El valor booleano que indica si se permite un nombre de host no válido para la conexión SSL (opcional). Valor predeterminado: true.

Ejecuta la plantilla de Pub/Sub a MongoDB

Console

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla
  4. Botón Crear trabajo a partir de una plantilla de Cloud Console
  5. Selecciona Pub/Sub to MongoDB template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Ejecutar trabajo.

gcloud

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Si quieres usar la herramienta de línea de comandos de gcloud para ejecutar plantillas, debes tener la versión 284.0.0 o superior del SDK de Cloud.

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • REGION_NAME: Es el nombre de la región de Dataflow (por ejemplo, us-central1).
  • JOB_NAME: Es el nombre del trabajo que elijas
  • INPUT_SUBSCRIPTION: Es la suscripción a Pub/Sub (por ejemplo, projects/<project-id>/subscriptions/<subscription-name>)
  • MONGODB_URI: Son las direcciones del servidor de MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017).
  • DATABASE: Es el nombre de la base de datos de MongoDB (por ejemplo, users).
  • COLLECTION: Es el nombre de la colección de MongoDB (por ejemplo, profiles).
  • UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name).
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

API

Ejecuta desde la API de REST

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto.
  • LOCATION: Es el nombre de la región de Dataflow (por ejemplo, us-central1).
  • JOB_NAME: Es el nombre del trabajo que elijas
  • INPUT_SUBSCRIPTION: Es la suscripción a Pub/Sub (por ejemplo, projects/<project-id>/subscriptions/<subscription-name>)
  • MONGODB_URI: Son las direcciones del servidor de MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017).
  • DATABASE: Es el nombre de la base de datos de MongoDB (por ejemplo, users).
  • COLLECTION: Es el nombre de la colección de MongoDB (por ejemplo, profiles).
  • UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name).
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Datastream para Cloud Spanner

La plantilla de Datastream para Cloud Spanner es una canalización de transmisión que lee eventos de Datastream desde un bucket de Cloud Storage y los escribe en una base de datos de Cloud Spanner. Está diseñado para la migración de datos de fuentes de Datastream a Cloud Spanner.

Todas las tablas necesarias para la migración deben existir en la base de datos de destino de Cloud Spanner antes de la ejecución de la plantilla. Por lo tanto, la migración del esquema de una base de datos de origen a Cloud Spanner de destino debe completarse antes de la migración de datos. Los datos pueden existir en las tablas antes de la migración. Esta plantilla no propaga los cambios de esquema de Datastream a la base de datos de Cloud Spanner.

La coherencia de los datos solo está garantizada al final de la migración cuando todos los datos se escribieron en Cloud Spanner. A fin de almacenar información sobre el orden para cada registro escrito en Cloud Spanner, esta plantilla crea una tabla adicional (llamada tabla paralela) para cada tabla en la base de datos de Cloud Spanner. Esto se usa para garantizar la coherencia al final de la migración. Las tablas paralelas no se borran después de la migración y se pueden usar con fines de validación al final de la migración.

Cualquier error que ocurra durante la operación, como discrepancias de esquema, archivos JSON con formato incorrecto o errores resultantes de la ejecución de transformaciones, se registra en una cola de errores. La cola de errores es una carpeta de Cloud Storage que almacena todos los eventos de Datastream que encontraron errores junto con el motivo del error en formato de texto. Los errores pueden ser transitorios o permanentes, y se almacenan en las carpetas de Cloud Storage adecuadas en la cola de errores. Los errores transitorios se reintentan automáticamente, mientras que los errores permanentes no. En el caso de errores permanentes, tienes la opción de corregir los eventos de cambio y moverlos al bucket que se puede reintentar mientras se ejecuta la plantilla.

Requisitos para esta canalización:

  • Una transmisión de Datastream en estado En ejecución o No iniciado
  • Un bucket de Cloud Storage en el que se replican los eventos de Datastream.
  • Una base de datos de Cloud Spanner con tablas existentes. Estas tablas pueden estar vacías o contener datos.

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern La ubicación de los archivos de Datastream en Cloud Storage para replicar. Por lo general, esta es la ruta de acceso raíz de una transmisión.
streamName El nombre o la plantilla del flujo que se consultará para obtener la información del esquema y el tipo de fuente.
instanceId La instancia de Cloud Spanner en la que se replican los cambios.
databaseId La base de datos de Cloud Spanner en la que se replican los cambios.
projectId El ID del proyecto de Cloud Spanner.
deadLetterQueueDirectory Esta es ruta de acceso del archivo para almacenar el resultado de la cola de errores (opcional). El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow.
inputFileFormat El formato del archivo de salida que produce Datastream (opcional). Por ejemplo: avro,json. Valor predeterminado, avro.
shadowTablePrefix El prefijo que se usa para nombrar las tablas de paralelas (opcional). Valor predeterminado: shadow_.

Ejecuta la plantilla de Datastream para Cloud Spanner

Console

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla
  4. Botón Crear trabajo a partir de una plantilla de Cloud Console
  5. Selecciona Datastream to Cloud Spanner template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Ejecutar trabajo.

gcloud

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de la herramienta de gcloud a fin de ejecutar plantillas, debes tener la versión 284.0.0 o superior del SDK de Cloud.

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner

Reemplaza lo siguiente:

  • YOUR_PROJECT_ID: Es el ID de tu proyecto de plantilla.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: Es el nombre de la región de Dataflow (por ejemplo, us-central1)
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage que se usa para almacenar eventos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: es la instancia de Cloud Spanner.
  • CLOUDSPANNER_DATABASE: es la base de datos de Cloud Spanner.
  • DLQ: es la ruta de acceso de Cloud Storage para el directorio de la cola de errores.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Datastream_to_CloudSpanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

API

Ejecuta desde la API de REST

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • YOUR_PROJECT_ID: Es el ID de tu proyecto de plantilla.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: Es el nombre de la región de Dataflow (por ejemplo, us-central1)
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage que se usa para almacenar eventos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: es la instancia de Cloud Spanner.
  • CLOUDSPANNER_DATABASE: es la base de datos de Cloud Spanner.
  • DLQ: es la ruta de acceso de Cloud Storage para el directorio de la cola de errores.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Datastream_to_CloudSpanner",
   }
}
  

Archivos de texto en Cloud Storage a BigQuery (transmisión)

La canalización de archivos de texto en Cloud Storage a BigQuery es una canalización de transmisión que te permite transmitir archivos de texto almacenados en Cloud Storage, transformarlos con una función definida por el usuario (UDF) de JavaScript que proporciones y adjuntar el resultado en BigQuery.

La canalización se ejecuta de forma indefinida, y se debe finalizar de forma manual mediante una “cancelación” y no un “drain”, debido a su uso de la transformación “Watch”, que es un “SplittableDoFn” que no admite el desvío.

Requisitos para esta canalización:

  • Crea un archivo JSON que describa el esquema de tu tabla de salida en BigQuery.

    Asegúrate de que haya un array JSON de nivel superior titulado BigQuery Schema y que su contenido siga el patrón {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. Por ejemplo:

    {
      "BigQuery Schema": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • Crea un archivo JavaScript (.js) con tu función UDF que proporciona la lógica para transformar las líneas de texto. Ten en cuenta que tu función debe mostrar una string JSON.

    Por ejemplo, esta función divide cada línea de un archivo CSV y muestra una string JSON después de transformar los valores.

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

Parámetros de la plantilla

Parámetro Descripción
javascriptTextTransformGcsPath Ubicación de Cloud Storage de tu UDF de JavaScript. Por ejemplo: gs://my_bucket/my_function.js.
JSONPath Ubicación de Cloud Storage de tu archivo de esquema de BigQuery, descrito como un JSON. Por ejemplo: gs://path/to/my/schema.json.
javascriptTextTransformFunctionName El nombre de la función de JavaScript que quieres nombrar como tu UDF. Por ejemplo: transform.
outputTable La tabla de BigQuery completamente calificada. Por ejemplo: my-project:dataset.table
inputFilePattern Ubicación en Cloud Storage del texto que quieres procesar. Por ejemplo: gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Directorio temporal para el proceso de carga de BigQuery. Por ejemplo: gs://my-bucket/my-files/temp_dir.
outputDeadletterTable Tabla de mensajes que no llegaron a la tabla de resultados. Por ejemplo: my-project:dataset.my-unprocessed-table. Si no existe, se creará durante la ejecución de la canalización. Si no se especifica, se usa <outputTableSpec>_error_records en su lugar.

Ejecuta la plantilla de Cloud Storage Text a BigQuery (transmisión)

Console

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla
  4. Botón Crear trabajo a partir de una plantilla de Cloud Console
  5. Selecciona the Cloud Storage Text to BigQuery template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Ejecutar trabajo.

gcloud

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de gcloud a fin de ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • JAVASCRIPT_FUNCTION: Es el nombre de la UDF.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • PATH_TO_JAVASCRIPT_UDF_FILE: Es la ruta de acceso de Cloud Storage al archivo .js que contiene el código JavaScript.
  • PATH_TO_TEXT_DATA: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery para los mensajes no procesados.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • JAVASCRIPT_FUNCTION: Es el nombre de la UDF.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • PATH_TO_JAVASCRIPT_UDF_FILE: Es la ruta de acceso de Cloud Storage al archivo .js que contiene el código JavaScript.
  • PATH_TO_TEXT_DATA: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery para los mensajes no procesados.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Archivos de texto en Cloud Storage a Pub/Sub (transmisión)

Esta plantilla crea una canalización de transmisión que sondea de forma continua los archivos de texto nuevos subidos a Cloud Storage, lee cada archivo línea por línea y publica strings en un tema de Pub/Sub. La plantilla publica registros en un archivo delimitado por saltos de línea que contiene registros JSON o archivos CSV en un tema de Pub/Sub para su procesamiento en tiempo real. Puedes usar esta plantilla para reproducir datos en Pub/Sub.

La canalización se ejecuta de forma indefinida, y se debe finalizar de forma manual mediante una “cancelación” y no un “drain”, debido a su uso de la transformación “Watch”, que es un “SplittableDoFn” que no admite el desvío.

Actualmente, el intervalo de sondeo es fijo y configurado en 10 segundos. Esta plantilla no establece una marca de tiempo en los registros individuales. Es por eso que la hora del evento será la misma que la hora de publicación durante la ejecución. Si tu canalización depende de la hora precisa del evento para el procesamiento, no deberías usar esta canalización.

Requisitos para esta canalización:

  • Los archivos de entrada deben tener el formato JSON delimitado por saltos de línea o CSV. Los registros que abarcan varias líneas en los archivos de origen pueden causar problemas de bajada, ya que cada línea dentro de los archivos se publicará como un mensaje en Pub/Sub.
  • El tema de Pub/Sub debe existir antes de la ejecución.
  • La canalización se ejecuta de forma indefinida, y deberás detenerla de forma manual.

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern El patrón del archivo de entrada para leer. Por ejemplo, gs://bucket-name/files/*.json o gs://bucket-name/path/*.csv.
outputTopic El tema de entrada de Pub/Sub en el que se desea escribir. El nombre debe tener el formato projects/<project-id>/topics/<topic-name>.

Ejecuta la plantilla de archivos de texto en Cloud Storage a Pub/Sub (transmisión)

Console

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla
  4. Botón Crear trabajo a partir de una plantilla de Cloud Console
  5. Selecciona the Text Files on Cloud Storage to Pub/Sub (Stream) template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Ejecutar trabajo.

gcloud

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de gcloud a fin de ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
  • FILE_PATTERN: Es el glob de patrón del archivo que se leerá en el bucket de Cloud Storage (por ejemplo, path/*.csv).
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION\
    --staging-location TEMP_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
  • FILE_PATTERN: Es el glob de patrón del archivo que se leerá en el bucket de Cloud Storage (por ejemplo, path/*.csv).
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Enmascaramiento de datos y asignación de tokens de Cloud Storage a BigQuery (con Cloud DLP)

La plantilla de enmascaramiento de datos y asignación de tokens de Cloud Storage a BigQuery (con Cloud DLP) es una canalización de transmisión que lee archivos csv de un bucket de Cloud Storage, llama a la API de Cloud Data Loss Prevention (Cloud DLP) para la desidentificación y escribe los datos desidentificados en la tabla de BigQuery especificada. Esta plantilla admite el uso de una plantilla de inspección de Cloud DLP y una plantilla de desidentificación de Cloud DLP. Esto permite que los usuarios inspeccionen información que puede ser sensible y desidentificar datos estructurados en los que las columnas están especificadas para ser desidentificadas y no se necesita la inspección.

Requisitos para esta canalización:

  • Los datos de entrada para la asignación de tokens deben existir
  • Las plantillas de Cloud DLP deben existir (por ejemplo, InspectTemplate y DeidentifyTemplate). Consulta Plantillas de Cloud DLP para obtener más detalles.
  • El conjunto de datos de BigQuery debe existir

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern Los archivos csv desde los que se leen los registros de datos de entrada. También se acepta el comodín. Por ejemplo, gs://mybucket/my_csv_filename.csv o gs://mybucket/file-*.csv.
dlpProjectId ID del proyecto de Cloud DLP que posee el recurso de la API de Cloud DLP. Este proyecto de Cloud DLP puede ser el mismo que posee las plantillas de Cloud DLP, o puede ser uno independiente. Por ejemplo, my_dlp_api_project.
deidentifyTemplateName Plantilla de desidentificación de Cloud DLP que se usa para las solicitudes a la API, especificada con el patrón projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId}. Por ejemplo, projects/my_project/deidentifyTemplates/100.
datasetName Conjunto de datos de BigQuery para enviar resultados con asignación de token.
batchSize Tamaño de fragmentación o del lote para enviar datos a fin de inspeccionar o quitar la asignación de token. En el caso de un archivo csv, batchSize es la cantidad de filas en un lote. Los usuarios deben determinar el tamaño del lote según el tamaño de los registros y del archivo. Ten en cuenta que la API de Cloud DLP tiene un límite de tamaño de carga útil de 524 KB por llamada a la API.
inspectTemplateName Plantilla de inspección de Cloud DLP que se usará para solicitudes a la API, especificada con el patrón projects/{template_project_id}/identifyTemplates/{idTemplateId} (opcional). Por ejemplo, projects/my_project/identifyTemplates/100.

Ejecuta la plantilla de enmascaramiento de datos y asignación de tokens de Cloud Storage a BigQuery (con Cloud DLP)

Console

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla
  4. Botón Crear trabajo a partir de una plantilla de Cloud Console
  5. Selecciona the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Ejecutar trabajo.

gcloud

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de gcloud a fin de ejecutar plantillas, debes tener la versión 138.0.0 o superior del SDK de Cloud.

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

Reemplaza lo siguiente:

  • TEMPLATE_PROJECT_ID: Es el ID de tu proyecto de plantilla.
  • DLP_API_PROJECT_ID: Es el ID del proyecto de la API de Cloud DLP.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • INPUT_DATA: Es la ruta de acceso del archivo de entrada.
  • DEIDENTIFY_TEMPLATE: Es el número de plantilla de Cloud DLPDeidentify.
  • DATASET_NAME: Es el nombre del conjunto de datos de BigQuery.
  • INSPECT_TEMPLATE_NUMBER: Es el número de plantilla de Cloud DLPInspect.
  • BATCH_SIZE_VALUE: Es el tamaño del lote (número de filas por API para csv).
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

API

Ejecuta desde la API de REST

Cuando ejecutes esta plantilla, necesitarás la ruta de acceso de Cloud Storage a ella:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • TEMPLATE_PROJECT_ID: Es el ID de tu proyecto de plantilla.
  • DLP_API_PROJECT_ID: Es el ID del proyecto de la API de Cloud DLP.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION: Es el extremo regional (por ejemplo, us-west1)
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • INPUT_DATA: Es la ruta de acceso del archivo de entrada.
  • DEIDENTIFY_TEMPLATE: Es el número de plantilla de Cloud DLPDeidentify.
  • DATASET_NAME: Es el nombre del conjunto de datos de BigQuery.
  • INSPECT_TEMPLATE_NUMBER: Es el número de plantilla de Cloud DLPInspect.
  • BATCH_SIZE_VALUE: Es el tamaño del lote (número de filas por API para csv).
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

Captura de datos modificados desde MySQL hasta BigQuery mediante Debezium y Pub/Sub (transmisión)

La plantilla Captura de datos modificados desde MySQL hasta BigQuery mediante Debezium y Pub/Sub es una canalización de transmisión que lee mensajes de Pub/Sub con datos de modificación desde una base de datos de MySQL y escribe los registros en BigQuery. Un conector Debezium captura las modificaciones en la base de datos de MySQL y publica los datos modificados en Pub/Sub. Luego, la plantilla lee los mensajes de Pub/Sub y los escribe en BigQuery.

Puedes usar esta plantilla para sincronizar las bases de datos de MySQL y las tablas de BigQuery. La canalización escribe los datos modificados en una tabla de etapa de pruebas de BigQuery y actualiza de forma intermitente una tabla de BigQuery que replica la base de datos de MySQL.

Requisitos para esta canalización:

Parámetros de la plantilla

Parámetro Descripción
inputSubscriptions La lista separada por comas de las suscripciones de entrada de Pub/Sub desde la que se va a leer, en el formato <subscription>,<subscription>, ...
changeLogDataset El conjunto de datos de BigQuery para almacenar las tablas de etapa de pruebas, en el formato <my-dataset>
replicaDataset La ubicación del conjunto de datos de BigQuery para almacenar las tablas de réplica, en el formato <my-dataset>
updateFrequencySecs (Opcional) el intervalo en el que la canalización actualiza la tabla de BigQuery y replica la base de datos de MySQL.

Ejecuta la plantilla de Captura de datos modificados desde MySQL hasta BigQuery mediante Debezium y Pub/Sub

Para ejecutar esta plantilla, sigue estos pasos:

  1. En tu máquina local, clona el repositorio DataflowTemplates.
  2. Cambia al directorio v2/cdc-parent.
  3. Asegúrate de que el conector Debezium esté implementado.
  4. Mediante Maven, ejecuta la plantilla de Dataflow.

    Reemplaza los siguientes valores:

    • PROJECT_ID: el ID de tu proyecto
    • YOUR_SUBSCRIPTIONS: la lista separada por comas de los nombres de suscripción a Pub/Sub
    • YOUR_CHANGELOG_DATASET: el conjunto de datos de BigQuery para los datos de registro de cambios
    • YOUR_REPLICA_DATASET: el conjunto de datos de BigQuery para tablas de réplica
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
                --inputSubscriptions=YOUR_SUBSCRIPTIONS \
                --updateFrequencySecs=300 \
                --changeLogDataset=YOUR_CHANGELOG_DATASET \
                --replicaDataset=YOUR_REPLICA_DATASET \
                --project=PROJECT_ID"
      

Apache Kafka a BigQuery

La plantilla de Apache Kafka a BigQuery es una canalización de transmisión que transfiere datos de texto de Apache Kafka, ejecuta una función definida por el usuario (UDF) y envía los registros resultantes a BigQuery. Cualquier error que ocurra en la transformación de los datos, la ejecución de la UDF o la inserción en la tabla de resultados se inserta en una tabla de errores independiente en BigQuery. Si la tabla de errores no existe antes de la ejecución, se creará.

Requisitos para esta canalización

  • La tabla de salida de BigQuery debe existir.
  • El servidor del agente de Apache Kafka debe estar en ejecución y se debe poder acceder a él desde las máquinas de trabajador de Dataflow.
  • Los temas de Apache Kafka deben existir, y los mensajes deben estar codificados en un formato JSON válido.

Parámetros de la plantilla

Parámetro Descripción
outputTableSpec La ubicación de la tabla de salida de BigQuery en la que se deben escribir los mensajes de Apache Kafka, en el formato my-project:dataset.table.
inputTopics Los temas de entrada de Apache Kafka desde los que se debe leer en una lista separada por comas. Por ejemplo: messages.
bootstrapServers La dirección del host de los servidores del agente de Apache Kafka en ejecución en una lista separada por comas, cada dirección de host tiene el formato 35.70.252.199:9092.
javascriptTextTransformGcsPath La ruta de acceso de la ubicación de Cloud Storage a la UDF de JavaScript (opcional). Por ejemplo: gs://my_bucket/my_function.js.
javascriptTextTransformFunctionName El nombre del JavaScript que se llamará como tu UDF (opcional). Por ejemplo: transform
outputDeadletterTable (Opcional) la tabla de BigQuery para los mensajes que no llegaron a la tabla de salida, en el formato my-project:dataset.my-deadletter-table. Si no existe, la tabla se crea durante la ejecución de la canalización. Si no se especifica, se usa <outputTableSpec>_error_records en su lugar.

Ejecuta la plantilla de Apache Kafka a BigQuery

Console

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla
  4. Botón Crear trabajo a partir de una plantilla de Cloud Console
  5. Selecciona the Apache Kafka to BigQuery template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Ejecutar trabajo.

gcloud

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de la herramienta de gcloud a fin de ejecutar plantillas, debes tener la versión 284.0.0 o superior del SDK de Cloud.

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

Reemplaza lo siguiente:

  • YOUR_PROJECT_ID: Es el ID de tu proyecto de plantilla.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: Es el nombre de la región de Dataflow (por ejemplo, us-central1)
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • KAFKA_TOPICS: Es la lista de temas de Apache Kkafa. Si se proporcionan varios temas, sigue las instrucciones para escapar las comas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: Es la ruta de acceso de Cloud Storage al archivo .js que contiene el código JavaScript.
  • YOUR_JAVASCRIPT_FUNCTION: Es el nombre de la UDF.
  • KAFKA_SERVER_ADDRESSES: La lista de direcciones IP del servidor del agente de Apache Kafka. Cada dirección IP debe tener el número de puerto desde el que se puede acceder al servidor. Por ejemplo: 35.70.252.199:9092. Si se proporcionan varias direcciones, sigue las instrucciones para escapar las comas.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

API

Ejecuta desde la API de REST

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • YOUR_PROJECT_ID: Es el ID de tu proyecto de plantilla.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: Es el nombre de la región de Dataflow (por ejemplo, us-central1)
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • KAFKA_TOPICS: Es la lista de temas de Apache Kkafa. Si se proporcionan varios temas, sigue las instrucciones para escapar las comas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: Es la ruta de acceso de Cloud Storage al archivo .js que contiene el código JavaScript.
  • YOUR_JAVASCRIPT_FUNCTION: Es el nombre de la UDF.
  • KAFKA_SERVER_ADDRESSES: La lista de direcciones IP del servidor del agente de Apache Kafka. Cada dirección IP debe tener el número de puerto desde el que se puede acceder al servidor. Por ejemplo: 35.70.252.199:9092. Si se proporcionan varias direcciones, sigue las instrucciones para escapar las comas.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Kafka_to_BigQuery",
   }
}
  

Datastream para BigQuery (transmisión)

La plantilla de Datastream a BigQuery es una canalización de transmisión que lee datos de Datastream y los replica en BigQuery. La plantilla lee los datos de Cloud Storage mediante notificaciones de Pub/Sub y los replica en una tabla de etapa de pruebas de BigQuery particionada por tiempo. Después de la replicación, la plantilla ejecuta una MERGE en BigQuery para actualizar todos los cambios de captura de datos modificados (CDC) en una réplica de la tabla de origen.

La plantilla controla la creación y la actualización de las tablas de BigQuery que administra la replicación. Cuando se requiere un lenguaje de definición de datos (DDL), una devolución de llamada a Datastream extrae el esquema de la tabla de origen y lo traduce a los tipos de datos de BigQuery. Las operaciones admitidas incluyen las siguientes:

  • Las tablas nuevas se crean a medida que se insertan los datos.
  • Se agregan columnas nuevas a las tablas de BigQuery con valores iniciales nulos.
  • Las columnas descartadas se ignoran en BigQuery y los valores futuros son nulos.
  • Las columnas cuyos nombres se han cambiado se agregan a BigQuery como columnas nuevas.
  • Los cambios de tipo no se propagan a BigQuery.

Requisitos para esta canalización:

  • Una transmisión de Datastream que está lista para replicar los datos o ya los está replicando.
  • Las notificaciones de Pub/Sub de Cloud Storage están habilitadas para los datos de Datastream.
  • Se crean los conjuntos de datos de destino de BigQuery y se les otorgó acceso de administrador a la cuenta de servicio de Compute Engine.

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern La ubicación de los archivos de Datastream en Cloud Storage para replicar. Esta ubicación suele ser la ruta de acceso raíz de la transmisión.
gcsPubSubSubscription La suscripción de Pub/Sub con las notificaciones de archivos de Datastream. Por ejemplo, projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME.
inputFileFormat El formato del archivo de salida que produce Datastream. Por ejemplo: avro,json. Valor predeterminado, avro.
outputStagingDatasetTemplate El nombre de un conjunto de datos existente para contener tablas de etapa de pruebas. Puedes incluir la plantilla {_metadata_dataset} como marcador de posición que se reemplazará por el nombre del conjunto de datos o esquema de origen (p.ej., {_metadata_dataset}_log).
outputDatasetTemplate El nombre de un conjunto de datos existente que contiene tablas de réplica. Puedes incluir la plantilla {_metadata_dataset} como marcador de posición que se reemplazará por el nombre del conjunto de datos o esquema de origen (p.ej., {_metadata_dataset}).
outputStagingTableNameTemplate La plantilla para el nombre de las tablas de etapa de pruebas (opcional). El valor predeterminado es {_metadata_table}_log. Si replicas varios esquemas, la sugerencia es {_metadata_schema}_{_metadata_table}_log.
outputTableNameTemplate La plantilla para el nombre de las tablas de réplica (opcional). Valor predeterminado, {_metadata_table}. Si replicas varios esquemas, la sugerencia es {_metadata_schema}_{_metadata_table}.
outputProjectId Proyecto para los conjuntos de datos de BigQuery en los que se deben generar datos (opcional). El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow.
deadLetterQueueDirectory La ruta de acceso del archivo para almacenar los mensajes no procesados con el motivo por el que no se pudieron procesar (opcional). El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. El valor predeterminado es suficiente en la mayoría de las condiciones.
streamName El nombre o la plantilla del flujo que se consultará para obtener la información del esquema (opcional). Valor predeterminado, {_metadata_stream}.
mergeFrequencyMinutes La cantidad de minutos entre combinaciones para una tabla determinada (opcional). Valor predeterminado, 5.
dlqRetryMinutes La cantidad de minutos entre reintentos de la cola de mensajes no entregados (DLQ) (opcional). Valor predeterminado, 10.

Ejecuta la plantilla de Datastream a BigQuery

Console

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla
  4. Botón Crear trabajo a partir de una plantilla de Cloud Console
  5. Selecciona Datastream to BigQuery template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Ejecutar trabajo.

gcloud

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de la herramienta de gcloud a fin de ejecutar plantillas, debes tener la versión 284.0.0 o superior del SDK de Cloud.

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery

Reemplaza lo siguiente:

  • YOUR_PROJECT_ID: Es el ID de tu proyecto de plantilla.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: Es el nombre de la región de Dataflow (por ejemplo, us-central1)
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo: projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • BIGQUERY_DATASET: es el nombre de tu conjunto de datos de BigQuery.
  • BIGQUERY_TABLE: es la plantilla de tabla de BigQuery. Por ejemplo, {_metadata_schema}_{_metadata_table}_log
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_Datastream_to_BigQuery \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
outputStagingDatasetTemplate=BIGQUERY_DATASET,\
outputDatasetTemplate=BIGQUERY_DATASET,\
outputStagingTableNameTemplate=BIGQUERY_TABLE,\
outputTableNameTemplate=BIGQUERY_TABLE_log
  

API

Ejecuta desde la API de REST

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • YOUR_PROJECT_ID: Es el ID de tu proyecto de plantilla.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: Es el nombre de la región de Dataflow (por ejemplo, us-central1)
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo: projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • BIGQUERY_DATASET: es el nombre de tu conjunto de datos de BigQuery.
  • BIGQUERY_TABLE: es la plantilla de tabla de BigQuery. Por ejemplo, {_metadata_schema}_{_metadata_table}_log
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
          "outputDatasetTemplate": "BIGQUERY_DATASET",
          "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
          "outputTableNameTemplate": "BIGQUERY_TABLE_log"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_Datastream_to_BigQuery",
   }
}
  

Datastream para PostgreSQL (transmisión)

La plantilla de Datastream a PostgreSQL es una canalización de transmisión que lee los datos de Datastream y los replica en cualquier base de datos de PostgreSQL. La plantilla lee los datos de Cloud Storage mediante notificaciones de Pub/Sub y los replica en tablas de réplica de PostgreSQL.

La plantilla no es compatible con el lenguaje de definición de datos (DDL) y espera que todas las tablas ya existan en PostgreSQL. La replicación usa transformaciones con estado de Dataflow para filtrar los datos inactivos y garantizar la coherencia dentro de los datos desordenados. Por ejemplo, si ya se pasó una versión más reciente de una fila, se ignorará una versión tardía de esa fila. El lenguaje de manipulación de datos (DML) que se ejecuta es el mejor intento de replicar perfectamente los datos de origen o destino. Las declaraciones DML ejecutadas siguen las siguientes reglas:

  • Si existe una clave primaria, las operaciones de inserción y actualización usan una sintaxis de upsert (es decir, INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • Si las claves primarias existen, las eliminaciones se replican como un DML borrado.
  • Si no existe una clave primaria, se insertan las operaciones de inserción y actualización en la tabla.
  • Si no existen claves primarias, se ignoran las eliminaciones.

Si usas las utilidades de Oracle para Postgres, agrega ROWID en PostgreSQL como la clave primaria cuando no exista ninguna.

Los requisitos de esta canalización son los siguientes:

  • Una transmisión de Datastream que está lista para replicar los datos o ya los está replicando.
  • Las notificaciones de Pub/Sub de Cloud Storage están habilitadas para los datos de Datastream.
  • Se propagó una base de datos de PostgreSQL con el esquema requerido.
  • Se configura el acceso a la red entre los trabajadores de Dataflow y PostgreSQL.

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern La ubicación de los archivos de Datastream en Cloud Storage para replicar. Esta ubicación suele ser la ruta de acceso raíz de la transmisión.
gcsPubSubSubscription La suscripción de Pub/Sub con las notificaciones de archivos de Datastream. Por ejemplo, projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME.
inputFileFormat El formato del archivo de salida que produce Datastream. Por ejemplo: avro,json. Valor predeterminado, avro.
databaseHost El host PostgreSQL para conectarse.
databaseUser El usuario de PostgreSQL con todos los permisos necesarios para escribir en todas las tablas en la replicación
databasePassword La contraseña para el usuario de PostgreSQL especificado.
databasePort El puerto de la base de datos de PostgreSQL al que se realizará la conexión (opcional). Valor predeterminado, 5432.
databaseName El nombre de la base de datos de PostgreSQL a la que se realizará la conexión (opcional). Valor predeterminado, postgres.
streamName El nombre o la plantilla del flujo que se consultará para obtener la información del esquema (opcional). Valor predeterminado, {_metadata_stream}.

Ejecuta la plantilla de Datastream a PostgreSQL

Console

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla
  4. Botón Crear trabajo a partir de una plantilla de Cloud Console
  5. Selecciona Datastream to PostgreSQL template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Ejecutar trabajo.

gcloud

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de la herramienta de gcloud a fin de ejecutar plantillas, debes tener la versión 284.0.0 o superior del SDK de Cloud.

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Postgres

Reemplaza lo siguiente:

  • YOUR_PROJECT_ID: Es el ID de tu proyecto de plantilla.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: Es el nombre de la región de Dataflow (por ejemplo, us-central1)
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo: projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • DATABASE_HOST: es la IP del host de PostgreSQL.
  • DATABASE_USER: es el usuario de PostgreSQL.
  • DATABASE_PASSWORD: es la contraseña de PostgreSQL.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Postgres \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

API

Ejecuta desde la API de REST

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Postgres

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

Reemplaza lo siguiente:

  • YOUR_PROJECT_ID: Es el ID de tu proyecto de plantilla.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: Es el nombre de la región de Dataflow (por ejemplo, us-central1)
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo: projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • DATABASE_HOST: es la IP del host de PostgreSQL.
  • DATABASE_USER: es el usuario de PostgreSQL.
  • DATABASE_PASSWORD: es la contraseña de PostgreSQL.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Postgres",
   }
}