Plantillas de transmisión de Dataflow que proporciona Google

Google proporciona un conjunto de plantillas de código abierto de Dataflow.

Estas plantillas de Dataflow pueden ayudarte a resolver grandes tareas de datos, incluidas la importación, la exportación, la copia de seguridad y el restablecimiento de datos, y las operaciones de API masivas, todo sin el uso de un entorno de desarrollo dedicado. Las plantillas se compilan en Apache Beam y usan Dataflow para transformar los datos.

Para obtener información general sobre las plantillas, consulta Plantillas de Dataflow. Para obtener una lista de todas las plantillas proporcionadas por Google, consulta Comienza a usar las plantillas proporcionadas por Google.

En esta guía, se documentan las plantillas de transmisión.

Suscripción de Pub/Sub a BigQuery

La plantilla de suscripción de Pub/Sub a BigQuery es una canalización de transmisión que lee mensajes con formato JSON desde una suscripción de Pub/Sub y los escribe en una tabla de BigQuery. Puedes usar la plantilla como una solución rápida para mover datos de Pub/Sub a BigQuery. La plantilla lee los mensajes con formato JSON de Pub/Sub y los convierte en elementos de BigQuery.

Requisitos para esta canalización:

  • El campo data de los mensajes de Pub/Sub debe usar el formato JSON, que se describe en esta guía de JSON. Por ejemplo, los mensajes con valores en el campo data con formato {"k1":"v1", "k2":"v2"} se pueden insertar en una tabla de BigQuery con dos columnas, llamadas k1 y k2, con un tipo de datos de string.
  • La tabla de salida debe existir antes de ejecutar la canalización. El esquema de la tabla debe coincidir con los objetos JSON de entrada.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription Suscripción de entrada de Pub/Sub desde la que se va a leer, en el formato projects/<project>/subscriptions/<subscription>.
outputTableSpec Ubicación de la tabla de salida de BigQuery, en el formato <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable La tabla de BigQuery para los mensajes que no llegaron a la tabla de salida, en el formato <my-project>:<my-dataset>.<my-table>. Si no existe, se crea durante la ejecución de la canalización. Si no se especifica, se usa OUTPUT_TABLE_SPEC_error_records en su lugar.
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

Ejecuta la plantilla de suscripción de Pub/Sub a BigQuery

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Subscription to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.

Tema de Pub/Sub a BigQuery

La plantilla de tema de Pub/Sub a BigQuery es una canalización de transmisión que lee mensajes con formato JSON de un tema de Pub/Sub y los escribe en una tabla de BigQuery. Puedes usar la plantilla como una solución rápida para mover datos de Pub/Sub a BigQuery. La plantilla lee los mensajes con formato JSON de Pub/Sub y los convierte en elementos de BigQuery.

Requisitos para esta canalización:

  • El campo data de los mensajes de Pub/Sub debe usar el formato JSON, que se describe en esta guía de JSON. Por ejemplo, los mensajes con valores en el campo data con formato {"k1":"v1", "k2":"v2"} se pueden insertar en una tabla de BigQuery con dos columnas, llamadas k1 y k2, con un tipo de datos de string.
  • La tabla de salida debe existir antes de ejecutar la canalización. El esquema de la tabla debe coincidir con los objetos JSON de entrada.

Parámetros de la plantilla

Parámetro Descripción
inputTopic El tema de entrada de Pub/Sub desde el que se va a leer, en el formato projects/<project>/topics/<topic>.
outputTableSpec Ubicación de la tabla de salida de BigQuery, en el formato <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable La tabla de BigQuery para los mensajes que no llegaron a la tabla de resultados. Debe estar en formato <my-project>:<my-dataset>.<my-table>. Si no existe, se crea durante la ejecución de la canalización. Si no se especifica, se usa <outputTableSpec>_error_records en su lugar.
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

Ejecuta la plantilla del tema de Pub/Sub a BigQuery

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Topic to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.

Pub/Sub Avro a BigQuery

La plantilla de Pub/Sub Avro a BigQuery es una canalización de transmisión que transfiere datos de Avro desde una suscripción de Pub/Sub a una tabla de BigQuery. Cualquier error que ocurra mientras se escribe en la tabla de BigQuery se transmite a un tema de Pub/Sub sin procesar.

Requisitos para esta canalización

  • La suscripción de entrada de Pub/Sub debe existir.
  • El archivo de esquema para los registros de Avro debe existir en Cloud Storage.
  • El tema de Pub/Sub sin procesar debe existir. ¿
  • El conjunto de datos de salida de BigQuery debe existir.

Parámetros de la plantilla

Parámetro Descripción
schemaPath Ubicación de Cloud Storage del archivo de esquema de Avro. Por ejemplo, gs://path/to/my/schema.avsc.
inputSubscription Suscripción de entrada de Pub/Sub desde la que se desea leer. Por ejemplo, projects/<project>/subscriptions/<subscription>.
outputTopic El tema de Pub/Sub que se usará para registros no procesados. Por ejemplo, projects/<project-id>/topics/<topic-name>.
outputTableSpec Ubicación de la tabla de salida de BigQuery. Por ejemplo, <my-project>:<my-dataset>.<my-table>. Según la createDisposition especificada, la tabla de salida se puede crear de forma automática mediante el esquema de Avro proporcionado por el usuario.
writeDisposition La WriteDisposition de BigQuery (opcional). Por ejemplo, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Predeterminada: WRITE_APPEND.
createDisposition La CreateDisposition de BigQuery (opcional). Por ejemplo: CREATE_IF_NEEDED, CREATE_NEVER. Predeterminada: CREATE_IF_NEEDED.

Ejecuta la plantilla de Pub/Sub Avro a BigQuery

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Avro to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo, gs://MyBucket/file.avsc).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
  • BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
  • DEADLETTER_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo, gs://MyBucket/file.avsc).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
  • BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
  • DEADLETTER_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.

Proto de Pub/Sub a BigQuery

La plantilla de proto de Pub/Sub a BigQuery es una canalización de transmisión que transfiere datos de proto desde una suscripción a Pub/Sub hacia una tabla de BigQuery. Cualquier error que ocurra mientras se escribe en la tabla de BigQuery se transmite a un tema de Pub/Sub sin procesar.

Se puede proporcionar una función definida por el usuario (UDF) de JavaScript para transformar los datos. Los errores mientras se ejecuta la UDF se pueden enviar a un tema de Pub/Sub separado o al mismo tema sin procesar que los errores de BigQuery.

Requisitos para esta canalización:

  • La suscripción de entrada de Pub/Sub debe existir.
  • El archivo de esquema de los registros proto debe existir en Cloud Storage.
  • El tema de Pub/Sub de salida debe existir.
  • El conjunto de datos de salida de BigQuery debe existir.
  • Si la tabla de BigQuery existe, debe tener un esquema que coincida con los datos del proto, sin importar el valor createDisposition.

Parámetros de la plantilla

Parámetro Descripción
protoSchemaPath La ubicación de Cloud Storage del archivo de esquema proto autónomo. Por ejemplo, gs://path/to/my/file.pb. Este archivo se puede generar con la marca --descriptor_set_out del comando protoc. La marca --include_imports garantiza que el archivo sea autónomo.
fullMessageName El nombre completo del mensaje proto. Por ejemplo, package.name.MessageName, en el que package.name es el valor proporcionado para la declaración package y no la declaración java_package.
inputSubscription Suscripción de entrada de Pub/Sub desde la que se desea leer. Por ejemplo, projects/<project>/subscriptions/<subscription>.
outputTopic El tema de Pub/Sub que se usará para registros no procesados. Por ejemplo, projects/<project-id>/topics/<topic-name>.
outputTableSpec Ubicación de la tabla de salida de BigQuery. Por ejemplo, my-project:my_dataset.my_table. Según la createDisposition especificada, la tabla de salida se puede crear de forma automática mediante el archivo de esquema de entrada.
preserveProtoFieldNames true para conservar el nombre del campo Proto original en JSON (opcional). false para usar nombres JSON más estándar. Por ejemplo, false cambiaría field_name a fieldName. (Default: false)
bigQueryTableSchemaPath Ruta de Cloud Storage a la ruta del esquema de BigQuery (opcional). Por ejemplo, gs://path/to/my/schema.json. Si no se proporciona, entonces el esquema se infiere a partir del esquema Proto.
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
udfOutputTopic El tema de Pub/Sub que almacena los errores de las UDF (opcional). Por ejemplo, projects/<project-id>/topics/<topic-name>. Si no se proporciona, los errores de UDF se envían al mismo tema que outputTopic.
writeDisposition La WriteDisposition de BigQuery (opcional). Por ejemplo, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Valor predeterminado: WRITE_APPEND.
createDisposition La CreateDisposition de BigQuery (opcional). Por ejemplo: CREATE_IF_NEEDED, CREATE_NEVER. Configuración predeterminada: CREATE_IF_NEEDED.

Ejecuta la plantilla de proto de Pub/Sub a BigQuery

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Proto to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
fullMessageName=PROTO_MESSAGE_NAME,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=UNPROCESSED_TOPIC
  

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Proto (por ejemplo, gs://MyBucket/file.pb).
  • PROTO_MESSAGE_NAME: Es el nombre del mensaje Proto (por ejemplo, package.name.MessageName).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
  • BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
  • UNPROCESSED_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "fullMessageName": "PROTO_MESSAGE_NAME",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "UNPROCESSED_TOPIC"
      }
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • SCHEMA_PATH: Es la ruta de acceso de Cloud Storage al archivo de esquema de Proto (por ejemplo, gs://MyBucket/file.pb).
  • PROTO_MESSAGE_NAME: Es el nombre del mensaje Proto (por ejemplo, package.name.MessageName).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción de entrada de Pub/Sub.
  • BIGQUERY_TABLE: Es el nombre de la tabla de salida de BigQuery.
  • UNPROCESSED_TOPIC: Es el tema de Pub/Sub que se usará para la cola no procesada.

Pub/Sub a Pub/Sub

La plantilla de Pub/Sub a Pub/Sub es una canalización de transmisión que lee mensajes de una suscripción de Pub/Sub y los escribe en otro tema de Pub/Sub. La canalización también acepta una clave de atributo de mensaje opcional y un valor que se puede usar para filtrar los mensajes que se deben escribir en el tema de Pub/Sub. Puedes usar esta plantilla para copiar mensajes de una suscripción de Pub/Sub a otro tema de Pub/Sub con un filtro de mensajes opcional.

Requisitos para esta canalización:

  • La suscripción a Pub/Sub de origen debe existir antes de la ejecución.
  • La suscripción de Pub/Sub de origen debe ser una suscripción de extracción.
  • El tema de Pub/Sub de destino debe existir antes de la ejecución.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription Suscripción a Pub/Sub desde la que se lee la entrada. Por ejemplo, projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Tema de Cloud Pub/Sub en el que se escribe el resultado. Por ejemplo, projects/<project-id>/topics/<topic-name>.
filterKey Eventos de filtro basados en la clave de atributo (opcional). No se aplican filtros si no se especifica filterKey.
filterValue Valor del atributo del filtro para usar en caso de que se provea filterKey (opcional). De forma predeterminada, se usa un filterValue nulo.

Ejecuta la plantilla de Pub/Sub a Pub/Sub

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Pub/Sub template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • FILTER_KEY: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.
  • FILTER_VALUE: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento. Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. Las coincidencias parciales (como una substring) no se filtran. De forma predeterminada, se usa un valor de filtro de evento nulo.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • FILTER_KEY: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.
  • FILTER_VALUE: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento. Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. Las coincidencias parciales (como una substring) no se filtran. De forma predeterminada, se usa un valor de filtro de evento nulo.

Pub/Sub a Splunk

La plantilla de Pub/Sub a Splunk es una canalización de transmisión que lee mensajes de una suscripción a Pub/Sub y escribe la carga útil del mensaje en Splunk mediante el recopilador de eventos HTTP (HEC) de Splunk. El caso de uso más común de esta plantilla es exportar registros a Splunk. Para ver un ejemplo del flujo de trabajo subyacente, consulta Implementa exportaciones de registros listas para la producción a Splunk mediante Dataflow.

Antes de escribir en Splunk, también puedes aplicar una función definida por el usuario de JavaScript a la carga útil del mensaje. Los mensajes con fallas de procesamiento se reenvían a un tema de mensajes no enviados de Pub/Sub para solucionar los problemas y volver a procesarlos.

Como una capa adicional de protección para tu token HEC, también puedes pasar una clave de Cloud KMS junto con el parámetro de token HEC codificado en base64 encriptado con la clave de Cloud KMS. Consulta el extremo de encriptación de la API de Cloud KMS para obtener detalles adicionales sobre la encriptación de tu parámetro de token HEC.

Requisitos para esta canalización:

  • La suscripción de Pub/Sub de origen debe existir antes de ejecutar la canalización.
  • El tema sin procesar de Pub/Sub debe existir antes de ejecutar la canalización.
  • Se debe poder acceder al extremo de HEC de Splunk desde la red de trabajadores de Dataflow.
  • El token HEC de Splunk se debe generar y estar disponible.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription La suscripción de Pub/Sub desde la que se lee la entrada. Por ejemplo, projects/<project-id>/subscriptions/<subscription-name>.
token (Opcional) El token de autenticación HEC de Splunk. Se debe proporcionar si tokenSource está configurado como PLAINTEXT o KMS.
url La URL de HEC de Splunk. Debe ser enrutable desde la VPC en la que se ejecuta la canalización. Por ejemplo, https://splunk-hec-host:8088.
outputDeadletterTopic El tema de Pub/Sub para reenviar mensajes que no se pueden entregar. Por ejemplo, projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
batchCount El tamaño del lote para enviar varios eventos a Splunk (opcional). Predeterminado 1 (sin lotes).
parallelism La cantidad máxima de solicitudes paralelas (opcional). Predeterminado 1 (sin paralelismo).
disableCertificateValidation Inhabilita la validación del certificado SSL (opcional). El valor predeterminado es falso (validación habilitada). Si es verdadero, los certificados no se validan (todos los certificados son de confianza) y se ignora el parámetro “rootCaCertificatePath”.
includePubsubMessage Incluye el mensaje de Pub/Sub completo en la carga útil (opcional). El valor predeterminado es falso (solo se incluye el elemento de datos en la carga útil).
tokenSource Fuente del token. Puede ser PLAINTEXT, KMS o SECRET_MANAGER. Este parámetro se debe proporcionar si se usa Secret Manager. Si tokenSource se configura como KMS, tokenKMSEncryptionKey y el token encriptado se deben proporcionar. Si tokenSource se configura como SECRET_MANAGER, tokenSecretId se debe proporcionar. Si tokenSource se configura como PLAINTEXT, token se debe proporcionar.
tokenKMSEncryptionKey La clave de Cloud KMS para desencriptar la string del token HEC (opcional). Este parámetro se debe proporcionar si tokenSource se configura como KMS. Si se proporciona la clave de Cloud KMS, la string del token HEC debe pasarse encriptada.
tokenSecretId (Opcional) El ID del Secret de Secret Manager para el token. Este parámetro debe proporcionarse si el tokenSource está configurado como SECRET_MANAGER. Debe tener el formato projects/<project-id>/secrets/<secret-name>/versions/<secret-version>.
rootCaCertificatePath La URL completa al certificado de CA raíz en Cloud Storage (opcional). Por ejemplo, gs://mybucket/mycerts/privateCA.crt. El certificado provisto en Cloud Storage debe estar codificado en DER y puede proporcionarse en codificación binaria o imprimible (Base64). Si el certificado se proporciona en codificación Base64, debe estar delimitado al comienzo por -----BEGIN CERTIFICATE----- y debe estar limitado al final por -----END CERTIFICATE-----. Si se proporciona este parámetro, este archivo de certificado de CA privado se recupera y se agrega al almacén de confianza del trabajador de Dataflow para verificar el certificado SSL del extremo del HEC de Splunk. Si no se proporciona este parámetro, se usa el almacén de confianza predeterminado.
enableBatchLogs (Opcional) Especifica si se deben habilitar los registros para los lotes escritos en Splunk. Valor predeterminado: true.
enableGzipHttpCompression (Opcional) Especifica si las solicitudes HTTP enviadas a HEC de Splunk deben comprimirse (contenido gzip codificado). Valor predeterminado: true.

Ejecuta la plantilla de Pub/Sub a Splunk

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Splunk template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • INPUT_SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOKEN: Es el token del recopilador de eventos HTTP de Splunk.
  • URL: Es la ruta de URL para el recopilador de eventos HTTP de Splunk (por ejemplo, https://splunk-hec-host:8088).
  • DEADLETTER_TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: Es el tamaño del lote que se debe usar para enviar varios eventos a Splunk.
  • PARALLELISM: Es la cantidad de solicitudes paralelas que se usarán para enviar eventos a Splunk.
  • DISABLE_VALIDATION: Es true si deseas inhabilitar la validación del certificado SSL.
  • ROOT_CA_CERTIFICATE_PATH: La ruta al certificado de CA raíz en Cloud Storage (por ejemplo, gs://your-bucket/privateCA.crt)

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • INPUT_SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOKEN: Es el token del recopilador de eventos HTTP de Splunk.
  • URL: Es la ruta de URL para el recopilador de eventos HTTP de Splunk (por ejemplo, https://splunk-hec-host:8088).
  • DEADLETTER_TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: Es el tamaño del lote que se debe usar para enviar varios eventos a Splunk.
  • PARALLELISM: Es la cantidad de solicitudes paralelas que se usarán para enviar eventos a Splunk.
  • DISABLE_VALIDATION: Es true si deseas inhabilitar la validación del certificado SSL.
  • ROOT_CA_CERTIFICATE_PATH: La ruta al certificado de CA raíz en Cloud Storage (por ejemplo, gs://your-bucket/privateCA.crt)

Pub/Sub a archivos Avro en Cloud Storage

La plantilla de Pub/Sub a archivos de Avro en Cloud Storage es una canalización de transmisión que lee datos de un tema de Pub/Sub y escribe archivos de Avro en el bucket de Cloud Storage especificado.

Requisitos para esta canalización:

  • El tema de entrada de Pub/Sub debe existir antes de la ejecución de la canalización.

Parámetros de la plantilla

Parámetro Descripción
inputTopic Tema de Pub/Sub al cual suscribirse para el consumo de mensaje. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name>.
outputDirectory Directorio de salida en el que se archivan los archivos Avro de salida. Debe contener / al final. Por ejemplo: gs://example-bucket/example-directory/.
avroTempDirectory Directorio para los archivos de Avro temporales. Debe contener / al final. Por ejemplo: gs://example-bucket/example-directory/.
outputFilenamePrefix Prefijo de nombre de archivo de salida para los archivos Avro (opcional).
outputFilenameSuffix [Opcional] Sufijo de nombre de archivo de salida para los archivos Avro.
outputShardTemplate [Opcional] Plantilla de fragmentación del archivo de salida. Se especifica como secuencias repetidas de las letras S o N. Por ejemplo, SSS-NNN. Estas se reemplazan por el número de fragmento o la cantidad total de fragmentos, respectivamente. Si no se especifica este parámetro, el formato de plantilla predeterminado es W-P-SS-of-NN.

Ejecuta la plantilla de Pub/Sub a Cloud Storage Avro

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Avro Files on Cloud Storage template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
  • FILENAME_PREFIX: Es el prefijo del nombre de archivo de salida que prefieras.
  • FILENAME_SUFFIX: Es el sufijo del nombre de archivo de salida que prefieras.
  • SHARD_TEMPLATE: Es la plantilla de fragmentación de salida que prefieras.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
  • FILENAME_PREFIX: Es el prefijo del nombre de archivo de salida que prefieras.
  • FILENAME_SUFFIX: Es el sufijo del nombre de archivo de salida que prefieras.
  • SHARD_TEMPLATE: Es la plantilla de fragmentación de salida que prefieras.

Tema de Pub/Sub a archivos de texto en Cloud Storage

La plantilla de Pub/Sub a archivos de texto en Cloud Storage es una canalización de transmisión que lee registros de Pub/Sub y los guarda como una serie de archivos de Cloud Storage en formato de texto. La plantilla se puede usar como una forma rápida de guardar datos en Pub/Sub para su uso futuro. De forma predeterminada, la plantilla genera un archivo nuevo cada 5 minutos.

Requisitos para esta canalización:

  • El tema de Pub/Sub debe existir antes de la ejecución.
  • Los mensajes publicados en el tema deben tener formato de texto.
  • Los mensajes publicados en el tema no deben contener líneas nuevas. Ten en cuenta que cada mensaje de Pub/Sub se guarda como una sola línea en el archivo de salida.

Parámetros de la plantilla

Parámetro Descripción
inputTopic El tema de Pub/Sub desde el que se lee la entrada. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name>.
outputDirectory La ruta de acceso y el prefijo del nombre de archivo para escribir los archivos de salida. Por ejemplo, gs://bucket-name/path/. El valor debe terminar con una barra.
outputFilenamePrefix El prefijo para colocar en cada archivo con ventanas. Por ejemplo, output-.
outputFilenameSuffix El sufijo para colocar en cada archivo con ventanas, por lo general, es una extensión de archivo como .txt o .csv.
outputShardTemplate La plantilla de fragmentación define la parte dinámica de cada archivo con ventanas. De forma predeterminada, la canalización utiliza una única fragmentación de salida para el sistema de archivo dentro de cada ventana. Esto significa que todos los datos se envían a un solo archivo por ventana. El valor predeterminado outputShardTemplate es W-P-SS-of-NN, en el que W es el período de la ventana, P es la información del panel, S es el número de fragmento y N es la cantidad de fragmentos. En el caso de un solo archivo, la parte SS-of-NN de outputShardTemplate es 00-of-01.

Ejecuta la plantilla de Pub/Sub a archivos de texto en Cloud Storage

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Text Files on Cloud Storage template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.

Tema de Pub/Sub o suscripción a archivos de texto en Cloud Storage

El tema de Pub/Sub o la suscripción a archivos de texto en Cloud Storage es una canalización de transmisión que lee registros de Pub/Sub y los guarda como una serie de archivos de Cloud Storage en formato de texto. La plantilla se puede usar como una forma rápida de guardar datos en Pub/Sub para su uso futuro. De forma predeterminada, la plantilla genera un archivo nuevo cada 5 minutos.

Requisitos para esta canalización:

  • El tema o la suscripción a Pub/Sub deben existir antes de la ejecución.
  • Los mensajes publicados en el tema deben tener formato de texto.
  • Los mensajes publicados en el tema no deben contener líneas nuevas. Ten en cuenta que cada mensaje de Pub/Sub se guarda como una sola línea en el archivo de salida.

Parámetros de la plantilla

Parámetro Descripción
inputTopic El tema de Pub/Sub desde el que se lee la entrada. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name>. Si se proporciona este parámetro, no se debe proporcionar inputSubscription.
inputSubscription El tema de Pub/Sub desde el que se lee la entrada. El nombre de la suscripción debe tener el formato projects/<project-id>/subscription/<subscription-name>. Si se proporciona este parámetro, no se debe proporcionar inputTopic.
outputDirectory La ruta de acceso y el prefijo del nombre de archivo para escribir los archivos de salida. Por ejemplo, gs://bucket-name/path/. El valor debe terminar con una barra.
outputFilenamePrefix El prefijo para colocar en cada archivo con ventanas. Por ejemplo, output-.
outputFilenameSuffix El sufijo para colocar en cada archivo con ventanas, por lo general, es una extensión de archivo como .txt o .csv.
outputShardTemplate La plantilla de fragmentación define la parte dinámica de cada archivo con ventanas. De forma predeterminada, la canalización utiliza una única fragmentación de salida para el sistema de archivo dentro de cada ventana. Esto significa que todos los datos se envían a un solo archivo por ventana. El valor predeterminado outputShardTemplate es W-P-SS-of-NN, en el que W es el período de la ventana, P es la información del panel, S es el número de fragmento y N es la cantidad de fragmentos. En el caso de un solo archivo, la parte SS-of-NN de outputShardTemplate es 00-of-01.
windowDuration La duración de la ventana es el intervalo en el que se escriben los datos en el directorio de salida (opcional). Configura la duración en función de la capacidad de procesamiento de la canalización. Por ejemplo, una capacidad de procesamiento mayor puede requerir tamaños de ventana más pequeños para que los datos se ajusten a la memoria. La configuración predeterminada es de 5 min, con un mínimo de 1 s. Los formatos permitidos son: [nro. entero] s (para los segundos, por ejemplo, 5 s), [nro. entero] min (para los minutos, por ejemplo, 12 min) y [nro. entero] h (para las horas, por ejemplo, 2 h).

Ejecuta la plantilla del tema o suscripción de Pub/Sub a archivos de texto en Cloud Storage

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template jobs run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.

Pub/Sub a MongoDB

La plantilla de Pub/Sub a MongoDB es una canalización de transmisión que lee mensajes con codificación JSON de una suscripción a Pub/Sub y los escribe en MongoDB como documentos. Si es necesario, esta canalización admite transformaciones adicionales que se pueden incluir mediante una función definida por el usuario (UDF) de JavaScript. Cualquier error que se produjo debido a una falta de coincidencia del esquema, JSON con errores o mientras se ejecutaban transformaciones se registra en una tabla de BigQuery para mensajes no procesados junto con un mensaje de entrada. Si no existe una tabla para los registros no procesados antes de la ejecución, la canalización crea esta tabla de forma automática.

Requisitos para esta canalización:

  • La suscripción a Pub/Sub debe existir y los mensajes deben estar codificados en un formato JSON válido.
  • El clúster de MongoDB debe existir y debe ser accesible desde las máquinas de trabajador de Dataflow.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription Nombre de la suscripción a Pub/Sub. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
mongoDBUri Una lista separada por comas de los servidores de MongoDB. Por ejemplo: 192.285.234.12:27017,192.287.123.11:27017.
database La base de datos en MongoDB en la que se debe almacenar la colección. Por ejemplo: my-db.
collection Nombre de la colección dentro de la base de datos de MongoDB. Por ejemplo: my-collection.
deadletterTable La tabla de BigQuery que almacena mensajes debido a fallas (esquemas no coincidentes, JSON con formato incorrecto, etcétera). Por ejemplo: project-id:dataset-name.table-name.
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
batchSize El tamaño de lote que se usa para la inserción por lotes de documentos en MongoDB (opcional). Valor predeterminado: 1000.
batchSizeBytes El tamaño del lote en bytes (opcional). Valor predeterminado: 5242880.
maxConnectionIdleTime El tiempo de inactividad máximo permitido en segundos antes de que se agote el tiempo de espera de la conexión (opcional). Valor predeterminado: 60000.
sslEnabled El valor booleano que indica si la conexión a MongoDB está habilitada con SSL (opcional). Valor predeterminado: true.
ignoreSSLCertificate El valor booleano que indica si se debe ignorar el certificado SSL (opcional). Valor predeterminado: true.
withOrdered El valor booleano que habilita las inserciones masivas ordenadas en MongoDB (opcional). Valor predeterminado: true.
withSSLInvalidHostNameAllowed El valor booleano que indica si se permite un nombre de host no válido para la conexión SSL (opcional). Valor predeterminado: true.

Ejecuta la plantilla de Pub/Sub a MongoDB

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to MongoDB template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • INPUT_SUBSCRIPTION: Es la suscripción a Pub/Sub (por ejemplo, projects/my-project-id/subscriptions/my-subscription-id).
  • MONGODB_URI: Son las direcciones del servidor de MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017).
  • DATABASE: Es el nombre de la base de datos de MongoDB (por ejemplo, users).
  • COLLECTION: Es el nombre de la colección de MongoDB (por ejemplo, profiles).
  • UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name).

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • INPUT_SUBSCRIPTION: Es la suscripción a Pub/Sub (por ejemplo, projects/my-project-id/subscriptions/my-subscription-id).
  • MONGODB_URI: Son las direcciones del servidor de MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017).
  • DATABASE: Es el nombre de la base de datos de MongoDB (por ejemplo, users).
  • COLLECTION: Es el nombre de la colección de MongoDB (por ejemplo, profiles).
  • UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name).

Pub/Sub a Elasticsearch

La plantilla de Pub/Sub a Elasticsearch es una canalización de transmisión que lee mensajes de una suscripción a Pub/Sub, ejecuta una función definida por el usuario (UDF) y los escribe en Elasticsearch como documentos. La plantilla de Dataflow usa la función de flujos de datos de Elasticsearch para almacenar datos de series temporales en varios índices y, al mismo tiempo, entregarte un solo recurso con nombre para solicitudes. Los flujos de datos son adecuados para los registros, las métricas, los seguimientos y otros datos generados de forma continua que se almacenan en Pub/Sub.

Requisitos para esta canalización

  • La suscripción a Pub/Sub debe existir, y los mensajes deben estar codificados en un formato JSON válido.
  • Un host de Elasticsearch accesible de forma pública en una instancia de GCP o en Elastic Cloud con Elasticsearch versión 7.0 o posterior. Consulta Integración de Google Cloud para Elastic si deseas obtener más detalles.
  • Un tema de Pub/Sub para resultados de error.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription La suscripción de Cloud Pub/Sub desde la que se realiza el consumo. El nombre debe tener el formato projects/<project-id>/subscriptions/<subscription-name>.
connectionUrl URL de Elasticsearch en el formato https://hostname:[port] o especificar el CloudID si usas Elastic Cloud.
apiKey Clave de API codificada en Base64 que se usa para la autenticación.
errorOutputTopic Tema de salida de Pub/Sub para publicar registros con errores en el formato projects/<project-id>/topics/<topic-name>
dataset El tipo de registros enviados a través de Pub/Sub para los que tenemos un panel listo para usar (opcional). Los valores de tipos de registros conocidos son audit, vpcflow y firewall. Valor predeterminado: pubsub.
namespace Una agrupación arbitraria, como un entorno (dev, prod, or qa), un equipo o una unidad de negocios estratégica (opcional). Valor predeterminado: default.
batchSize El tamaño del lote en cantidad de documentos (opcional). Valor predeterminado: 1000.
batchSizeBytes El tamaño del lote en cantidad de bytes (opcional). Valor predeterminado: 5242880 (5 MB).
maxRetryAttempts La cantidad máxima de reintentos debe ser mayor que 0 (opcional). Valor predeterminado: no retries.
maxRetryDuration La duración máxima del reintento en milisegundos debe ser superior a 0 (opcional). Valor predeterminado: no retries.
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
propertyAsIndex Opcional: Una propiedad en el documento que se indexa, cuyo valor especificará los metadatos _index para incluir con el documento en la solicitud masiva (tiene prioridad sobre una UDF _index). Valor predeterminado: none.
propertyAsId Opcional: Una propiedad en el documento que se indexa, cuyo valor especificará los metadatos _id para incluir con el documento en la solicitud masiva (tiene prioridad sobre una UDF _id). Valor predeterminado: none.
javaScriptIndexFnGcsPath La ruta de Cloud Storage a la fuente de UDF de JavaScript para una función que especificará los metadatos _index a fin de incluir el documento en la solicitud masiva (opcional). Valor predeterminado: none.
javaScriptIndexFnName Opcional: El nombre de función de JavaScript de la UDF para la función que especificará los metadatos _index que se incluirán en el documento en la solicitud masiva. Valor predeterminado: none.
javaScriptIdFnGcsPath La ruta de Cloud Storage a la fuente de UDF de JavaScript para una función que especificará los metadatos _id a fin de incluir el documento en la solicitud masiva (opcional). Valor predeterminado: none.
javaScriptIdFnName Opcional: El nombre de función de JavaScript de la UDF para la función que especificará los metadatos _id que se incluirán en el documento en la solicitud masiva. Valor predeterminado: none.
javaScriptTypeFnGcsPath La ruta de Cloud Storage a la fuente de UDF de JavaScript para una función que especificará los metadatos _type a fin de incluir el documento en la solicitud masiva (opcional). Valor predeterminado: none.
javaScriptTypeFnName Opcional: El nombre de función de JavaScript de la UDF para la función que especificará los metadatos _type que se incluirán en el documento en la solicitud masiva. Valor predeterminado: none.
javaScriptIsDeleteFnGcsPath La ruta de acceso de Cloud Storage a la fuente de UDF de JavaScript para la función que determinará si el documento se debe borrar en lugar de insertar o actualizar (opcional). La función debe mostrar el valor de string "true" o "false". Valor predeterminado: none.
javaScriptIsDeleteFnName Opcional: El nombre de función de JavaScript de la UDF para la función que determinará si el documento se debe borrar en lugar de insertarse o actualizarse. La función debe mostrar el valor de string "true" o "false". Valor predeterminado: none.
usePartialUpdate Opcional: Indica si se deben usar actualizaciones parciales (actualizar en lugar de crear o indexar), que permite documentos parciales con solicitudes de Elasticsearch. Valor predeterminado: false.
bulkInsertMethod Opcional: Si se debe usar INDEX (índice, permite inserción) o CREATE (creación, errores en _id duplicado) con solicitudes masivas de Elasticsearch. Valor predeterminado: CREATE.

Ejecuta la plantilla de Pub/Sub a Elasticsearch

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Elasticsearch template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • ERROR_OUTPUT_TOPIC: Es tu tema de Pub/Sub para resultados de error
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • CONNECTION_URL: Es tu URL de Elasticsearch
  • DATASET: Es tu tipo de registro
  • NAMESPACE: Es el espacio de nombres para el conjunto de datos
  • APIKEY: Es tu clave de API codificada en Base64 para la autenticación

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • ERROR_OUTPUT_TOPIC: Es tu tema de Pub/Sub para resultados de error
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • CONNECTION_URL: Es tu URL de Elasticsearch
  • DATASET: Es tu tipo de registro
  • NAMESPACE: Es el espacio de nombres para el conjunto de datos
  • APIKEY: Es tu clave de API codificada en Base64 para la autenticación

Datastream a Cloud Spanner

La plantilla de Datastream para Cloud Spanner es una canalización de transmisión que lee eventos de Datastream desde un bucket de Cloud Storage y los escribe en una base de datos de Cloud Spanner. Está diseñado para la migración de datos de fuentes de Datastream a Cloud Spanner.

Todas las tablas necesarias para la migración deben existir en la base de datos de destino de Cloud Spanner antes de la ejecución de la plantilla. Por lo tanto, la migración del esquema de una base de datos de origen a Cloud Spanner de destino debe completarse antes de la migración de datos. Los datos pueden existir en las tablas antes de la migración. Esta plantilla no propaga los cambios de esquema de Datastream a la base de datos de Cloud Spanner.

La coherencia de los datos solo está garantizada al final de la migración cuando todos los datos se escribieron en Cloud Spanner. A fin de almacenar información sobre el orden para cada registro escrito en Cloud Spanner, esta plantilla crea una tabla adicional (llamada tabla paralela) para cada tabla en la base de datos de Cloud Spanner. Esto se usa para garantizar la coherencia al final de la migración. Las tablas paralelas no se borran después de la migración y se pueden usar con fines de validación al final de la migración.

Cualquier error que ocurra durante la operación, como discrepancias de esquema, archivos JSON con formato incorrecto o errores resultantes de la ejecución de transformaciones, se registra en una cola de errores. La cola de errores es una carpeta de Cloud Storage que almacena todos los eventos de Datastream que encontraron errores junto con el motivo del error en formato de texto. Los errores pueden ser transitorios o permanentes, y se almacenan en las carpetas de Cloud Storage adecuadas en la cola de errores. Los errores transitorios se reintentan automáticamente, mientras que los errores permanentes no. En el caso de errores permanentes, tienes la opción de corregir los eventos de cambio y moverlos al bucket que se puede reintentar mientras se ejecuta la plantilla.

Requisitos para esta canalización:

  • Una transmisión de Datastream en estado En ejecución o No iniciado
  • Un bucket de Cloud Storage en el que se replican los eventos de Datastream.
  • Una base de datos de Cloud Spanner con tablas existentes. Estas tablas pueden estar vacías o contener datos.

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern La ubicación de los archivos de Datastream en Cloud Storage para replicar. Por lo general, esta es la ruta de acceso raíz de una transmisión.
streamName El nombre o la plantilla del flujo que se consultará para obtener la información del esquema y el tipo de fuente.
instanceId La instancia de Cloud Spanner en la que se replican los cambios.
databaseId La base de datos de Cloud Spanner en la que se replican los cambios.
projectId El ID del proyecto de Cloud Spanner.
deadLetterQueueDirectory Esta es ruta de acceso del archivo para almacenar el resultado de la cola de errores (opcional). El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow.
inputFileFormat El formato del archivo de salida que produce Datastream (opcional). Por ejemplo: avro,json. Valor predeterminado, avro.
shadowTablePrefix El prefijo que se usa para nombrar las tablas de paralelas (opcional). Valor predeterminado: shadow_.

Ejecuta la plantilla de Datastream para Cloud Spanner

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Datastream to Spanner template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage que se usa para almacenar eventos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: es la instancia de Cloud Spanner.
  • CLOUDSPANNER_DATABASE: es la base de datos de Cloud Spanner.
  • DLQ: es la ruta de acceso de Cloud Storage para el directorio de la cola de errores.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage que se usa para almacenar eventos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: es la instancia de Cloud Spanner.
  • CLOUDSPANNER_DATABASE: es la base de datos de Cloud Spanner.
  • DLQ: es la ruta de acceso de Cloud Storage para el directorio de la cola de errores.

Archivos de texto en Cloud Storage a BigQuery (transmisión)

La canalización de archivos de texto en Cloud Storage a BigQuery es una canalización de transmisión que te permite transmitir archivos de texto almacenados en Cloud Storage, transformarlos con una función definida por el usuario (UDF) de JavaScript que proporciones y adjuntar el resultado en BigQuery.

La canalización se ejecuta de forma indefinida, y deberás detenerla de forma manual a través de una cancelación y no una desviación, debido a su uso de la transformación Watch, que es una DoFn divisible que no admite el desvío.

Requisitos para esta canalización:

  • Crea un archivo JSON que describa el esquema de tu tabla de salida en BigQuery.

    Asegúrate de que haya un array JSON de nivel superior titulado fields y que su contenido siga el patrón {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. Por ejemplo:

    {
      "fields": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • Crea un archivo JavaScript (.js) con tu función UDF que proporciona la lógica para transformar las líneas de texto. Ten en cuenta que tu función debe mostrar una string JSON.

    Por ejemplo, esta función divide cada línea de un archivo CSV y muestra una string JSON después de transformar los valores.

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

Parámetros de la plantilla

Parámetro Descripción
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
JSONPath Ubicación de Cloud Storage de tu archivo de esquema de BigQuery, descrito como un JSON. Por ejemplo: gs://path/to/my/schema.json.
javascriptTextTransformFunctionName es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar. Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
outputTable La tabla de BigQuery calificada por completo. Por ejemplo: my-project:dataset.table
inputFilePattern Ubicación en Cloud Storage del texto que quieres procesar. Por ejemplo: gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Directorio temporal para el proceso de carga de BigQuery. Por ejemplo: gs://my-bucket/my-files/temp_dir.
outputDeadletterTable Tabla de mensajes que no llegaron a la tabla de resultados. Por ejemplo: my-project:dataset.my-unprocessed-table. Si no existe, se crea durante la ejecución de la canalización. Si no se especifica, se usa <outputTableSpec>_error_records en su lugar.

Ejecuta la plantilla de Cloud Storage Text a BigQuery (transmisión)

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Text Files on Cloud Storage to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery para los mensajes no procesados.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.
  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery para los mensajes no procesados.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.

Archivos de texto en Cloud Storage a Pub/Sub (transmisión)

Esta plantilla crea una canalización de transmisión que sondea de forma continua los archivos de texto nuevos subidos a Cloud Storage, lee cada archivo línea por línea y publica strings en un tema de Pub/Sub. La plantilla publica registros en un archivo delimitado por saltos de línea que contiene registros JSON o archivos CSV en un tema de Pub/Sub para su procesamiento en tiempo real. Puedes usar esta plantilla para reproducir datos en Pub/Sub.

La canalización se ejecuta de forma indefinida, y se debe finalizar de forma manual mediante una “cancelación” y no un “drain”, debido a su uso de la transformación “Watch”, que es un “SplittableDoFn” que no admite el desvío.

Actualmente, el intervalo de sondeo es fijo y configurado en 10 segundos. Esta plantilla no establece una marca de tiempo en los registros individuales. Es por eso que la hora del evento es la misma que la hora de publicación durante la ejecución. Si tu canalización depende de la hora precisa del evento para el procesamiento, no deberías usar esta canalización.

Requisitos para esta canalización:

  • Los archivos de entrada deben tener el formato JSON delimitado por saltos de línea o CSV. Los registros que abarcan varias líneas en los archivos de origen pueden causar problemas de bajada, ya que cada línea dentro de los archivos se publica como un mensaje en Pub/Sub.
  • El tema de Pub/Sub debe existir antes de la ejecución.
  • La canalización se ejecuta de forma indefinida, y deberás detenerla de forma manual.

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern El patrón del archivo de entrada para leer. Por ejemplo, gs://bucket-name/files/*.json o gs://bucket-name/path/*.csv.
outputTopic El tema de entrada de Pub/Sub en el que se desea escribir. El nombre debe tener el formato projects/<project-id>/topics/<topic-name>.

Ejecuta la plantilla de archivos de texto en Cloud Storage a Pub/Sub (transmisión)

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Text Files on Cloud Storage to Pub/Sub (Stream) template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
  • FILE_PATTERN: Es el glob de patrón del archivo que se leerá en el bucket de Cloud Storage (por ejemplo, path/*.csv).

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
  • FILE_PATTERN: Es el glob de patrón del archivo que se leerá en el bucket de Cloud Storage (por ejemplo, path/*.csv).

Enmascaramiento de datos y asignación de token de Cloud Storage a BigQuery (con DLP de Cloud)

La plantilla de enmascaramiento de datos y asignación de tokens de Cloud Storage a BigQuery (con Cloud DLP) es una canalización de transmisión que lee archivos csv de un bucket de Cloud Storage, llama a la API de Cloud Data Loss Prevention (Cloud DLP) para la desidentificación y escribe los datos desidentificados en la tabla de BigQuery especificada. Esta plantilla admite el uso de una plantilla de inspección de Cloud DLP y una plantilla de desidentificación de Cloud DLP. Esto permite que los usuarios inspeccionen información que puede ser sensible y desidentificar datos estructurados en los que las columnas están especificadas para ser desidentificadas y no se necesita la inspección. También es importante tener en cuenta que esta plantilla no admite una ruta de acceso regional para la ubicación de la plantilla de desidentificación. Solo se admite una ruta global.

Requisitos para esta canalización:

  • Los datos de entrada para la asignación de tokens deben existir
  • Las plantillas de Cloud DLP deben existir (por ejemplo, InspectTemplate y DeidentifyTemplate). Consulta Plantillas de Cloud DLP para obtener más detalles.
  • El conjunto de datos de BigQuery debe existir

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern Los archivos csv desde los que se leen los registros de datos de entrada. También se acepta el comodín. Por ejemplo, gs://mybucket/my_csv_filename.csv o gs://mybucket/file-*.csv.
dlpProjectId ID del proyecto de Cloud DLP que posee el recurso de la API de Cloud DLP. Este proyecto de Cloud DLP puede ser el mismo que posee las plantillas de Cloud DLP, o puede ser uno independiente. Por ejemplo, my_dlp_api_project.
deidentifyTemplateName Plantilla de desidentificación de Cloud DLP que se usa para las solicitudes a la API, especificada con el patrón projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId}. Por ejemplo, projects/my_project/deidentifyTemplates/100.
datasetName Conjunto de datos de BigQuery para enviar resultados con asignación de token.
batchSize Tamaño de fragmentación o del lote para enviar datos a fin de inspeccionar o quitar la asignación de token. En el caso de un archivo csv, batchSize es la cantidad de filas en un lote. Los usuarios deben determinar el tamaño del lote según el tamaño de los registros y del archivo. Ten en cuenta que la API de Cloud DLP tiene un límite de tamaño de carga útil de 524 KB por llamada a la API.
inspectTemplateName Plantilla de inspección de Cloud DLP que se usará para solicitudes a la API, especificada con el patrón projects/{template_project_id}/identifyTemplates/{idTemplateId} (opcional). Por ejemplo, projects/my_project/identifyTemplates/100

Ejecuta la plantilla de enmascaramiento de datos y asignación de tokens de Cloud Storage a BigQuery (con Cloud DLP)

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

Reemplaza lo siguiente:

  • DLP_API_PROJECT_ID: Es el ID del proyecto de la API de Cloud DLP.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • INPUT_DATA: Es la ruta de acceso del archivo de entrada.
  • DEIDENTIFY_TEMPLATE: Es el número de plantilla de Cloud DLPDeidentify.
  • DATASET_NAME: Es el nombre del conjunto de datos de BigQuery.
  • INSPECT_TEMPLATE_NUMBER: Es el número de plantilla de Cloud DLPInspect.
  • BATCH_SIZE_VALUE: Es el tamaño del lote (número de filas por API para csv).

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • DLP_API_PROJECT_ID: Es el ID del proyecto de la API de Cloud DLP.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TEMP_LOCATION: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo, gs://your-bucket/temp).
  • INPUT_DATA: Es la ruta de acceso del archivo de entrada.
  • DEIDENTIFY_TEMPLATE: Es el número de plantilla de Cloud DLPDeidentify.
  • DATASET_NAME: Es el nombre del conjunto de datos de BigQuery.
  • INSPECT_TEMPLATE_NUMBER: Es el número de plantilla de Cloud DLPInspect.
  • BATCH_SIZE_VALUE: Es el tamaño del lote (número de filas por API para csv).

Captura de datos modificados desde MySQL hasta BigQuery mediante Debezium y Pub/Sub (transmisión)

La plantilla Captura de datos modificados desde MySQL hasta BigQuery mediante Debezium y Pub/Sub es una canalización de transmisión que lee mensajes de Pub/Sub con datos de modificación desde una base de datos de MySQL y escribe los registros en BigQuery. Un conector Debezium captura las modificaciones en la base de datos de MySQL y publica los datos modificados en Pub/Sub. Luego, la plantilla lee los mensajes de Pub/Sub y los escribe en BigQuery.

Puedes usar esta plantilla para sincronizar las bases de datos de MySQL y las tablas de BigQuery. La canalización escribe los datos modificados en una tabla de etapa de pruebas de BigQuery y actualiza de forma intermitente una tabla de BigQuery que replica la base de datos de MySQL.

Requisitos para esta canalización:

Parámetros de la plantilla

Parámetro Descripción
inputSubscriptions La lista separada por comas de las suscripciones de entrada de Pub/Sub desde la que se va a leer, en el formato <subscription>,<subscription>, ...
changeLogDataset El conjunto de datos de BigQuery para almacenar las tablas de etapa de pruebas, en el formato <my-dataset>
replicaDataset La ubicación del conjunto de datos de BigQuery para almacenar las tablas de réplica, en el formato <my-dataset>
updateFrequencySecs (Opcional) el intervalo en el que la canalización actualiza la tabla de BigQuery y replica la base de datos de MySQL.

Ejecuta la plantilla de Captura de datos modificados desde MySQL hasta BigQuery mediante Debezium y Pub/Sub

Para ejecutar esta plantilla, sigue estos pasos:

  1. En tu máquina local, clona el repositorio DataflowTemplates.
  2. Cambia al directorio v2/cdc-parent.
  3. Asegúrate de que el conector Debezium esté implementado.
  4. Mediante Maven, ejecuta la plantilla de Dataflow:
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
    • SUBSCRIPTIONS: la lista separada por comas de los nombres de suscripción a Pub/Sub
    • CHANGELOG_DATASET: el conjunto de datos de BigQuery para los datos de registro de cambios
    • REPLICA_DATASET: el conjunto de datos de BigQuery para tablas de réplica

Apache Kafka a BigQuery

La plantilla de Apache Kafka a BigQuery es una canalización de transmisión que transfiere datos de texto de Apache Kafka, ejecuta una función definida por el usuario (UDF) y envía los registros resultantes a BigQuery. Cualquier error que ocurra en la transformación de los datos, la ejecución de la UDF o la inserción en la tabla de resultados se inserta en una tabla de errores independiente en BigQuery. Si la tabla de errores no existe antes de la ejecución, se creará.

Requisitos para esta canalización

  • La tabla de salida de BigQuery debe existir.
  • El servidor del agente de Apache Kafka debe estar en ejecución y se debe poder acceder a él desde las máquinas de trabajador de Dataflow.
  • Los temas de Apache Kafka deben existir, y los mensajes deben estar codificados en un formato JSON válido.

Parámetros de la plantilla

Parámetro Descripción
outputTableSpec La ubicación de la tabla de salida de BigQuery en la que se deben escribir los mensajes de Apache Kafka, en el formato my-project:dataset.table.
inputTopics Los temas de entrada de Apache Kafka desde los que se debe leer en una lista separada por comas. Por ejemplo: messages.
bootstrapServers La dirección del host de los servidores del agente de Apache Kafka en ejecución en una lista separada por comas, cada dirección de host tiene el formato 35.70.252.199:9092.
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
outputDeadletterTable (Opcional) la tabla de BigQuery para los mensajes que no llegaron a la tabla de salida, en el formato my-project:dataset.my-deadletter-table. Si no existe, la tabla se crea durante la ejecución de la canalización. Si no se especifica, se usa <outputTableSpec>_error_records en su lugar.

Ejecuta la plantilla de Apache Kafka a BigQuery

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Kafka to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • KAFKA_TOPICS: Es la lista de temas de Apache Kkafa. Si se proporcionan varios temas, sigue las instrucciones para escapar las comas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: La lista de direcciones IP del servidor del agente de Apache Kafka. Cada dirección IP debe tener el número de puerto desde el que se puede acceder al servidor. Por ejemplo: 35.70.252.199:9092. Si se proporcionan varias direcciones, sigue las instrucciones para escapar las comas.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • KAFKA_TOPICS: Es la lista de temas de Apache Kkafa. Si se proporcionan varios temas, sigue las instrucciones para escapar las comas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: La lista de direcciones IP del servidor del agente de Apache Kafka. Cada dirección IP debe tener el número de puerto desde el que se puede acceder al servidor. Por ejemplo: 35.70.252.199:9092. Si se proporcionan varias direcciones, sigue las instrucciones para escapar las comas.

Para obtener más información, consulta Escribe datos de Kafka en BigQuery con Dataflow.

Datastream a BigQuery (transmisión)

La plantilla de Datastream a BigQuery es una canalización de transmisión que lee datos de Datastream y los replica en BigQuery. La plantilla lee los datos de Cloud Storage mediante notificaciones de Pub/Sub y los replica en una tabla de etapa de pruebas de BigQuery particionada por tiempo. Después de la replicación, la plantilla ejecuta una MERGE en BigQuery para actualizar todos los cambios de captura de datos modificados (CDC) en una réplica de la tabla de origen.

La plantilla controla la creación y la actualización de las tablas de BigQuery que administra la replicación. Cuando se requiere un lenguaje de definición de datos (DDL), una devolución de llamada a Datastream extrae el esquema de la tabla de origen y lo traduce a los tipos de datos de BigQuery. Las operaciones admitidas incluyen las siguientes:

  • Las tablas nuevas se crean a medida que se insertan los datos.
  • Se agregan columnas nuevas a las tablas de BigQuery con valores iniciales nulos.
  • Las columnas descartadas se ignoran en BigQuery y los valores futuros son nulos.
  • Las columnas cuyos nombres se han cambiado se agregan a BigQuery como columnas nuevas.
  • Los cambios de tipo no se propagan a BigQuery.

Requisitos para esta canalización:

  • Una transmisión de Datastream que está lista para replicar los datos o ya los está replicando.
  • Las notificaciones de Pub/Sub de Cloud Storage están habilitadas para los datos de Datastream.
  • Se crean los conjuntos de datos de destino de BigQuery y se les otorgó acceso de administrador a la cuenta de servicio de Compute Engine.
  • En la tabla de origen, se necesita una clave primaria para crear la réplica de destino.

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern La ubicación de los archivos de Datastream en Cloud Storage para replicar. Esta ubicación suele ser la ruta de acceso raíz de la transmisión.
gcsPubSubSubscription La suscripción de Pub/Sub con las notificaciones de archivos de Datastream. Por ejemplo, projects/my-project-id/subscriptions/my-subscription-id.
inputFileFormat El formato del archivo de salida que produce Datastream. Por ejemplo: avro,json. Valor predeterminado, avro.
outputStagingDatasetTemplate El nombre de un conjunto de datos existente para contener tablas de etapa de pruebas. Puedes incluir la plantilla {_metadata_dataset} como marcador de posición que se reemplazará por el nombre del conjunto de datos o esquema de origen (p. ej., {_metadata_dataset}_log).
outputDatasetTemplate El nombre de un conjunto de datos existente que contiene tablas de réplica. Puedes incluir la plantilla {_metadata_dataset} como marcador de posición que se reemplazará por el nombre del conjunto de datos o esquema de origen (p. ej., {_metadata_dataset}).
deadLetterQueueDirectory La ruta de acceso del archivo para almacenar los mensajes no procesados con el motivo por el que no se pudieron procesar. El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. El valor predeterminado es suficiente en la mayoría de las condiciones.
outputStagingTableNameTemplate La plantilla para el nombre de las tablas de etapa de pruebas (opcional). El valor predeterminado es {_metadata_table}_log. Si replicas varios esquemas, la sugerencia es {_metadata_schema}_{_metadata_table}_log.
outputTableNameTemplate La plantilla para el nombre de las tablas de réplica (opcional). Valor predeterminado, {_metadata_table}. Si replicas varios esquemas, la sugerencia es {_metadata_schema}_{_metadata_table}.
outputProjectId Proyecto para los conjuntos de datos de BigQuery en los que se deben generar datos (opcional). El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow.
streamName El nombre o la plantilla del flujo que se consultará para obtener la información del esquema (opcional). Valor predeterminado, {_metadata_stream}.
mergeFrequencyMinutes La cantidad de minutos entre combinaciones para una tabla determinada (opcional). Valor predeterminado, 5.
dlqRetryMinutes La cantidad de minutos entre reintentos de la cola de mensajes no entregados (DLQ) (opcional). Valor predeterminado, 10.
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

Ejecuta la plantilla de Datastream a BigQuery

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Datastream to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
outputStagingDatasetTemplate=BIGQUERY_DATASET,\
outputDatasetTemplate=BIGQUERY_DATASET,\
outputStagingTableNameTemplate=BIGQUERY_TABLE,\
outputTableNameTemplate=BIGQUERY_TABLE_log
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
  • BIGQUERY_DATASET: es el nombre de tu conjunto de datos de BigQuery.
  • BIGQUERY_TABLE: es la plantilla de tabla de BigQuery. Por ejemplo, {_metadata_schema}_{_metadata_table}_log

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
          "outputDatasetTemplate": "BIGQUERY_DATASET",
          "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
          "outputTableNameTemplate": "BIGQUERY_TABLE_log"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
  • BIGQUERY_DATASET: es el nombre de tu conjunto de datos de BigQuery.
  • BIGQUERY_TABLE: es la plantilla de tabla de BigQuery. Por ejemplo, {_metadata_schema}_{_metadata_table}_log

Transmisión de datos a MySQL o PostgreSQL (Transmisión)

La plantilla de transmisión de datos a SQL es una canalización de transmisión que lee datos de Datastream y los replica en cualquier base de datos de MySQL o PostgreSQL. La plantilla lee los datos de Cloud Storage mediante notificaciones de Pub/Sub y los replica en tablas de réplica de PostgreSQL.

La plantilla no es compatible con el lenguaje de definición de datos (DDL) y espera que todas las tablas ya existan en la base de datos. La replicación usa transformaciones con estado de Dataflow para filtrar los datos inactivos y garantizar la coherencia dentro de los datos desordenados. Por ejemplo, si ya se pasó una versión más reciente de una fila, se ignorará una versión tardía de esa fila. El lenguaje de manipulación de datos (DML) que se ejecuta es el mejor intento de replicar perfectamente los datos de origen o destino. Las declaraciones DML ejecutadas siguen las siguientes reglas:

  • Si existe una clave primaria, las operaciones de inserción y actualización usan una sintaxis de upsert (es decir, INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • Si las claves primarias existen, las eliminaciones se replican como un DML borrado.
  • Si no existe una clave primaria, se insertan las operaciones de inserción y actualización en la tabla.
  • Si no existen claves primarias, se ignoran las eliminaciones.

Si usas las utilidades de Oracle para Postgres, agrega ROWID en SQL como la clave primaria cuando no exista ninguna.

Los requisitos de esta canalización son los siguientes:

  • Una transmisión de Datastream que está lista para replicar los datos o ya los está replicando.
  • Las notificaciones de Pub/Sub de Cloud Storage están habilitadas para los datos de Datastream.
  • Se propagó una base de datos de PostgreSQL con el esquema requerido.
  • Se configura el acceso a la red entre los trabajadores de Dataflow y PostgreSQL.

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern La ubicación de los archivos de Datastream en Cloud Storage para replicar. Esta ubicación suele ser la ruta de acceso raíz de la transmisión.
gcsPubSubSubscription La suscripción de Pub/Sub con las notificaciones de archivos de Datastream. Por ejemplo, projects/my-project-id/subscriptions/my-subscription-id.
inputFileFormat El formato del archivo de salida que produce Datastream. Por ejemplo: avro,json. Valor predeterminado, avro.
databaseHost El host de SQL para conectarte.
databaseUser El usuario de SQL con todos los permisos necesarios para escribir en todas las tablas en la replicación.
databasePassword La contraseña para el usuario de SQL especificado.
databasePort (Opcional) El puerto de la base de datos de SQL al que se realizará la conexión. Valor predeterminado, 5432.
databaseName (Opcional) El nombre de la base de datos de SQL a la que se realizará la conexión. Valor predeterminado, postgres.
streamName El nombre o la plantilla del flujo que se consultará para obtener la información del esquema (opcional). Valor predeterminado, {_metadata_stream}.

Ejecuta la plantilla de transmisión de datos a SQL

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Datastream to SQL template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: Es la IP del host de SQL.
  • DATABASE_USER: Es tu usuario de SQL.
  • DATABASE_PASSWORD: Es tu contraseña de SQL.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: Es la IP del host de SQL.
  • DATABASE_USER: Es tu usuario de SQL.
  • DATABASE_PASSWORD: Es tu contraseña de SQL.

Conectividad a bases de datos de Java (JDBC) de Pub/Sub

La plantilla de conectividad de base de datos de Pub/Sub a Java (JDBC) es una canalización de transmisión que transfiere datos desde una suscripción de Cloud Pub/Sub preexistente como strings JSON y escribe los registros resultantes en JDBC.

Requisitos para esta canalización:

  • La suscripción de Cloud Pub/Sub de origen debe existir antes de ejecutar la canalización.
  • La fuente de JDBC debe existir antes de ejecutar la canalización.
  • El tema no entregado de Cloud Pub/Sub debe existir antes de ejecutar la canalización.

Parámetros de la plantilla

Parámetro Descripción
driverClassName El nombre de la clase del controlador de JDBC. Por ejemplo, com.mysql.jdbc.Driver
connectionUrl La string de la URL de la conexión de JDBC. Por ejemplo, jdbc:mysql://some-host:3306/sampledb Se puede pasar como una string codificada en Base64 y, luego, encriptada con una clave de Cloud KMS.
driverJars Rutas de Cloud Storage separadas por comas para controladores de JDBC. Por ejemplo, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar.
username El nombre de usuario para usar en la conexión de JDBC (opcional). Se puede pasar como una string codificada en Base64 encriptada con una clave de Cloud KMS.
password La contraseña para usar en la conexión de JDBC (opcional). Se puede pasar como una string codificada en Base64 encriptada con una clave de Cloud KMS.
connectionProperties La string de propiedades para usar en la conexión de JDBC (opcional). El formato de la string debe ser [propertyName=property;]*. Por ejemplo, unicode=true;characterEncoding=UTF-8
statement Declaración que se ejecutará en la base de datos. La instrucción debe especificar los nombres de columna de la tabla en cualquier orden. Solo los valores de los nombres de columna especificados se leen del JSON y se agregan a la instrucción. Por ejemplo, INSERT INTO tableName (column1, column2) VALUES (?,?)
inputSubscription Suscripción de entrada de Pub/Sub desde la que se va a leer, en el formato projects/<project>/subscriptions/<subscription>.
outputDeadletterTopic El tema de Pub/Sub para reenviar mensajes que no se pueden entregar. Por ejemplo, projects/<project-id>/topics/<topic-name>.
KMSEncryptionKey La clave de encriptación de Cloud KMS para desencriptar el nombre de usuario, la contraseña y la string de conexión (opcional). Si se pasa la clave de Cloud KMS, el nombre de usuario, la contraseña y la string de conexión deben pasarse encriptados.
extraFilesToStage Rutas de Cloud Storage separadas por comas o secretos de Secret Manager para los archivos que se deben almacenar en etapa intermedia en el trabajador. Estos archivos se guardarán en el directorio /extra_files de cada trabajador. Por ejemplo, gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>

Ejecuta la plantilla de Pub/Sub a la base de datos de Java (JDBC)

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to JDBC template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • DRIVER_CLASS_NAME: Es el nombre de la clase del controlador.
  • JDBC_CONNECTION_URL: Es la URL de conexión de JDBC.
  • DRIVER_PATHS: Son las rutas de Cloud Storage separadas por comas de los controladores JDBC.
  • CONNECTION_USERNAME: Es el nombre de usuario de la conexión de JDBC.
  • CONNECTION_PASSWORD: Es la contraseña de la conexión de JDBC.
  • CONNECTION_PROPERTIES: Las propiedades de conexión de JDBC, si es necesario
  • SQL_STATEMENT: Es la instrucción de SQL que se ejecutará en la base de datos.
  • INPUT_SUBSCRIPTION: Es la suscripción de entrada de Pub/Sub desde la que se desea leer.
  • OUTPUT_DEADLETTER_TOPIC: Es el tema de Pub/Sub para reenviar mensajes que no se pueden entregar.
  • KMS_ENCRYPTION_KEY: Es la clave de encriptación de Cloud KMS.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_Jdbc
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "statement": "SQL_STATEMENT",
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • DRIVER_CLASS_NAME: Es el nombre de la clase del controlador.
  • JDBC_CONNECTION_URL: Es la URL de conexión de JDBC.
  • DRIVER_PATHS: Son las rutas de Cloud Storage separadas por comas de los controladores JDBC.
  • CONNECTION_USERNAME: Es el nombre de usuario de la conexión de JDBC.
  • CONNECTION_PASSWORD: Es la contraseña de la conexión de JDBC.
  • CONNECTION_PROPERTIES: Las propiedades de conexión de JDBC, si es necesario
  • SQL_STATEMENT: Es la instrucción de SQL que se ejecutará en la base de datos.
  • INPUT_SUBSCRIPTION: Es la suscripción de entrada de Pub/Sub desde la que se desea leer.
  • OUTPUT_DEADLETTER_TOPIC: Es el tema de Pub/Sub para reenviar mensajes que no se pueden entregar.
  • KMS_ENCRYPTION_KEY: Es la clave de encriptación de Cloud KMS.

Flujos de cambio de Cloud Spanner a Cloud Storage

La plantilla de flujos de cambios de Cloud Spanner a Cloud Storage es una canalización de transmisión que transmite los registros de cambios de datos de Spanner y los escribe en un bucket de Cloud Storage mediante Dataflow Runner v2.

La canalización agrupa los registros de flujos de cambios de Spanner en períodos según su marca de tiempo, y cada período representa un intervalo de tiempo cuya duración puedes configurar con esta plantilla. Se garantiza que todos los registros con marcas de tiempo que pertenecen al período se encuentran en el período; no puede haber retrasos. También puedes definir una cantidad de fragmentos de salida. La canalización crea un archivo de salida de Cloud Storage por período por fragmento. Dentro de un archivo de salida, los registros no están ordenados. Se pueden escribir los archivos de salida en formato JSON o AVRO, según la configuración del usuario.

Ten en cuenta que puedes minimizar la latencia de la red y los costos de transporte de la red si ejecutas el trabajo de Dataflow desde la misma región que tu instancia de Cloud Spanner o tu bucket de Cloud Storage. Si usas fuentes, receptores, ubicaciones de archivos de etapa de pruebas o ubicaciones de archivos temporales que se encuentran fuera de la región del trabajo, es posible que los datos se envíen a través de diferentes regiones. Obtén más información sobre los extremos regionales de Dataflow.

Obtén más información sobre los flujos de cambios, cómo compilar canalizaciones de Dataflow de flujos de cambio y prácticas recomendadas.

Requisitos para esta canalización:

  • La instancia de Cloud Spanner debe existir antes de ejecutar la canalización.
  • La base de datos de Cloud Spanner debe existir antes de ejecutar la canalización.
  • La instancia de metadatos de Cloud Spanner debe existir antes de ejecutar la canalización.
  • La base de datos de metadatos de Cloud Spanner debe existir antes de ejecutar la canalización.
  • El flujo de cambios de Cloud Spanner debe existir antes de ejecutar la canalización.
  • El bucket de salida de Cloud Storage debe existir antes de ejecutar la canalización.

Parámetros de la plantilla

Parámetro Descripción
spannerInstanceId El ID de instancia de Cloud Spanner desde el que se leerán los datos de flujos de cambios.
spannerDatabase La base de datos de Cloud Spanner desde la que se leerán los datos de flujos de cambios.
spannerMetadataInstanceId El ID de instancia de Cloud Spanner que se usará para la tabla de metadatos del conector de flujos de cambios.
spannerMetadataDatabase La base de datos de Cloud Spanner que se usará para la tabla de metadatos del conector de flujos de cambios.
spannerChangeStreamName El nombre del flujo de cambios de Cloud Spanner desde el que se leerá.
gcsOutputDirectory La ubicación del archivo de salida de los flujos de cambios en Cloud Storage en el siguiente formato: '“s://${BUCKET}/${ROOT_PATH}/”.
outputFilenamePrefix (Opcional). El prefijo de nombre de archivo de los archivos en los que se escribirá. El prefijo de archivo predeterminado está configurado como “output”.
spannerProjectId (Opcional). Proyecto desde el que se leerán los flujos de cambio. Este es también el proyecto en el que se crea la tabla de metadatos del conector de flujos de cambios. El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow.
startTimestamp (Opcional). La fecha y hora de inicio, inclusiva, que se usará para leer los flujos de cambios. Ej.: 2021-10-12T07:20:50.52Z. El valor predeterminado es la marca de tiempo del inicio de la canalización, es decir, la hora actual.
endTimestamp (Opcional). La fecha y hora de finalización, inclusiva, que se usará para leer los flujos de cambios. Ej.: 2021-10-12T07:20:50.52Z. El valor predeterminado es un tiempo infinito en el futuro.
outputFileFormat (Opcional). El formato del archivo de salida de Cloud Storage. Los formatos permitidos son TEXT y AVRO. El valor predeterminado es AVRO.
windowDuration La duración de la ventana es el intervalo en el que se escriben los datos en el directorio de salida (opcional). Configura la duración en función de la capacidad de procesamiento de la canalización. Por ejemplo, una capacidad de procesamiento mayor puede requerir tamaños de ventana más pequeños para que los datos se ajusten a la memoria. La configuración predeterminada es de 5 min, con un mínimo de 1 s. Los formatos permitidos son: [nro. entero] s (para los segundos, por ejemplo, 5 s), [nro. entero] min (para los minutos, por ejemplo, 12 min) y [nro. entero] h (para las horas, por ejemplo, 2 h).
rpcPriority (Opcional). La prioridad de solicitud para llamadas de Cloud Spanner. El valor debe ser uno de los siguientes: [HIGH, MEDIUM, LOW]. (Predeterminado: HIGH)
numShards (Opcional). La cantidad máxima de fragmentos de salida que se produce con la escritura. La cantidad predeterminada es 20. Una mayor cantidad de fragmentos implica una mayor capacidad de procesamiento para la escritura en Cloud Storage, pero, también, un mayor costo de agregación de datos entre fragmentos cuando se procesan archivos de salida de Cloud Storage.
spannerMetadataTableName (Opcional). El nombre de la tabla de metadatos del conector de flujos de cambios de Cloud Spanner que se usará. Si no se proporciona, se creará una tabla de metadatos de flujos de cambios de Cloud Spanner automáticamente durante el flujo de canalización. Este parámetro se debe proporcionar cuando se actualiza una canalización existente y no se debe proporcionar de otra manera.

Ejecuta la plantilla de flujos de cambios de Cloud Spanner a Cloud Storage

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Spanner change streams to Google Cloud Storage template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • SPANNER_INSTANCE_ID: ID de la instancia de Cloud Spanner
  • SPANNER_DATABASE: Base de datos de Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID de la instancia de metadatos de Cloud Spanner
  • SPANNER_METADATA_DATABASE: Base de datos de metadatos de Cloud Spanner
  • SPANNER_CHANGE_STREAM: Flujo de cambios de Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: Ubicación del archivo de salida de los flujos de cambios

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • SPANNER_INSTANCE_ID: ID de la instancia de Cloud Spanner
  • SPANNER_DATABASE: Base de datos de Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID de la instancia de metadatos de Cloud Spanner
  • SPANNER_METADATA_DATABASE: Base de datos de metadatos de Cloud Spanner
  • SPANNER_CHANGE_STREAM: Flujo de cambios de Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: Ubicación del archivo de salida de los flujos de cambios

Flujos de cambio de Cloud Spanner a BigQuery

La plantilla de flujos de cambios de Cloud Spanner a BigQuery es una canalización de transmisión que transmite los registros de cambios de datos de Cloud Spanner y los escribe en tablas de BigQuery mediante Dataflow Runner v2.

Si las tablas de BigQuery necesarias no existen, la canalización las crea. De lo contrario, se usan las tablas de BigQuery existentes. El esquema de las tablas de BigQuery existentes debe contener las columnas con seguimiento correspondientes de las tablas de Cloud Spanner y las columnas de metadatos adicionales (consulta la descripción de los campos de metadatos en la siguiente lista) que la opción “ignoreFields” no ignora de manera explícita. Cada fila nueva de BigQuery incluye todas las columnas observadas por el flujo de cambios de su fila correspondiente en tu tabla de Cloud Spanner en la marca de tiempo del registro de cambios.

Todas las columnas observadas por el flujo de cambios se incluyen en cada fila de la tabla de BigQuery, sin importar si una transacción de Cloud Spanner las modifica. Las columnas que no se observan no se incluyen en la fila de BigQuery. Cualquier cambio de Cloud Spanner que sea menor que la marca de agua de Dataflow se aplica de forma correcta a las tablas de BigQuery o se almacena en la cola de mensajes no entregados para reintentar aplicarlo. Las filas de BigQuery se insertan de forma desordenada en comparación con el orden original de las marcas de tiempo de confirmación de Cloud Spanner.

Los siguientes campos de metadatos se agregan a las tablas de BigQuery:

  • _metadata_spanner_mod_type: Se extrae del registro de cambios de los datos de flujos de cambios.
  • _metadata_spanner_table_name: Es el nombre de la tabla de Cloud Spanner. Ten en cuenta que este no es el nombre de la tabla de metadatos del conector.
  • _metadata_spanner_commit_timestamp: Se extrae del registro de cambios de los datos de flujos de cambios.
  • _metadata_spanner_server_transaction_id: Se extrae del registro de cambios de los datos de flujos de cambios.
  • _metadata_spanner_record_sequence: Se extrae del registro de cambios de los datos de flujos de cambios.
  • _metadata_spanner_is_last_record_in_transaction_in_partition: Se extrae del registro de cambios de los datos de flujos de cambios.
  • _metadata_spanner_number_of_records_in_transaction: Se extrae del registro de cambios de los datos de flujos de cambios.
  • _metadata_spanner_number_of_partitions_in_transaction: Se extrae del registro de cambios de los datos de flujos de cambios.
  • _metadata_big_query_commit_timestamp: La marca de tiempo de confirmación del momento en que se inserta la fila en BigQuery.

Nota:

  • Esta plantilla no propaga los cambios de esquema de Cloud Spanner a BigQuery. Debido a que realizar un cambio de esquema en Cloud Spanner probablemente dañe la canalización, es posible que debas volver a crear la canalización después del cambio de esquema.
  • Para los tipos de captura de valor OLD_AND_NEW_VALUES y NEW_VALUES, cuando el registro de cambios de datos contiene un cambio UPDATE, la plantilla debe realizar una lectura inactiva en Cloud Spanner en la marca de tiempo de confirmación del registro de cambios de datos a fin de recuperar las columnas que sí se observaron, pero no se modificaron. Asegúrate de configurar la base de datos “'version_retention_period” de forma adecuada para la lectura inactiva. Para el tipo de captura de valor NEW_ROW, la plantilla es más eficiente, ya que el registro de cambios de datos captura la fila nueva completa, incluidas las columnas que no se actualizan en UPDATE y la plantilla no necesita llevar a cabo una lectura inactiva.
  • Para minimizar la latencia de la red y los costos de transporte de la red, puedes ejecutar el trabajo de Dataflow desde la misma región que tu instancia de Cloud Spanner o tus tablas de BigQuery. Si usas fuentes, receptores, ubicaciones de archivos de etapa de pruebas o ubicaciones de archivos temporales que se encuentran fuera de la región del trabajo, es posible que los datos se envíen a través de diferentes regiones. Obtén más información sobre los extremos regionales de Dataflow.
  • Esta plantilla admite todos los tipos de datos válidos de Cloud Spanner, pero si el tipo de BigQuery es más preciso que el tipo de Cloud Spanner, podría producirse una pérdida precisión durante la transformación. Específicamente:
    • Para el tipo JSON de Cloud Spanner, el orden de los miembros de un objeto se ordena de forma lexicográfica, pero no existe esa garantía para el tipo JSON de BigQuery.
    • Cloud Spanner admite el tipo de TIMESTAMP de nanosegundos, BigQuery solo admite el tipo de TIMESTAMP de microsegundos.

Obtén más información sobre los flujos de cambios, cómo compilar canalizaciones de Dataflow de flujos de cambio y prácticas recomendadas.

Requisitos para esta canalización:

  • La instancia de Cloud Spanner debe existir antes de ejecutar la canalización.
  • La base de datos de Cloud Spanner debe existir antes de ejecutar la canalización.
  • La instancia de metadatos de Cloud Spanner debe existir antes de ejecutar la canalización.
  • La base de datos de metadatos de Cloud Spanner debe existir antes de ejecutar la canalización.
  • El flujo de cambios de Cloud Spanner debe existir antes de ejecutar la canalización.
  • El conjunto de datos de BigQuery debe existir antes de ejecutar la canalización.

Parámetros de la plantilla

Parámetro Descripción
spannerInstanceId La instancia de Cloud Spanner desde la que se leerán los flujos de cambio.
spannerDatabase La base de datos de Cloud Spanner desde la que se leerán los flujos de cambio.
spannerMetadataInstanceId La instancia de Cloud Spanner que se usará para la tabla de metadatos del conector de flujos de cambios.
spannerMetadataDatabase La base de datos de Cloud Spanner que se usará para la tabla de metadatos del conector de flujos de cambios.
spannerChangeStreamName El nombre del flujo de cambios de Cloud Spanner desde el que se leerá.
bigQueryDataSet El conjunto de datos de BigQuery de salida de los flujos de cambios.
spannerProjectId (Opcional). Proyecto desde el que se leerán los flujos de cambio. Este es también el proyecto en el que se crea la tabla de metadatos del conector de flujos de cambios. El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow.
spannerMetadataTableName (Opcional). El nombre de la tabla de metadatos del conector de flujos de cambios de Cloud Spanner que se usará. Si no se proporciona, una tabla de metadatos de conectores de transmisión de cambios de Cloud Spanner se crea de forma automática durante el flujo de la canalización. Este parámetro se debe proporcionar cuando se actualiza una canalización existente y no se debe proporcionar de otra manera.
rpcPriority (Opcional). La prioridad de solicitud para llamadas de Cloud Spanner. El valor debe ser uno de los siguientes: [HIGH, MEDIUM, LOW]. (Predeterminado: HIGH)
startTimestamp (Opcional). La fecha y hora de inicio, inclusiva, que se usará para leer los flujos de cambios. Ej.: 2021-10-12T07:20:50.52Z. El valor predeterminado es la marca de tiempo del inicio de la canalización, es decir, la hora actual.
endTimestamp (Opcional). La fecha y hora de finalización, inclusiva, que se usará para leer los flujos de cambios. Ej.: 2021-10-12T07:20:50.52Z. El valor predeterminado es un tiempo infinito en el futuro.
bigQueryProjectId (Opcional). El proyecto de BigQuery. El valor predeterminado es el proyecto para el trabajo de Dataflow.
bigQueryChangelogTableNameTemplate (Opcional). La plantilla para el nombre de las tablas de registros de cambios de BigQuery. El valor predeterminado es {_metadata_spanner_table_name}_changelog.
deadLetterQueueDirectory (Opcional). La ruta de acceso del archivo que almacenará los registros no procesados con el motivo por el que no se pudieron procesar. El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. El valor predeterminado es suficiente en la mayoría de las condiciones.
dlqRetryMinutes (Opcional). La cantidad de minutos entre reintentos de la cola de mensajes no entregados. La configuración predeterminada es 10.
ignoreFields (Opcional) Lista de campos separados por comas (con distinción entre mayúsculas y minúsculas) que se deben ignorar. Pueden ser campos de tablas observadas o campos de metadatos que agrega la canalización. Los campos ignorados no se insertarán en BigQuery.

Ejecuta la plantilla de flujos de cambios de Cloud Spanner a BigQuery

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Spanner change streams to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • SPANNER_INSTANCE_ID: ID de la instancia de Cloud Spanner
  • SPANNER_DATABASE: Base de datos de Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID de la instancia de metadatos de Cloud Spanner
  • SPANNER_METADATA_DATABASE: Base de datos de metadatos de Cloud Spanner
  • SPANNER_CHANGE_STREAM: Flujo de cambios de Cloud Spanner
  • BIGQUERY_DATASET: El conjunto de datos de BigQuery de salida de los flujos de cambios.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • SPANNER_INSTANCE_ID: ID de la instancia de Cloud Spanner
  • SPANNER_DATABASE: Base de datos de Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID de la instancia de metadatos de Cloud Spanner
  • SPANNER_METADATA_DATABASE: Base de datos de metadatos de Cloud Spanner
  • SPANNER_CHANGE_STREAM: Flujo de cambios de Cloud Spanner
  • BIGQUERY_DATASET: El conjunto de datos de BigQuery de salida de los flujos de cambios.

Flujos de cambio de Cloud Spanner a Pub/Sub

La transmisión de cambios de Cloud Spanner a la plantilla de Pub/Sub es una canalización de transmisión en la que se transmiten los registros de cambios de datos de Cloud Spanner y se los escribe en temas de Pub/Sub mediante Dataflow Runner V2.

Para enviar tus datos a un tema nuevo de Pub/Sub, primero debes crear el tema. Después de crearlo, Pub/Sub genera y adjunta automáticamente una suscripción al tema nuevo. Si intentas generar datos en un tema de Pub/Sub inexistente, la canalización de Dataflow mostrará una excepción y esta se detendrá a medida que intente establecer una conexión de forma continua.

Si el tema de Pub/Sub necesario ya existe, puedes generar datos en él.

Para obtener más información, consulta Acerca de los flujos de cambios, Compila conexiones de flujos de cambios con Dataflow y Prácticas recomendadas de flujos de cambio.

Requisitos para esta canalización:

  • La instancia de Cloud Spanner debe existir antes de ejecutar la canalización.
  • La base de datos de Cloud Spanner debe existir antes de ejecutar la canalización.
  • La instancia de metadatos de Cloud Spanner debe existir antes de ejecutar la canalización.
  • La base de datos de metadatos de Cloud Spanner debe existir antes de ejecutar la canalización.
  • El flujo de cambios de Cloud Spanner debe existir antes de ejecutar la canalización.
  • El tema de Pub/Sub debe existir antes de ejecutar la canalización.

Parámetros de la plantilla

Parámetro Descripción
spannerInstanceId La instancia de Cloud Spanner desde la que se leerán los flujos de cambio.
spannerDatabase La base de datos de Cloud Spanner desde la que se leerán los flujos de cambio.
spannerMetadataInstanceId La instancia de Cloud Spanner que se usará para la tabla de metadatos del conector de flujos de cambios.
spannerMetadataDatabase La base de datos de Cloud Spanner que se usará para la tabla de metadatos del conector de flujos de cambios.
spannerChangeStreamName El nombre del flujo de cambios de Cloud Spanner desde el que se leerá.
pubsubTopic El tema de Pub/Sub para el resultado de los flujos de cambios.
spannerProjectId (Opcional) El proyecto desde el que se leerán los flujos de cambio. Este es también el proyecto en el que se crea la tabla de metadatos del conector de flujos de cambios. El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow.
spannerMetadataTableName (Opcional). El nombre de la tabla de metadatos del conector de flujos de cambios de Cloud Spanner que se usará. Si no se proporciona la tabla de metadatos del conector de transmisión durante el cambio de flujo de canalización, Cloud Spanner la crea automáticamente. Debes proporcionar este parámetro cuando actualices una canalización existente. No uses este parámetro para otros casos.
rpcPriority (Opcional). La prioridad de solicitud para llamadas de Cloud Spanner. El valor debe ser uno de los siguientes: [HIGH, MEDIUM, LOW]. (Predeterminado: HIGH)
startTimestamp (Opcional). La fecha y hora de inicio, inclusiva, que se usará para leer los flujos de cambios. Por ejemplo, ex-2021-10-12T07:20:50.52Z. El valor predeterminado es la marca de tiempo del inicio de la canalización, es decir, la hora actual.
endTimestamp (Opcional). La fecha y hora de finalización, inclusiva, que se usará para leer los flujos de cambios. Por ejemplo, ex-2021-10-12T07:20:50.52Z. El valor predeterminado es un tiempo infinito en el futuro.
outputFileFormat (Opcional) El formato del resultado. El resultado se une a muchos PubsubMessages y se envía a un tema de Pub/Sub. Los formatos permitidos son JSON y AVRO. El valor predeterminado es JSON.
pubsubAPI (Opcional) La API de Pub/Sub que se usa para implementar la canalización. Las APIs permitidas son pubsubio y native_client. Para una pequeña cantidad de consultas por segundo (QPS), native_client tiene menos latencia. Para una gran cantidad de QPS, pubsubio proporciona un rendimiento mejor y más estable. El valor predeterminado es pubsubio.

Ejecuta los flujos de cambios de Cloud Spanner en la plantilla de Pub/Sub

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Spanner change streams to Pub/Sub template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

    gcloud beta dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • SPANNER_INSTANCE_ID: ID de la instancia de Cloud Spanner
  • SPANNER_DATABASE: Base de datos de Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID de la instancia de metadatos de Cloud Spanner
  • SPANNER_METADATA_DATABASE: Base de datos de metadatos de Cloud Spanner
  • SPANNER_CHANGE_STREAM: Flujo de cambios de Cloud Spanner
  • PUBSUB_TOPIC: Es el tema de Pub/Sub para la salida de los flujos de cambios.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • SPANNER_INSTANCE_ID: ID de la instancia de Cloud Spanner
  • SPANNER_DATABASE: Base de datos de Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID de la instancia de metadatos de Cloud Spanner
  • SPANNER_METADATA_DATABASE: Base de datos de metadatos de Cloud Spanner
  • SPANNER_CHANGE_STREAM: Flujo de cambios de Cloud Spanner
  • PUBSUB_TOPIC: Es el tema de Pub/Sub para la salida de los flujos de cambios.

De MongoDB a BigQuery (CDC)

La plantilla de datos de MongoDB a BigQuery CDC (captura de datos modificados) es una canalización de transmisión que funciona junto con los flujos de cambios de MongoDB. La canalización lee los registros JSON enviados a Pub/Sub a través de un flujo de cambios de MongoDB y los escribe en BigQuery según lo especificado por el parámetro userOption.

Requisitos para esta canalización

  • El conjunto de datos de destino de BigQuery debe existir.
  • Se debe poder acceder a la instancia de origen de MongoDB desde las máquinas de trabajador de Dataflow.
  • Los flujos de cambios que envían los cambios de MongoDB a Pub/Sub debería ejecutarse.

Parámetros de la plantilla

Parámetro Descripción
mongoDbUri Es el URI de conexión de MongoDB con el formato mongodb+srv://:@.
database La base de datos en MongoDB en la que se debe leer la colección. Por ejemplo: my-db.
collection Nombre de la colección dentro de la base de datos de MongoDB. Por ejemplo: my-collection.
outputTableSpec La tabla de BigQuery en la que se escribirá. Por ejemplo, bigquery-project:dataset.output_table.
userOption FLATTEN o NONE. FLATTEN aplana los documentos al primer nivel. NONE almacena todo el documento como una string JSON.
inputTopic El tema de entrada de Pub/Sub desde el que se va a leer, en el formato projects/<project>/topics/<topic>.

Ejecuta la plantilla de MongoDB a BigQuery (CDC)

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the MongoDB to BigQuery (CDC) template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC \
    --parameters \
outputTableSpec=OUTPUT_TABLE_SPEC,\
mongoDbUri=MONGO_DB_URI,\
database=DATABASE,\
collection=COLLECTION,\
userOption=USER_OPTION,\
inputTopic=INPUT_TOPIC

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • OUTPUT_TABLE_SPEC: Es el nombre de la tabla de BigQuery de destino.
  • MONGO_DB_URI: Es el URI de MongoDB.
  • DATABASE: Es tu base de datos de MongoDB.
  • COLLECTION: Es tu colección de MongoDB.
  • USER_OPTION: FLATTEN o NINGUNO.
  • INPUT_TOPIC: Es el tema de entrada de Pub/Sub.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "mongoDbUri": "MONGO_DB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "userOption": "USER_OPTION",
          "inputTopic": "INPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC",
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/
    • el nombre de la versión, como 2021-09-20-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
  • OUTPUT_TABLE_SPEC: Es el nombre de la tabla de BigQuery de destino.
  • MONGO_DB_URI: Es el URI de MongoDB.
  • DATABASE: Es tu base de datos de MongoDB.
  • COLLECTION: Es tu colección de MongoDB.
  • USER_OPTION: FLATTEN o NINGUNO.
  • INPUT_TOPIC: Es el tema de entrada de Pub/Sub.