Plantillas de transmisión que proporciona Google

Google proporciona un conjunto de plantillas de código abierto de Dataflow. Para obtener información general sobre las plantillas, consulta Plantillas de Dataflow. Para obtener una lista de todas las plantillas proporcionadas por Google, consulta Comienza a usar las plantillas proporcionadas por Google.

En esta guía, se documentan las plantillas de transmisión.

Suscripción de Pub/Sub a BigQuery

La plantilla de suscripción de Pub/Sub a BigQuery es una canalización de transmisión que lee mensajes con formato JSON desde una suscripción de Pub/Sub y los escribe en una tabla de BigQuery. Puedes usar la plantilla como una solución rápida para mover datos de Pub/Sub a BigQuery. La plantilla lee los mensajes con formato JSON de Pub/Sub y los convierte en elementos de BigQuery.

Requisitos para esta canalización:

  • Los mensajes de Pub/Sub deben estar en formato JSON, como se describe aquí. Por ejemplo, los mensajes con formato {"k1":"v1", "k2":"v2"} pueden insertarse en una tabla de BigQuery con dos columnas, denominadas k1 y k2, con el tipo de datos de string.
  • La tabla de salida debe existir antes de ejecutar la canalización. El esquema de la tabla debe coincidir con los objetos JSON de entrada.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription Suscripción de entrada de Pub/Sub desde la que se va a leer, en el formato projects/<project>/subscriptions/<subscription>.
outputTableSpec Ubicación de la tabla de salida de BigQuery, en el formato <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable La tabla de BigQuery para los mensajes que no llegaron a la tabla de salida, en el formato <my-project>:<my-dataset>.<my-table>. Si no existe, se crea durante la ejecución de la canalización. Si no se especifica, se usa OUTPUT_TABLE_SPEC_error_records en su lugar.
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, si tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

Ejecuta la plantilla de suscripción de Pub/Sub a BigQuery

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Subscription to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.

Tema de Pub/Sub a BigQuery

La plantilla de tema de Pub/Sub a BigQuery es una canalización de transmisión que lee mensajes con formato JSON de un tema de Pub/Sub y los escribe en una tabla de BigQuery. Puedes usar la plantilla como una solución rápida para mover datos de Pub/Sub a BigQuery. La plantilla lee los mensajes con formato JSON de Pub/Sub y los convierte en elementos de BigQuery.

Requisitos para esta canalización:

  • Los mensajes de Pub/Sub deben estar en formato JSON, como se describe aquí. Por ejemplo, los mensajes con formato {"k1":"v1", "k2":"v2"} pueden insertarse en una tabla de BigQuery con dos columnas, denominadas k1 y k2, con el tipo de datos de string.
  • La tabla de salida debe existir antes de ejecutar la canalización. El esquema de la tabla debe coincidir con los objetos JSON de entrada.

Parámetros de la plantilla

Parámetro Descripción
inputTopic El tema de entrada de Pub/Sub desde el que se va a leer, en el formato projects/<project>/topics/<topic>.
outputTableSpec Ubicación de la tabla de salida de BigQuery, en el formato <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable La tabla de BigQuery para los mensajes que no llegaron a la tabla de resultados. Debe estar en formato <my-project>:<my-dataset>.<my-table>. Si no existe, se crea durante la ejecución de la canalización. Si no se especifica, se usa <outputTableSpec>_error_records en su lugar.
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, si tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

Ejecuta la plantilla del tema de Pub/Sub a BigQuery

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Topic to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.

Pub/Sub Avro a BigQuery

La plantilla de Pub/Sub Avro a BigQuery es una canalización de transmisión que transfiere datos de Avro desde una suscripción de Pub/Sub a una tabla de BigQuery. Cualquier error que ocurra mientras se escribe en la tabla de BigQuery se transmite a un tema de Pub/Sub sin procesar.

Requisitos para esta canalización

  • La suscripción de entrada de Pub/Sub debe existir.
  • El archivo de esquema para los registros de Avro debe existir en Cloud Storage.
  • El tema de Pub/Sub sin procesar debe existir. ¿
  • El conjunto de datos de salida de BigQuery debe existir.

Parámetros de la plantilla

Parámetro Descripción
schemaPath Ubicación de Cloud Storage del archivo de esquema de Avro. Por ejemplo, gs://path/to/my/schema.avsc.
inputSubscription Suscripción de entrada de Pub/Sub desde la que se desea leer. Por ejemplo, projects/<project>/subscriptions/<subscription>
outputTopic El tema de Pub/Sub que se usará para registros no procesados. Por ejemplo, projects/<project-id>/topics/<topic-name>.
outputTableSpec Ubicación de la tabla de salida de BigQuery. Por ejemplo, <my-project>:<my-dataset>.<my-table>. Según la createDisposition especificada, la tabla de salida se puede crear de forma automática mediante el esquema de Avro proporcionado por el usuario.
writeDisposition La WriteDisposition de BigQuery (opcional). Por ejemplo, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Predeterminada: WRITE_APPEND.
createDisposition La CreateDisposition de BigQuery (opcional). Por ejemplo: CREATE_IF_NEEDED, CREATE_NEVER. Predeterminada: CREATE_IF_NEEDED.

Ejecuta la plantilla de Pub/Sub Avro a BigQuery

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Avro to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo, gs://MyBucket/file.avsc).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
  • BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
  • DEADLETTER_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo, gs://MyBucket/file.avsc).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
  • BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
  • DEADLETTER_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.

Pub/Sub a Pub/Sub

La plantilla de Pub/Sub a Pub/Sub es una canalización de transmisión que lee mensajes de una suscripción de Pub/Sub y los escribe en otro tema de Pub/Sub. La canalización también acepta una clave de atributo de mensaje opcional y un valor que se puede usar para filtrar los mensajes que se deben escribir en el tema de Pub/Sub. Puedes usar esta plantilla para copiar mensajes de una suscripción de Pub/Sub a otro tema de Pub/Sub con un filtro de mensajes opcional.

Requisitos para esta canalización:

  • La suscripción a Pub/Sub de origen debe existir antes de la ejecución.
  • El tema de Pub/Sub de destino debe existir antes de la ejecución.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription Suscripción a Pub/Sub desde la que se lee la entrada. Por ejemplo, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Tema de Cloud Pub/Sub en el que se escribe el resultado. Por ejemplo, projects/<project-id>/topics/<topic-name>.
filterKey Eventos de filtro basados en la clave de atributo (opcional). No se aplican filtros si no se especifica filterKey.
filterValue Valor del atributo del filtro para usar en caso de que se provea filterKey (opcional). De forma predeterminada, se usa un filterValue nulo.

Ejecuta la plantilla de Pub/Sub a Pub/Sub

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Pub/Sub template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • FILTER_KEY: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.
  • FILTER_VALUE: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento. Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. No se filtrarán las coincidencias parciales (p. ej., substring). De forma predeterminada, se usa un valor de filtro de evento nulo.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • FILTER_KEY: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.
  • FILTER_VALUE: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento. Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. No se filtrarán las coincidencias parciales (p. ej., substring). De forma predeterminada, se usa un valor de filtro de evento nulo.

Pub/Sub a Splunk

La plantilla de Pub/Sub a Splunk es una canalización de transmisión que lee mensajes de una suscripción a Pub/Sub y escribe la carga útil del mensaje en Splunk mediante el recopilador de eventos HTTP (HEC) de Splunk. El caso de uso más común de esta plantilla es exportar registros a Splunk. Para ver un ejemplo del flujo de trabajo subyacente, consulta Implementa exportaciones de registros listas para la producción a Splunk mediante Dataflow.

Antes de escribir en Splunk, también puedes aplicar una función definida por el usuario de JavaScript a la carga útil del mensaje. Los mensajes con fallas de procesamiento se reenvían a un tema de mensajes no enviados de Pub/Sub para solucionar los problemas y volver a procesarlos.

Como una capa adicional de protección para tu token HEC, también puedes pasar una clave de Cloud KMS junto con el parámetro de token HEC codificado en base64 encriptado con la clave de Cloud KMS. Consulta el extremo de encriptación de la API de Cloud KMS para obtener detalles adicionales sobre la encriptación de tu parámetro de token HEC.

Requisitos para esta canalización:

  • La suscripción de Pub/Sub de origen debe existir antes de ejecutar la canalización.
  • El tema sin procesar de Pub/Sub debe existir antes de ejecutar la canalización.
  • Se debe poder acceder al extremo de HEC de Splunk desde la red de trabajadores de Dataflow.
  • El token HEC de Splunk se debe generar y estar disponible.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription La suscripción de Pub/Sub desde la que se lee la entrada. Por ejemplo, projects/<project-id>/subscriptions/<subscription-name>.
token El token de autenticación HEC de Splunk. Esta string codificada en base64 se puede encriptar con una clave de Cloud KMS para mayor seguridad.
url La URL de HEC de Splunk. Debe ser enrutable desde la VPC en la que se ejecuta la canalización. Por ejemplo, https://splunk-hec-host:8088.
outputDeadletterTopic El tema de Pub/Sub para reenviar mensajes que no se pueden entregar. Por ejemplo, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, si tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
batchCount El tamaño del lote para enviar varios eventos a Splunk (opcional). Predeterminado 1 (sin lotes).
parallelism La cantidad máxima de solicitudes paralelas (opcional). Predeterminado 1 (sin paralelismo).
disableCertificateValidation Inhabilita la validación del certificado SSL (opcional). El valor predeterminado es falso (validación habilitada). Si es verdadero, los certificados no se validan (todos los certificados son de confianza) y se ignora el parámetro “rootCaCertificatePath”.
includePubsubMessage Incluye el mensaje de Pub/Sub completo en la carga útil (opcional). El valor predeterminado es falso (solo se incluye el elemento de datos en la carga útil).
tokenKMSEncryptionKey La clave de Cloud KMS para desencriptar la string del token HEC (opcional). Si se proporciona la clave de Cloud KMS, la string del token HEC debe encriptarse.
rootCaCertificatePath La URL completa al certificado de CA raíz en Cloud Storage (opcional). Por ejemplo, gs://mybucket/mycerts/privateCA.crt. El certificado provisto en Cloud Storage debe estar codificado en DER y puede proporcionarse en codificación binaria o imprimible (Base64). Si el certificado se proporciona en codificación Base64, debe estar delimitado al comienzo por ***BEGIN CERTIFICATE*** y debe estar limitado al final por ***END CERTIFICATE*** Si se proporciona este parámetro, este archivo de certificado de CA privado se recuperará y se agregará al almacén de confianza del trabajador de Dataflow para verificar el certificado SSL del extremo del HEC de Splunk. Si no se proporciona este parámetro, se usa el almacén de confianza predeterminado.

Ejecuta la plantilla de Pub/Sub a Splunk

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Splunk template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • INPUT_SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOKEN: Es el token del recopilador de eventos HTTP de Splunk.
  • URL: Es la ruta de URL para el recopilador de eventos HTTP de Splunk (por ejemplo, https://splunk-hec-host:8088).
  • DEADLETTER_TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: Es el tamaño del lote que se debe usar para enviar varios eventos a Splunk.
  • PARALLELISM: Es la cantidad de solicitudes paralelas que se usarán para enviar eventos a Splunk.
  • DISABLE_VALIDATION: Es true si deseas inhabilitar la validación del certificado SSL.
  • ROOT_CA_CERTIFICATE_PATH: La ruta al certificado de CA raíz en Cloud Storage (por ejemplo, gs://your-bucket/privateCA.crt)

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • INPUT_SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOKEN: Es el token del recopilador de eventos HTTP de Splunk.
  • URL: Es la ruta de URL para el recopilador de eventos HTTP de Splunk (por ejemplo, https://splunk-hec-host:8088).
  • DEADLETTER_TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: Es el tamaño del lote que se debe usar para enviar varios eventos a Splunk.
  • PARALLELISM: Es la cantidad de solicitudes paralelas que se usarán para enviar eventos a Splunk.
  • DISABLE_VALIDATION: Es true si deseas inhabilitar la validación del certificado SSL.
  • ROOT_CA_CERTIFICATE_PATH: La ruta al certificado de CA raíz en Cloud Storage (por ejemplo, gs://your-bucket/privateCA.crt)

Pub/Sub a archivos Avro en Cloud Storage

La plantilla de Pub/Sub a archivos de Avro en Cloud Storage es una canalización de transmisión que lee datos de un tema de Pub/Sub y escribe archivos de Avro en el bucket de Cloud Storage especificado.

Requisitos para esta canalización:

  • El tema de entrada de Pub/Sub debe existir antes de la ejecución de la canalización.

Parámetros de la plantilla

Parámetro Descripción
inputTopic Tema de Pub/Sub al cual suscribirse para el consumo de mensaje. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name>.
outputDirectory Directorio de salida en el que se archivan los archivos Avro de salida. Debe contener / al final. Por ejemplo: gs://example-bucket/example-directory/.
avroTempDirectory Directorio para los archivos de Avro temporales. Debe contener / al final. Por ejemplo: gs://example-bucket/example-directory/.
outputFilenamePrefix Prefijo de nombre de archivo de salida para los archivos Avro (opcional).
outputFilenameSuffix [Opcional] Sufijo de nombre de archivo de salida para los archivos Avro.
outputShardTemplate [Opcional] Plantilla de fragmentación del archivo de salida. Se especifica como secuencias repetidas de las letras S o N. Por ejemplo, SSS-NNN. Estas se reemplazan por el número de fragmento o la cantidad total de fragmentos, respectivamente. Si no se especifica este parámetro, el formato de plantilla predeterminado es W-P-SS-of-NN.

Ejecuta la plantilla de Pub/Sub a Cloud Storage Avro

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Avro Files on Cloud Storage template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
  • FILENAME_PREFIX: Es el prefijo del nombre de archivo de salida que prefieras.
  • FILENAME_SUFFIX: Es el sufijo del nombre de archivo de salida que prefieras.
  • SHARD_TEMPLATE: Es la plantilla de fragmentación de salida que prefieras.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
  • FILENAME_PREFIX: Es el prefijo del nombre de archivo de salida que prefieras.
  • FILENAME_SUFFIX: Es el sufijo del nombre de archivo de salida que prefieras.
  • SHARD_TEMPLATE: Es la plantilla de fragmentación de salida que prefieras.

Pub/Sub a archivos de texto en Cloud Storage

La plantilla de Pub/Sub a archivos de texto en Cloud Storage es una canalización de transmisión que lee registros de Pub/Sub y los guarda como una serie de archivos de Cloud Storage en formato de texto. La plantilla se puede usar como una forma rápida de guardar datos en Pub/Sub para su uso futuro. De forma predeterminada, la plantilla genera un archivo nuevo cada 5 minutos.

Requisitos para esta canalización:

  • El tema de Pub/Sub debe existir antes de la ejecución.
  • Los mensajes publicados en el tema deben tener formato de texto.
  • Los mensajes publicados en el tema no deben contener líneas nuevas. Ten en cuenta que cada mensaje de Pub/Sub se guarda como una sola línea en el archivo de salida.

Parámetros de la plantilla

Parámetro Descripción
inputTopic El tema de Pub/Sub desde el que se lee la entrada. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name>.
outputDirectory La ruta de acceso y el prefijo del nombre de archivo para escribir los archivos de salida. Por ejemplo, gs://bucket-name/path/. El valor debe terminar con una barra.
outputFilenamePrefix El prefijo para colocar en cada archivo con ventanas. Por ejemplo, output-
outputFilenameSuffix El sufijo para colocar en cada archivo con ventanas, por lo general, es una extensión de archivo como .txt o .csv.
outputShardTemplate La plantilla de fragmentación define la parte dinámica de cada archivo con ventanas. De forma predeterminada, la canalización utiliza una única fragmentación de salida para el sistema de archivo dentro de cada ventana. Esto significa que todos los datos llegarán a un único archivo por ventana. El valor predeterminado outputShardTemplate es W-P-SS-of-NN, en el que W es el período de la ventana, P es la información del panel, S es el número de fragmento y N es la cantidad de fragmentos. En el caso de un solo archivo, la parte SS-of-NN de outputShardTemplate será 00-of-01.

Ejecuta la plantilla de Pub/Sub a archivos de texto en Cloud Storage

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Text Files on Cloud Storage template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.

Pub/Sub a MongoDB

La plantilla de Pub/Sub a MongoDB es una canalización de transmisión que lee mensajes con codificación JSON de una suscripción a Pub/Sub y los escribe en MongoDB como documentos. Si es necesario, esta canalización admite transformaciones adicionales que se pueden incluir mediante una función definida por el usuario (UDF) de JavaScript. Cualquier error que se produjo debido a una falta de coincidencia del esquema, JSON con errores o mientras se ejecutaban transformaciones se registra en una tabla de BigQuery para mensajes no procesados junto con un mensaje de entrada. Si no existe una tabla para los registros no procesados antes de la ejecución, la canalización crea esta tabla de forma automática.

Requisitos para esta canalización:

  • La suscripción a Pub/Sub debe existir y los mensajes deben estar codificados en un formato JSON válido.
  • El clúster de MongoDB debe existir y debe ser accesible desde las máquinas de trabajador de Dataflow.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription Nombre de la suscripción a Pub/Sub. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
mongoDBUri Una lista separada por comas de los servidores de MongoDB. Por ejemplo: 192.285.234.12:27017,192.287.123.11:27017.
database La base de datos en MongoDB en la que se debe almacenar la colección. Por ejemplo: my-db.
collection Nombre de la colección dentro de la base de datos de MongoDB. Por ejemplo: my-collection.
deadletterTable La tabla de BigQuery que almacena mensajes debido a fallas (esquemas no coincidentes, JSON con formato incorrecto, etcétera). Por ejemplo: project-id:dataset-name.table-name.
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, si tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
batchSize El tamaño de lote que se usa para la inserción por lotes de documentos en MongoDB (opcional). Valor predeterminado: 1000.
batchSizeBytes El tamaño del lote en bytes (opcional). Valor predeterminado: 5242880.
maxConnectionIdleTime El tiempo de inactividad máximo permitido en segundos antes de que se agote el tiempo de espera de la conexión (opcional). Valor predeterminado: 60000.
sslEnabled El valor booleano que indica si la conexión a MongoDB está habilitada con SSL (opcional). Valor predeterminado: true.
ignoreSSLCertificate El valor booleano que indica si se debe ignorar el certificado SSL (opcional). Valor predeterminado: true.
withOrdered El valor booleano que habilita las inserciones masivas ordenadas en MongoDB (opcional). Valor predeterminado: true.
withSSLInvalidHostNameAllowed El valor booleano que indica si se permite un nombre de host no válido para la conexión SSL (opcional). Valor predeterminado: true.

Ejecuta la plantilla de Pub/Sub a MongoDB

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to MongoDB template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • INPUT_SUBSCRIPTION: Es la suscripción a Pub/Sub (por ejemplo, projects/my-project-id/subscriptions/my-subscription-id).
  • MONGODB_URI: Son las direcciones del servidor de MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017).
  • DATABASE: Es el nombre de la base de datos de MongoDB (por ejemplo, users).
  • COLLECTION: Es el nombre de la colección de MongoDB (por ejemplo, profiles).
  • UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name).

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • INPUT_SUBSCRIPTION: Es la suscripción a Pub/Sub (por ejemplo, projects/my-project-id/subscriptions/my-subscription-id).
  • MONGODB_URI: Son las direcciones del servidor de MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017).
  • DATABASE: Es el nombre de la base de datos de MongoDB (por ejemplo, users).
  • COLLECTION: Es el nombre de la colección de MongoDB (por ejemplo, profiles).
  • UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name).

Pub/Sub a Elasticsearch

La plantilla de Pub/Sub a Elasticsearch es una canalización de transmisión que lee mensajes de una suscripción a Pub/Sub, ejecuta una función definida por el usuario (UDF) y los escribe en Elasticsearch como documentos. La plantilla de Dataflow usa la función de flujos de datos de Elasticsearch para almacenar datos de series temporales en varios índices y, al mismo tiempo, entregarte un solo recurso con nombre para solicitudes. Los flujos de datos son adecuados para los registros, las métricas, los seguimientos y otros datos generados de forma continua que se almacenan en Pub/Sub.

Requisitos para esta canalización

  • La suscripción a Pub/Sub debe existir, y los mensajes deben estar codificados en un formato JSON válido.
  • Un host de Elasticsearch accesible de forma pública en una instancia de GCP o en Elastic Cloud con Elasticsearch versión 7.0 o posterior. Consulta Integración de Google Cloud para Elastic si deseas obtener más detalles.
  • Un tema de Pub/Sub para resultados de error.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription La suscripción de Cloud Pub/Sub desde la que se realiza el consumo. El nombre debe tener el formato projects/<project-id>/subscriptions/<subscription-name>.
connectionUrl URL de Elasticsearch en el formato https://hostname:[port] o especificar el CloudID si usas Elastic Cloud.
apiKey Clave de API codificada en Base64 que se usa para la autenticación.
errorOutputTopic Tema de salida de Pub/Sub para publicar registros con errores en el formato projects/<project-id>/topics/<topic-name>
dataset El tipo de registros enviados a través de Pub/Sub para los que tenemos un panel listo para usar (opcional). Los valores de tipos de registros conocidos son audit, vpcflow y firewall. Valor predeterminado: pubsub.
namespace Una agrupación arbitraria, como un entorno (dev, prod, or qa), un equipo o una unidad de negocios estratégica (opcional). Valor predeterminado: default.
batchSize El tamaño del lote en cantidad de documentos (opcional). Valor predeterminado: 1000.
batchSizeBytes El tamaño del lote en cantidad de bytes (opcional). Valor predeterminado: 5242880 (5 MB).
maxRetryAttempts La cantidad máxima de reintentos debe ser mayor que 0 (opcional). Valor predeterminado: no retries.
maxRetryDuration La duración máxima del reintento en milisegundos debe ser superior a 0 (opcional). Valor predeterminado: no retries.
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, si tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

Ejecuta la plantilla de Pub/Sub a Elasticsearch

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Elasticsearch template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • ERROR_OUTPUT_TOPIC: Es tu tema de Pub/Sub para resultados de error
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • CONNECTION_URL: Es tu URL de Elasticsearch
  • DATASET: Es tu tipo de registro
  • NAMESPACE: Es el espacio de nombres para el conjunto de datos
  • APIKEY: Es tu clave de API codificada en Base64 para la autenticación

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • ERROR_OUTPUT_TOPIC: Es tu tema de Pub/Sub para resultados de error
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • CONNECTION_URL: Es tu URL de Elasticsearch
  • DATASET: Es tu tipo de registro
  • NAMESPACE: Es el espacio de nombres para el conjunto de datos
  • APIKEY: Es tu clave de API codificada en Base64 para la autenticación

Datastream para Cloud Spanner

La plantilla de Datastream para Cloud Spanner es una canalización de transmisión que lee eventos de Datastream desde un bucket de Cloud Storage y los escribe en una base de datos de Cloud Spanner. Está diseñado para la migración de datos de fuentes de Datastream a Cloud Spanner.

Todas las tablas necesarias para la migración deben existir en la base de datos de destino de Cloud Spanner antes de la ejecución de la plantilla. Por lo tanto, la migración del esquema de una base de datos de origen a Cloud Spanner de destino debe completarse antes de la migración de datos. Los datos pueden existir en las tablas antes de la migración. Esta plantilla no propaga los cambios de esquema de Datastream a la base de datos de Cloud Spanner.

La coherencia de los datos solo está garantizada al final de la migración cuando todos los datos se escribieron en Cloud Spanner. A fin de almacenar información sobre el orden para cada registro escrito en Cloud Spanner, esta plantilla crea una tabla adicional (llamada tabla paralela) para cada tabla en la base de datos de Cloud Spanner. Esto se usa para garantizar la coherencia al final de la migración. Las tablas paralelas no se borran después de la migración y se pueden usar con fines de validación al final de la migración.

Cualquier error que ocurra durante la operación, como discrepancias de esquema, archivos JSON con formato incorrecto o errores resultantes de la ejecución de transformaciones, se registra en una cola de errores. La cola de errores es una carpeta de Cloud Storage que almacena todos los eventos de Datastream que encontraron errores junto con el motivo del error en formato de texto. Los errores pueden ser transitorios o permanentes, y se almacenan en las carpetas de Cloud Storage adecuadas en la cola de errores. Los errores transitorios se reintentan automáticamente, mientras que los errores permanentes no. En el caso de errores permanentes, tienes la opción de corregir los eventos de cambio y moverlos al bucket que se puede reintentar mientras se ejecuta la plantilla.

Requisitos para esta canalización:

  • Una transmisión de Datastream en estado En ejecución o No iniciado
  • Un bucket de Cloud Storage en el que se replican los eventos de Datastream.
  • Una base de datos de Cloud Spanner con tablas existentes. Estas tablas pueden estar vacías o contener datos.

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern La ubicación de los archivos de Datastream en Cloud Storage para replicar. Por lo general, esta es la ruta de acceso raíz de una transmisión.
streamName El nombre o la plantilla del flujo que se consultará para obtener la información del esquema y el tipo de fuente.
instanceId La instancia de Cloud Spanner en la que se replican los cambios.
databaseId La base de datos de Cloud Spanner en la que se replican los cambios.
projectId El ID del proyecto de Cloud Spanner.
deadLetterQueueDirectory Esta es ruta de acceso del archivo para almacenar el resultado de la cola de errores (opcional). El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow.
inputFileFormat El formato del archivo de salida que produce Datastream (opcional). Por ejemplo: avro,json. Valor predeterminado, avro.
shadowTablePrefix El prefijo que se usa para nombrar las tablas de paralelas (opcional). Valor predeterminado: shadow_.

Ejecuta la plantilla de Datastream para Cloud Spanner

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Datastream to Spanner template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage que se usa para almacenar eventos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: es la instancia de Cloud Spanner.
  • CLOUDSPANNER_DATABASE: es la base de datos de Cloud Spanner.
  • DLQ: es la ruta de acceso de Cloud Storage para el directorio de la cola de errores.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage que se usa para almacenar eventos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: es la instancia de Cloud Spanner.
  • CLOUDSPANNER_DATABASE: es la base de datos de Cloud Spanner.
  • DLQ: es la ruta de acceso de Cloud Storage para el directorio de la cola de errores.

Archivos de texto en Cloud Storage a BigQuery (transmisión)

La canalización de archivos de texto en Cloud Storage a BigQuery es una canalización de transmisión que te permite transmitir archivos de texto almacenados en Cloud Storage, transformarlos con una función definida por el usuario (UDF) de JavaScript que proporciones y adjuntar el resultado en BigQuery.

La canalización se ejecuta de forma indefinida, y deberás detenerla de forma manual a través de una cancelación y no una desviación, debido a su uso de la transformación Watch, que es una DoFn divisible que no admite el desvío.

Requisitos para esta canalización:

  • Crea un archivo JSON que describa el esquema de tu tabla de salida en BigQuery.

    Asegúrate de que haya un array JSON de nivel superior titulado BigQuery Schema y que su contenido siga el patrón {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. Por ejemplo:

    {
      "BigQuery Schema": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • Crea un archivo JavaScript (.js) con tu función UDF que proporciona la lógica para transformar las líneas de texto. Ten en cuenta que tu función debe mostrar una string JSON.

    Por ejemplo, esta función divide cada línea de un archivo CSV y muestra una string JSON después de transformar los valores.

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

Parámetros de la plantilla

Parámetro Descripción
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
JSONPath Ubicación de Cloud Storage de tu archivo de esquema de BigQuery, descrito como un JSON. Por ejemplo: gs://path/to/my/schema.json.
javascriptTextTransformFunctionName es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar. Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
outputTable La tabla de BigQuery calificada por completo. Por ejemplo: my-project:dataset.table
inputFilePattern Ubicación en Cloud Storage del texto que quieres procesar. Por ejemplo: gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Directorio temporal para el proceso de carga de BigQuery. Por ejemplo: gs://my-bucket/my-files/temp_dir.
outputDeadletterTable Tabla de mensajes que no llegaron a la tabla de resultados. Por ejemplo: my-project:dataset.my-unprocessed-table. Si no existe, se creará durante la ejecución de la canalización. Si no se especifica, se usa <outputTableSpec>_error_records en su lugar.

Ejecuta la plantilla de Cloud Storage Text a BigQuery (transmisión)

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Text Files on Cloud Storage to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery para los mensajes no procesados.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery para los mensajes no procesados.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.

Archivos de texto en Cloud Storage a Pub/Sub (transmisión)

Esta plantilla crea una canalización de transmisión que sondea de forma continua los archivos de texto nuevos subidos a Cloud Storage, lee cada archivo línea por línea y publica strings en un tema de Pub/Sub. La plantilla publica registros en un archivo delimitado por saltos de línea que contiene registros JSON o archivos CSV en un tema de Pub/Sub para su procesamiento en tiempo real. Puedes usar esta plantilla para reproducir datos en Pub/Sub.

La canalización se ejecuta de forma indefinida, y se debe finalizar de forma manual mediante una “cancelación” y no un “drain”, debido a su uso de la transformación “Watch”, que es un “SplittableDoFn” que no admite el desvío.

Actualmente, el intervalo de sondeo es fijo y configurado en 10 segundos. Esta plantilla no establece una marca de tiempo en los registros individuales. Es por eso que la hora del evento será la misma que la hora de publicación durante la ejecución. Si tu canalización depende de la hora precisa del evento para el procesamiento, no deberías usar esta canalización.

Requisitos para esta canalización:

  • Los archivos de entrada deben tener el formato JSON delimitado por saltos de línea o CSV. Los registros que abarcan varias líneas en los archivos de origen pueden causar problemas de bajada, ya que cada línea dentro de los archivos se publicará como un mensaje en Pub/Sub.
  • El tema de Pub/Sub debe existir antes de la ejecución.
  • La canalización se ejecuta de forma indefinida, y deberás detenerla de forma manual.

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern El patrón del archivo de entrada para leer. Por ejemplo, gs://bucket-name/files/*.json o gs://bucket-name/path/*.csv.
outputTopic El tema de entrada de Pub/Sub en el que se desea escribir. El nombre debe tener el formato projects/<project-id>/topics/<topic-name>.

Ejecuta la plantilla de archivos de texto en Cloud Storage a Pub/Sub (transmisión)

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Text Files on Cloud Storage to Pub/Sub (Stream) template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location TEMP_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
  • FILE_PATTERN: Es el glob de patrón del archivo que se leerá en el bucket de Cloud Storage (por ejemplo, path/*.csv).

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
  • FILE_PATTERN: Es el glob de patrón del archivo que se leerá en el bucket de Cloud Storage (por ejemplo, path/*.csv).

Enmascaramiento de datos y asignación de tokens de Cloud Storage a BigQuery (con Cloud DLP)

La plantilla de enmascaramiento de datos y asignación de tokens de Cloud Storage a BigQuery (con Cloud DLP) es una canalización de transmisión que lee archivos csv de un bucket de Cloud Storage, llama a la API de Cloud Data Loss Prevention (Cloud DLP) para la desidentificación y escribe los datos desidentificados en la tabla de BigQuery especificada. Esta plantilla admite el uso de una plantilla de inspección de Cloud DLP y una plantilla de desidentificación de Cloud DLP. Esto permite que los usuarios inspeccionen información que puede ser sensible y desidentificar datos estructurados en los que las columnas están especificadas para ser desidentificadas y no se necesita la inspección.

Requisitos para esta canalización:

  • Los datos de entrada para la asignación de tokens deben existir
  • Las plantillas de Cloud DLP deben existir (por ejemplo, InspectTemplate y DeidentifyTemplate). Consulta Plantillas de Cloud DLP para obtener más detalles.
  • El conjunto de datos de BigQuery debe existir

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern Los archivos csv desde los que se leen los registros de datos de entrada. También se acepta el comodín. Por ejemplo, gs://mybucket/my_csv_filename.csv o gs://mybucket/file-*.csv.
dlpProjectId ID del proyecto de Cloud DLP que posee el recurso de la API de Cloud DLP. Este proyecto de Cloud DLP puede ser el mismo que posee las plantillas de Cloud DLP, o puede ser uno independiente. Por ejemplo, my_dlp_api_project.
deidentifyTemplateName Plantilla de desidentificación de Cloud DLP que se usa para las solicitudes a la API, especificada con el patrón projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId}. Por ejemplo, projects/my_project/deidentifyTemplates/100.
datasetName Conjunto de datos de BigQuery para enviar resultados con asignación de token.
batchSize Tamaño de fragmentación o del lote para enviar datos a fin de inspeccionar o quitar la asignación de token. En el caso de un archivo csv, batchSize es la cantidad de filas en un lote. Los usuarios deben determinar el tamaño del lote según el tamaño de los registros y del archivo. Ten en cuenta que la API de Cloud DLP tiene un límite de tamaño de carga útil de 524 KB por llamada a la API.
inspectTemplateName Plantilla de inspección de Cloud DLP que se usará para solicitudes a la API, especificada con el patrón projects/{template_project_id}/identifyTemplates/{idTemplateId} (opcional). Por ejemplo, projects/my_project/identifyTemplates/100

Ejecuta la plantilla de enmascaramiento de datos y asignación de tokens de Cloud Storage a BigQuery (con Cloud DLP)

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

Reemplaza lo siguiente:

  • DLP_API_PROJECT_ID: Es el ID del proyecto de la API de Cloud DLP.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • INPUT_DATA: Es la ruta de acceso del archivo de entrada.
  • DEIDENTIFY_TEMPLATE: Es el número de plantilla de Cloud DLPDeidentify.
  • DATASET_NAME: Es el nombre del conjunto de datos de BigQuery.
  • INSPECT_TEMPLATE_NUMBER: Es el número de plantilla de Cloud DLPInspect.
  • BATCH_SIZE_VALUE: Es el tamaño del lote (número de filas por API para csv).

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • DLP_API_PROJECT_ID: Es el ID del proyecto de la API de Cloud DLP.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • INPUT_DATA: Es la ruta de acceso del archivo de entrada.
  • DEIDENTIFY_TEMPLATE: Es el número de plantilla de Cloud DLPDeidentify.
  • DATASET_NAME: Es el nombre del conjunto de datos de BigQuery.
  • INSPECT_TEMPLATE_NUMBER: Es el número de plantilla de Cloud DLPInspect.
  • BATCH_SIZE_VALUE: Es el tamaño del lote (número de filas por API para csv).