Google proporciona un conjunto de plantillas de código abierto de Dataflow.
Estas plantillas de Dataflow pueden ayudarte a resolver grandes tareas de datos, incluidas la importación, la exportación, la copia de seguridad y el restablecimiento de datos, y las operaciones de API masivas, todo sin el uso de un entorno de desarrollo dedicado. Las plantillas se compilan en Apache Beam y usan Dataflow para transformar los datos.
Para obtener información general sobre las plantillas, consulta Plantillas de Dataflow. Para obtener una lista de todas las plantillas proporcionadas por Google, consulta Comienza a usar las plantillas proporcionadas por Google.En esta guía, se documentan las plantillas de transmisión.
Suscripción de Pub/Sub a BigQuery
La plantilla de suscripción de Pub/Sub a BigQuery es una canalización de transmisión que lee mensajes con formato JSON desde una suscripción de Pub/Sub y los escribe en una tabla de BigQuery. Puedes usar la plantilla como una solución rápida para mover datos de Pub/Sub a BigQuery. La plantilla lee los mensajes con formato JSON de Pub/Sub y los convierte en elementos de BigQuery.
Requisitos para esta canalización:
- El campo
data
de los mensajes de Pub/Sub debe usar el formato JSON, que se describe en esta guía de JSON. Por ejemplo, los mensajes con valores en el campodata
con formato{"k1":"v1", "k2":"v2"}
se pueden insertar en una tabla de BigQuery con dos columnas, llamadask1
yk2
, con un tipo de datos de string. - La tabla de salida debe existir antes de ejecutar la canalización. El esquema de la tabla debe coincidir con los objetos JSON de entrada.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputSubscription |
Suscripción de entrada de Pub/Sub desde la que se va a leer, en el formato projects/<project>/subscriptions/<subscription> . |
outputTableSpec |
Ubicación de la tabla de salida de BigQuery, en el formato <my-project>:<my-dataset>.<my-table> . |
outputDeadletterTable |
La tabla de BigQuery para los mensajes que no llegaron a la tabla de salida, en el formato <my-project>:<my-dataset>.<my-table> .
Si no existe, se crea durante la ejecución de la canalización.
Si no se especifica, se usa OUTPUT_TABLE_SPEC_error_records en su lugar. |
javascriptTextTransformGcsPath |
El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional).
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ } , el nombre de la función es myTransform . Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
|
Ejecuta la plantilla de suscripción de Pub/Sub a BigQuery
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Subscription to BigQuery template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\ outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).SUBSCRIPTION_NAME
: Es el nombre de la suscripción a Pub/Sub.DATASET
: Es el conjunto de datos de BigQuery.TABLE_NAME
: Es el nombre de la tabla de BigQuery.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME" }, "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).SUBSCRIPTION_NAME
: Es el nombre de la suscripción a Pub/Sub.DATASET
: Es el conjunto de datos de BigQuery.TABLE_NAME
: Es el nombre de la tabla de BigQuery.
Tema de Pub/Sub a BigQuery
La plantilla de tema de Pub/Sub a BigQuery es una canalización de transmisión que lee mensajes con formato JSON de un tema de Pub/Sub y los escribe en una tabla de BigQuery. Puedes usar la plantilla como una solución rápida para mover datos de Pub/Sub a BigQuery. La plantilla lee los mensajes con formato JSON de Pub/Sub y los convierte en elementos de BigQuery.
Requisitos para esta canalización:
- El campo
data
de los mensajes de Pub/Sub debe usar el formato JSON, que se describe en esta guía de JSON. Por ejemplo, los mensajes con valores en el campodata
con formato{"k1":"v1", "k2":"v2"}
se pueden insertar en una tabla de BigQuery con dos columnas, llamadask1
yk2
, con un tipo de datos de string. - La tabla de salida debe existir antes de ejecutar la canalización. El esquema de la tabla debe coincidir con los objetos JSON de entrada.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputTopic |
El tema de entrada de Pub/Sub desde el que se va a leer, en el formato projects/<project>/topics/<topic> . |
outputTableSpec |
Ubicación de la tabla de salida de BigQuery, en el formato <my-project>:<my-dataset>.<my-table> . |
outputDeadletterTable |
La tabla de BigQuery para los mensajes que no llegaron a la tabla de resultados. Debe estar en formato <my-project>:<my-dataset>.<my-table> .
Si no existe, se crea durante la ejecución de la canalización.
Si no se especifica, se usa <outputTableSpec>_error_records en su lugar. |
javascriptTextTransformGcsPath |
El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional).
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ } , el nombre de la función es myTransform . Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
|
Ejecuta la plantilla del tema de Pub/Sub a BigQuery
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Topic to BigQuery template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\ outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).TOPIC_NAME
: Es el nombre del tema de Pub/Sub.DATASET
: Es el conjunto de datos de BigQuery.TABLE_NAME
: Es el nombre de la tabla de BigQuery.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME" } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).TOPIC_NAME
: Es el nombre del tema de Pub/Sub.DATASET
: Es el conjunto de datos de BigQuery.TABLE_NAME
: Es el nombre de la tabla de BigQuery.
Pub/Sub Avro a BigQuery
La plantilla de Pub/Sub Avro a BigQuery es una canalización de transmisión que transfiere datos de Avro desde una suscripción de Pub/Sub a una tabla de BigQuery. Cualquier error que ocurra mientras se escribe en la tabla de BigQuery se transmite a un tema de Pub/Sub sin procesar.
Requisitos para esta canalización
- La suscripción de entrada de Pub/Sub debe existir.
- El archivo de esquema para los registros de Avro debe existir en Cloud Storage.
- El tema de Pub/Sub sin procesar debe existir. ¿
- El conjunto de datos de salida de BigQuery debe existir.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
schemaPath |
Ubicación de Cloud Storage del archivo de esquema de Avro. Por ejemplo, gs://path/to/my/schema.avsc . |
inputSubscription |
Suscripción de entrada de Pub/Sub desde la que se desea leer. Por ejemplo, projects/<project>/subscriptions/<subscription> . |
outputTopic |
El tema de Pub/Sub que se usará para registros no procesados. Por ejemplo, projects/<project-id>/topics/<topic-name> . |
outputTableSpec |
Ubicación de la tabla de salida de BigQuery. Por ejemplo, <my-project>:<my-dataset>.<my-table> .
Según la createDisposition especificada, la tabla de salida se puede crear de forma automática mediante el esquema de Avro proporcionado por el usuario. |
writeDisposition |
La WriteDisposition de BigQuery (opcional).
Por ejemplo, WRITE_APPEND , WRITE_EMPTY o WRITE_TRUNCATE . Predeterminada: WRITE_APPEND . |
createDisposition |
La CreateDisposition de BigQuery (opcional).
Por ejemplo: CREATE_IF_NEEDED , CREATE_NEVER . Predeterminada: CREATE_IF_NEEDED . |
Ejecuta la plantilla de Pub/Sub Avro a BigQuery
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Avro to BigQuery template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud beta dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
SCHEMA_PATH
: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo,gs://MyBucket/file.avsc
).SUBSCRIPTION_NAME
: Es el nombre de la suscripción de entrada de Pub/Sub.BIGQUERY_TABLE
: Es el nombre de la tabla de salida de BigQuery.DEADLETTER_TOPIC
: Es el tema de Pub/Sub que se usará para la cola no procesada.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
SCHEMA_PATH
: Es la ruta de acceso de Cloud Storage al archivo de esquema de Avro (por ejemplo,gs://MyBucket/file.avsc
).SUBSCRIPTION_NAME
: Es el nombre de la suscripción de entrada de Pub/Sub.BIGQUERY_TABLE
: Es el nombre de la tabla de salida de BigQuery.DEADLETTER_TOPIC
: Es el tema de Pub/Sub que se usará para la cola no procesada.
Proto de Pub/Sub a BigQuery
La plantilla de proto de Pub/Sub a BigQuery es una canalización de transmisión que transfiere
datos de proto desde una suscripción a Pub/Sub hacia una tabla de BigQuery.
Cualquier error que
ocurra mientras se escribe en la tabla de BigQuery se transmite a un tema de
Pub/Sub sin procesar.
Se puede proporcionar una función definida por el usuario (UDF) de JavaScript para transformar los datos. Los errores mientras se ejecuta la UDF se pueden enviar a un tema de Pub/Sub separado o al mismo tema sin procesar que los errores de BigQuery.
Requisitos para esta canalización:
- La suscripción de entrada de Pub/Sub debe existir.
- El archivo de esquema de los registros proto debe existir en Cloud Storage.
- El tema de Pub/Sub de salida debe existir.
- El conjunto de datos de salida de BigQuery debe existir.
- Si la tabla de BigQuery existe, debe tener un esquema que coincida con los datos del proto, sin importar el valor
createDisposition
.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
protoSchemaPath |
La ubicación de Cloud Storage del archivo de esquema proto autónomo. Por ejemplo, gs://path/to/my/file.pb .
Este archivo se puede generar con la marca --descriptor_set_out del comando protoc .
La marca --include_imports garantiza que el archivo sea autónomo. |
fullMessageName |
El nombre completo del mensaje proto. Por ejemplo, package.name.MessageName , en el que package.name es el valor proporcionado para la declaración package y no la declaración java_package . |
inputSubscription |
Suscripción de entrada de Pub/Sub desde la que se desea leer. Por ejemplo, projects/<project>/subscriptions/<subscription> . |
outputTopic |
El tema de Pub/Sub que se usará para registros no procesados. Por ejemplo, projects/<project-id>/topics/<topic-name> . |
outputTableSpec |
Ubicación de la tabla de salida de BigQuery. Por ejemplo, my-project:my_dataset.my_table .
Según la createDisposition especificada, la tabla de salida se puede crear de forma automática mediante el archivo de esquema de entrada. |
preserveProtoFieldNames |
true para conservar el nombre del campo Proto original en JSON (opcional). false para usar nombres JSON más estándar.
Por ejemplo, false cambiaría field_name a fieldName . (Default: false ) |
bigQueryTableSchemaPath |
Ruta de Cloud Storage a la ruta del esquema de BigQuery (opcional). Por ejemplo, gs://path/to/my/schema.json . Si no se proporciona, entonces el esquema se infiere a partir del esquema Proto. |
javascriptTextTransformGcsPath |
El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional).
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ } , el nombre de la función es myTransform . Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
|
udfOutputTopic |
El tema de Pub/Sub que almacena los errores de las UDF (opcional). Por ejemplo, projects/<project-id>/topics/<topic-name> . Si no se proporciona, los errores de UDF se envían al mismo tema que outputTopic . |
writeDisposition |
La WriteDisposition de BigQuery (opcional).
Por ejemplo, WRITE_APPEND , WRITE_EMPTY o WRITE_TRUNCATE . Valor predeterminado: WRITE_APPEND . |
createDisposition |
La CreateDisposition de BigQuery (opcional).
Por ejemplo: CREATE_IF_NEEDED , CREATE_NEVER . Configuración predeterminada: CREATE_IF_NEEDED . |
Ejecuta la plantilla de proto de Pub/Sub a BigQuery
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Proto to BigQuery template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud beta dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ fullMessageName=PROTO_MESSAGE_NAME,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=UNPROCESSED_TOPIC
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
SCHEMA_PATH
: Es la ruta de acceso de Cloud Storage al archivo de esquema de Proto (por ejemplo,gs://MyBucket/file.pb
).PROTO_MESSAGE_NAME
: Es el nombre del mensaje Proto (por ejemplo,package.name.MessageName
).SUBSCRIPTION_NAME
: Es el nombre de la suscripción de entrada de Pub/Sub.BIGQUERY_TABLE
: Es el nombre de la tabla de salida de BigQuery.UNPROCESSED_TOPIC
: Es el tema de Pub/Sub que se usará para la cola no procesada.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "fullMessageName": "PROTO_MESSAGE_NAME", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "UNPROCESSED_TOPIC" } } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
SCHEMA_PATH
: Es la ruta de acceso de Cloud Storage al archivo de esquema de Proto (por ejemplo,gs://MyBucket/file.pb
).PROTO_MESSAGE_NAME
: Es el nombre del mensaje Proto (por ejemplo,package.name.MessageName
).SUBSCRIPTION_NAME
: Es el nombre de la suscripción de entrada de Pub/Sub.BIGQUERY_TABLE
: Es el nombre de la tabla de salida de BigQuery.UNPROCESSED_TOPIC
: Es el tema de Pub/Sub que se usará para la cola no procesada.
Pub/Sub a Pub/Sub
La plantilla de Pub/Sub a Pub/Sub es una canalización de transmisión que lee mensajes de una suscripción de Pub/Sub y los escribe en otro tema de Pub/Sub. La canalización también acepta una clave de atributo de mensaje opcional y un valor que se puede usar para filtrar los mensajes que se deben escribir en el tema de Pub/Sub. Puedes usar esta plantilla para copiar mensajes de una suscripción de Pub/Sub a otro tema de Pub/Sub con un filtro de mensajes opcional.
Requisitos para esta canalización:
- La suscripción a Pub/Sub de origen debe existir antes de la ejecución.
- La suscripción de Pub/Sub de origen debe ser una suscripción de extracción.
- El tema de Pub/Sub de destino debe existir antes de la ejecución.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputSubscription |
Suscripción a Pub/Sub desde la que se lee la entrada. Por ejemplo, projects/<project-id>/subscriptions/<subscription-name> . |
outputTopic |
Tema de Cloud Pub/Sub en el que se escribe el resultado. Por ejemplo, projects/<project-id>/topics/<topic-name> . |
filterKey |
Eventos de filtro basados en la clave de atributo (opcional). No se aplican filtros si no se especifica filterKey . |
filterValue |
Valor del atributo del filtro para usar en caso de que se provea filterKey (opcional). De forma predeterminada, se usa un filterValue nulo. |
Ejecuta la plantilla de Pub/Sub a Pub/Sub
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Pub/Sub template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ filterKey=FILTER_KEY,\ filterValue=FILTER_VALUE
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).SUBSCRIPTION_NAME
: Es el nombre de la suscripción a Pub/Sub.TOPIC_NAME
: Es el nombre del tema de Pub/Sub.FILTER_KEY
: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.FILTER_VALUE
: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento. Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. Las coincidencias parciales (como una substring) no se filtran. De forma predeterminada, se usa un valor de filtro de evento nulo.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "filterKey": "FILTER_KEY", "filterValue": "FILTER_VALUE" } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).SUBSCRIPTION_NAME
: Es el nombre de la suscripción a Pub/Sub.TOPIC_NAME
: Es el nombre del tema de Pub/Sub.FILTER_KEY
: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.FILTER_VALUE
: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento. Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. Las coincidencias parciales (como una substring) no se filtran. De forma predeterminada, se usa un valor de filtro de evento nulo.
Pub/Sub a Splunk
La plantilla de Pub/Sub a Splunk es una canalización de transmisión que lee mensajes de una suscripción a Pub/Sub y escribe la carga útil del mensaje en Splunk mediante el recopilador de eventos HTTP (HEC) de Splunk. El caso de uso más común de esta plantilla es exportar registros a Splunk. Para ver un ejemplo del flujo de trabajo subyacente, consulta Implementa exportaciones de registros listas para la producción a Splunk mediante Dataflow.
Antes de escribir en Splunk, también puedes aplicar una función definida por el usuario de JavaScript a la carga útil del mensaje. Los mensajes con fallas de procesamiento se reenvían a un tema de mensajes no enviados de Pub/Sub para solucionar los problemas y volver a procesarlos.
Como una capa adicional de protección para tu token HEC, también puedes pasar una clave de Cloud KMS junto con el parámetro de token HEC codificado en base64 encriptado con la clave de Cloud KMS. Consulta el extremo de encriptación de la API de Cloud KMS para obtener detalles adicionales sobre la encriptación de tu parámetro de token HEC.
Requisitos para esta canalización:
- La suscripción de Pub/Sub de origen debe existir antes de ejecutar la canalización.
- El tema sin procesar de Pub/Sub debe existir antes de ejecutar la canalización.
- Se debe poder acceder al extremo de HEC de Splunk desde la red de trabajadores de Dataflow.
- El token HEC de Splunk se debe generar y estar disponible.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputSubscription |
La suscripción de Pub/Sub desde la que se lee la entrada. Por ejemplo, projects/<project-id>/subscriptions/<subscription-name> . |
token |
(Opcional) El token de autenticación HEC de Splunk. Se debe proporcionar si tokenSource está configurado como PLAINTEXT o KMS. |
url |
La URL de HEC de Splunk. Debe ser enrutable desde la VPC en la que se ejecuta la canalización. Por ejemplo, https://splunk-hec-host:8088. |
outputDeadletterTopic |
El tema de Pub/Sub para reenviar mensajes que no se pueden entregar. Por ejemplo, projects/<project-id>/topics/<topic-name> . |
javascriptTextTransformGcsPath |
El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional).
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ } , el nombre de la función es myTransform . Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
|
batchCount |
El tamaño del lote para enviar varios eventos a Splunk (opcional). Predeterminado 1 (sin lotes). |
parallelism |
La cantidad máxima de solicitudes paralelas (opcional). Predeterminado 1 (sin paralelismo). |
disableCertificateValidation |
Inhabilita la validación del certificado SSL (opcional). El valor predeterminado es falso (validación habilitada). Si es verdadero, los certificados no se validan (todos los certificados son de confianza) y se ignora el parámetro “rootCaCertificatePath”. |
includePubsubMessage |
Incluye el mensaje de Pub/Sub completo en la carga útil (opcional). El valor predeterminado es falso (solo se incluye el elemento de datos en la carga útil). |
tokenSource |
Fuente del token. Puede ser PLAINTEXT, KMS o SECRET_MANAGER. Este parámetro se debe proporcionar si se usa Secret Manager.
Si tokenSource se configura como KMS, tokenKMSEncryptionKey y el token encriptado se deben proporcionar.
Si tokenSource se configura como SECRET_MANAGER, tokenSecretId se debe proporcionar.
Si tokenSource se configura como PLAINTEXT, token se debe proporcionar.
|
tokenKMSEncryptionKey |
La clave de Cloud KMS para desencriptar la string del token HEC (opcional). Este parámetro se debe proporcionar si tokenSource se configura como KMS.
Si se proporciona la clave de Cloud KMS, la string del token HEC debe pasarse encriptada. |
tokenSecretId |
(Opcional) El ID del Secret de Secret Manager para el token. Este parámetro debe proporcionarse si el tokenSource está configurado como SECRET_MANAGER.
Debe tener el formato projects/<project-id>/secrets/<secret-name>/versions/<secret-version> . |
rootCaCertificatePath |
La URL completa al certificado de CA raíz en Cloud Storage (opcional). Por ejemplo, gs://mybucket/mycerts/privateCA.crt . El certificado provisto en Cloud Storage debe estar codificado en DER y puede proporcionarse en codificación binaria o imprimible (Base64).
Si el certificado se proporciona en codificación Base64, debe estar delimitado al comienzo por -----BEGIN CERTIFICATE----- y debe estar limitado al final por -----END CERTIFICATE-----. Si se proporciona este parámetro, este archivo de certificado de CA privado se recupera y se agrega al almacén de confianza del trabajador de Dataflow para verificar el certificado SSL del extremo del HEC de Splunk.
Si no se proporciona este parámetro, se usa el almacén de confianza predeterminado. |
enableBatchLogs |
(Opcional) Especifica si se deben habilitar los registros para los lotes escritos en Splunk. Valor predeterminado: true . |
enableGzipHttpCompression |
(Opcional) Especifica si las solicitudes HTTP enviadas a HEC de Splunk deben comprimirse (contenido gzip codificado). Valor predeterminado: true . |
Ejecuta la plantilla de Pub/Sub a Splunk
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Splunk template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\ token=TOKEN,\ url=URL,\ outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ batchCount=BATCH_COUNT,\ parallelism=PARALLELISM,\ disableCertificateValidation=DISABLE_VALIDATION,\ rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).INPUT_SUBSCRIPTION_NAME
: Es el nombre de la suscripción a Pub/Sub.TOKEN
: Es el token del recopilador de eventos HTTP de Splunk.URL
: Es la ruta de URL para el recopilador de eventos HTTP de Splunk (por ejemplo,https://splunk-hec-host:8088
).DEADLETTER_TOPIC_NAME
: Es el nombre del tema de Pub/Sub.JAVASCRIPT_FUNCTION
es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.Por ejemplo, si el código de tu función de JavaScript es
myTransform(inJson) { /*...do stuff...*/ }
, el nombre de la función esmyTransform
. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.PATH_TO_JAVASCRIPT_UDF_FILE
: El URI de Cloud Storage de.js
archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT
: Es el tamaño del lote que se debe usar para enviar varios eventos a Splunk.PARALLELISM
: Es la cantidad de solicitudes paralelas que se usarán para enviar eventos a Splunk.DISABLE_VALIDATION
: Estrue
si deseas inhabilitar la validación del certificado SSL.ROOT_CA_CERTIFICATE_PATH
: La ruta al certificado de CA raíz en Cloud Storage (por ejemplo,gs://your-bucket/privateCA.crt
)
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "gs://your-bucket/temp", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME", "token": "TOKEN", "url": "URL", "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "batchCount": "BATCH_COUNT", "parallelism": "PARALLELISM", "disableCertificateValidation": "DISABLE_VALIDATION", "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH" } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).INPUT_SUBSCRIPTION_NAME
: Es el nombre de la suscripción a Pub/Sub.TOKEN
: Es el token del recopilador de eventos HTTP de Splunk.URL
: Es la ruta de URL para el recopilador de eventos HTTP de Splunk (por ejemplo,https://splunk-hec-host:8088
).DEADLETTER_TOPIC_NAME
: Es el nombre del tema de Pub/Sub.JAVASCRIPT_FUNCTION
es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.Por ejemplo, si el código de tu función de JavaScript es
myTransform(inJson) { /*...do stuff...*/ }
, el nombre de la función esmyTransform
. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.PATH_TO_JAVASCRIPT_UDF_FILE
: El URI de Cloud Storage de.js
archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT
: Es el tamaño del lote que se debe usar para enviar varios eventos a Splunk.PARALLELISM
: Es la cantidad de solicitudes paralelas que se usarán para enviar eventos a Splunk.DISABLE_VALIDATION
: Estrue
si deseas inhabilitar la validación del certificado SSL.ROOT_CA_CERTIFICATE_PATH
: La ruta al certificado de CA raíz en Cloud Storage (por ejemplo,gs://your-bucket/privateCA.crt
)
Pub/Sub a archivos Avro en Cloud Storage
La plantilla de Pub/Sub a archivos de Avro en Cloud Storage es una canalización de transmisión que lee datos de un tema de Pub/Sub y escribe archivos de Avro en el bucket de Cloud Storage especificado.
Requisitos para esta canalización:
- El tema de entrada de Pub/Sub debe existir antes de la ejecución de la canalización.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputTopic |
Tema de Pub/Sub al cual suscribirse para el consumo de mensaje. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name> . |
outputDirectory |
Directorio de salida en el que se archivan los archivos Avro de salida. Debe contener / al final.
Por ejemplo: gs://example-bucket/example-directory/ . |
avroTempDirectory |
Directorio para los archivos de Avro temporales. Debe contener / al final. Por ejemplo: gs://example-bucket/example-directory/ . |
outputFilenamePrefix |
Prefijo de nombre de archivo de salida para los archivos Avro (opcional). |
outputFilenameSuffix |
[Opcional] Sufijo de nombre de archivo de salida para los archivos Avro. |
outputShardTemplate |
[Opcional] Plantilla de fragmentación del archivo de salida. Se especifica como secuencias repetidas de las letras S o N . Por ejemplo, SSS-NNN . Estas se reemplazan por el número de fragmento o la cantidad total de fragmentos, respectivamente. Si no se especifica este parámetro, el formato de plantilla predeterminado es W-P-SS-of-NN . |
Ejecuta la plantilla de Pub/Sub a Cloud Storage Avro
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Avro Files on Cloud Storage template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=FILENAME_PREFIX,\ outputFilenameSuffix=FILENAME_SUFFIX,\ outputShardTemplate=SHARD_TEMPLATE,\ avroTempDirectory=gs://BUCKET_NAME/temp/
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).TOPIC_NAME
: Es el nombre del tema de Pub/Sub.BUCKET_NAME
: Es el nombre del bucket de Cloud Storage.FILENAME_PREFIX
: Es el prefijo del nombre de archivo de salida que prefieras.FILENAME_SUFFIX
: Es el sufijo del nombre de archivo de salida que prefieras.SHARD_TEMPLATE
: Es la plantilla de fragmentación de salida que prefieras.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "outputDirectory": "gs://BUCKET_NAME/output/", "avroTempDirectory": "gs://BUCKET_NAME/temp/", "outputFilenamePrefix": "FILENAME_PREFIX", "outputFilenameSuffix": "FILENAME_SUFFIX", "outputShardTemplate": "SHARD_TEMPLATE" } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).TOPIC_NAME
: Es el nombre del tema de Pub/Sub.BUCKET_NAME
: Es el nombre del bucket de Cloud Storage.FILENAME_PREFIX
: Es el prefijo del nombre de archivo de salida que prefieras.FILENAME_SUFFIX
: Es el sufijo del nombre de archivo de salida que prefieras.SHARD_TEMPLATE
: Es la plantilla de fragmentación de salida que prefieras.
Tema de Pub/Sub a archivos de texto en Cloud Storage
La plantilla de Pub/Sub a archivos de texto en Cloud Storage es una canalización de transmisión que lee registros de Pub/Sub y los guarda como una serie de archivos de Cloud Storage en formato de texto. La plantilla se puede usar como una forma rápida de guardar datos en Pub/Sub para su uso futuro. De forma predeterminada, la plantilla genera un archivo nuevo cada 5 minutos.
Requisitos para esta canalización:
- El tema de Pub/Sub debe existir antes de la ejecución.
- Los mensajes publicados en el tema deben tener formato de texto.
- Los mensajes publicados en el tema no deben contener líneas nuevas. Ten en cuenta que cada mensaje de Pub/Sub se guarda como una sola línea en el archivo de salida.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputTopic |
El tema de Pub/Sub desde el que se lee la entrada. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name> . |
outputDirectory |
La ruta de acceso y el prefijo del nombre de archivo para escribir los archivos de salida. Por ejemplo, gs://bucket-name/path/ . El valor debe terminar con una barra. |
outputFilenamePrefix |
El prefijo para colocar en cada archivo con ventanas. Por ejemplo, output- . |
outputFilenameSuffix |
El sufijo para colocar en cada archivo con ventanas, por lo general, es una extensión de archivo como .txt o .csv . |
outputShardTemplate |
La plantilla de fragmentación define la parte dinámica de cada archivo con ventanas. De forma predeterminada, la canalización utiliza una única fragmentación de salida para el sistema de archivo dentro de cada ventana. Esto significa que todos los datos se envían a un solo archivo por ventana. El valor predeterminado outputShardTemplate es W-P-SS-of-NN , en el que W es el período de la ventana, P es la información del panel, S es el número de fragmento y N es la cantidad de fragmentos. En el caso de un solo archivo, la parte SS-of-NN de outputShardTemplate es 00-of-01 .
|
Ejecuta la plantilla de Pub/Sub a archivos de texto en Cloud Storage
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Text Files on Cloud Storage template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=output-,\ outputFilenameSuffix=.txt
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).TOPIC_NAME
: Es el nombre del tema de Pub/Sub.BUCKET_NAME
: Es el nombre del bucket de Cloud Storage.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" "outputDirectory": "gs://BUCKET_NAME/output/", "outputFilenamePrefix": "output-", "outputFilenameSuffix": ".txt", } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).TOPIC_NAME
: Es el nombre del tema de Pub/Sub.BUCKET_NAME
: Es el nombre del bucket de Cloud Storage.
Tema de Pub/Sub o suscripción a archivos de texto en Cloud Storage
El tema de Pub/Sub o la suscripción a archivos de texto en Cloud Storage es una canalización de transmisión que lee registros de Pub/Sub y los guarda como una serie de archivos de Cloud Storage en formato de texto. La plantilla se puede usar como una forma rápida de guardar datos en Pub/Sub para su uso futuro. De forma predeterminada, la plantilla genera un archivo nuevo cada 5 minutos.
Requisitos para esta canalización:
- El tema o la suscripción a Pub/Sub deben existir antes de la ejecución.
- Los mensajes publicados en el tema deben tener formato de texto.
- Los mensajes publicados en el tema no deben contener líneas nuevas. Ten en cuenta que cada mensaje de Pub/Sub se guarda como una sola línea en el archivo de salida.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputTopic |
El tema de Pub/Sub desde el que se lee la entrada. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name> . Si se proporciona este parámetro, no se debe proporcionar inputSubscription . |
inputSubscription |
El tema de Pub/Sub desde el que se lee la entrada. El nombre de la suscripción debe tener el formato projects/<project-id>/subscription/<subscription-name> . Si se proporciona este parámetro, no se debe proporcionar inputTopic . |
outputDirectory |
La ruta de acceso y el prefijo del nombre de archivo para escribir los archivos de salida. Por ejemplo, gs://bucket-name/path/ . El valor debe terminar con una barra. |
outputFilenamePrefix |
El prefijo para colocar en cada archivo con ventanas. Por ejemplo, output- . |
outputFilenameSuffix |
El sufijo para colocar en cada archivo con ventanas, por lo general, es una extensión de archivo como .txt o .csv . |
outputShardTemplate |
La plantilla de fragmentación define la parte dinámica de cada archivo con ventanas. De forma predeterminada, la canalización utiliza una única fragmentación de salida para el sistema de archivo dentro de cada ventana. Esto significa que todos los datos se envían a un solo archivo por ventana. El valor predeterminado outputShardTemplate es W-P-SS-of-NN , en el que W es el período de la ventana, P es la información del panel, S es el número de fragmento y N es la cantidad de fragmentos. En el caso de un solo archivo, la parte SS-of-NN de outputShardTemplate es 00-of-01 .
|
windowDuration |
La duración de la ventana es el intervalo en el que se escriben los datos en el directorio de salida (opcional). Configura la duración en función de la capacidad de procesamiento de la canalización. Por ejemplo, una capacidad de procesamiento mayor puede requerir tamaños de ventana más pequeños para que los datos se ajusten a la memoria. La configuración predeterminada es de 5 min, con un mínimo de 1 s. Los formatos permitidos son: [nro. entero] s (para los segundos, por ejemplo, 5 s), [nro. entero] min (para los minutos, por ejemplo, 12 min) y [nro. entero] h (para las horas, por ejemplo, 2 h). |
Ejecuta la plantilla del tema o suscripción de Pub/Sub a archivos de texto en Cloud Storage
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud beta dataflow flex-template jobs run JOB_NAME \ --project=YOUR_PROJECT_ID \ --region REGION_NAME \ --template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=output-,\ outputFilenameSuffix=.txt
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
SUBSCRIPTION_NAME
: Es el nombre de la suscripción a Pub/Sub.BUCKET_NAME
: Es el nombre de tu bucket de Cloud Storage.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME" "outputDirectory": "gs://BUCKET_NAME/output/", "outputFilenamePrefix": "output-", "outputFilenameSuffix": ".txt", }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex", } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
SUBSCRIPTION_NAME
: Es el nombre de la suscripción a Pub/Sub.BUCKET_NAME
: Es el nombre de tu bucket de Cloud Storage.
Pub/Sub a MongoDB
La plantilla de Pub/Sub a MongoDB es una canalización de transmisión que lee mensajes con codificación JSON de una suscripción a Pub/Sub y los escribe en MongoDB como documentos. Si es necesario, esta canalización admite transformaciones adicionales que se pueden incluir mediante una función definida por el usuario (UDF) de JavaScript. Cualquier error que se produjo debido a una falta de coincidencia del esquema, JSON con errores o mientras se ejecutaban transformaciones se registra en una tabla de BigQuery para mensajes no procesados junto con un mensaje de entrada. Si no existe una tabla para los registros no procesados antes de la ejecución, la canalización crea esta tabla de forma automática.
Requisitos para esta canalización:
- La suscripción a Pub/Sub debe existir y los mensajes deben estar codificados en un formato JSON válido.
- El clúster de MongoDB debe existir y debe ser accesible desde las máquinas de trabajador de Dataflow.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputSubscription |
Nombre de la suscripción a Pub/Sub. Por ejemplo: . |
mongoDBUri |
Una lista separada por comas de los servidores de MongoDB. Por ejemplo: 192.285.234.12:27017,192.287.123.11:27017 . |
database |
La base de datos en MongoDB en la que se debe almacenar la colección. Por ejemplo: my-db . |
collection |
Nombre de la colección dentro de la base de datos de MongoDB. Por ejemplo: my-collection . |
deadletterTable |
La tabla de BigQuery que almacena mensajes debido a fallas (esquemas no coincidentes, JSON con formato incorrecto, etcétera). Por ejemplo: project-id:dataset-name.table-name . |
javascriptTextTransformGcsPath |
El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional).
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ } , el nombre de la función es myTransform . Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
|
batchSize |
El tamaño de lote que se usa para la inserción por lotes de documentos en MongoDB (opcional). Valor predeterminado: 1000 . |
batchSizeBytes |
El tamaño del lote en bytes (opcional). Valor predeterminado: 5242880 . |
maxConnectionIdleTime |
El tiempo de inactividad máximo permitido en segundos antes de que se agote el tiempo de espera de la conexión (opcional). Valor predeterminado: 60000 . |
sslEnabled |
El valor booleano que indica si la conexión a MongoDB está habilitada con SSL (opcional). Valor predeterminado: true . |
ignoreSSLCertificate |
El valor booleano que indica si se debe ignorar el certificado SSL (opcional). Valor predeterminado: true . |
withOrdered |
El valor booleano que habilita las inserciones masivas ordenadas en MongoDB (opcional). Valor predeterminado: true . |
withSSLInvalidHostNameAllowed |
El valor booleano que indica si se permite un nombre de host no válido para la conexión SSL (opcional). Valor predeterminado: true . |
Ejecuta la plantilla de Pub/Sub a MongoDB
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to MongoDB template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \ --parameters \ inputSubscription=INPUT_SUBSCRIPTION,\ mongoDBUri=MONGODB_URI,\ database=DATABASE, collection=COLLECTION, deadletterTable=UNPROCESSED_TABLE
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.REGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
INPUT_SUBSCRIPTION
: Es la suscripción a Pub/Sub (por ejemplo,
).projects/my-project-id/subscriptions/my-subscription-id
MONGODB_URI
: Son las direcciones del servidor de MongoDB (por ejemplo,192.285.234.12:27017,192.287.123.11:27017
).DATABASE
: Es el nombre de la base de datos de MongoDB (por ejemplo,users
).COLLECTION
: Es el nombre de la colección de MongoDB (por ejemplo,profiles
).UNPROCESSED_TABLE
: Es el nombre de la tabla de BigQuery (por ejemplo,your-project:your-dataset.your-table-name
).
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "INPUT_SUBSCRIPTION", "mongoDBUri": "MONGODB_URI", "database": "DATABASE", "collection": "COLLECTION", "deadletterTable": "UNPROCESSED_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB", } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.LOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
INPUT_SUBSCRIPTION
: Es la suscripción a Pub/Sub (por ejemplo,
).projects/my-project-id/subscriptions/my-subscription-id
MONGODB_URI
: Son las direcciones del servidor de MongoDB (por ejemplo,192.285.234.12:27017,192.287.123.11:27017
).DATABASE
: Es el nombre de la base de datos de MongoDB (por ejemplo,users
).COLLECTION
: Es el nombre de la colección de MongoDB (por ejemplo,profiles
).UNPROCESSED_TABLE
: Es el nombre de la tabla de BigQuery (por ejemplo,your-project:your-dataset.your-table-name
).
Pub/Sub a Elasticsearch
La plantilla de Pub/Sub a Elasticsearch es una canalización de transmisión que lee mensajes de una suscripción a Pub/Sub, ejecuta una función definida por el usuario (UDF) y los escribe en Elasticsearch como documentos. La plantilla de Dataflow usa la función de flujos de datos de Elasticsearch para almacenar datos de series temporales en varios índices y, al mismo tiempo, entregarte un solo recurso con nombre para solicitudes. Los flujos de datos son adecuados para los registros, las métricas, los seguimientos y otros datos generados de forma continua que se almacenan en Pub/Sub.
Requisitos para esta canalización
- La suscripción a Pub/Sub debe existir, y los mensajes deben estar codificados en un formato JSON válido.
- Un host de Elasticsearch accesible de forma pública en una instancia de GCP o en Elastic Cloud con Elasticsearch versión 7.0 o posterior. Consulta Integración de Google Cloud para Elastic si deseas obtener más detalles.
- Un tema de Pub/Sub para resultados de error.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputSubscription |
La suscripción de Cloud Pub/Sub desde la que se realiza el consumo. El nombre debe tener el formato projects/<project-id>/subscriptions/<subscription-name> . |
connectionUrl |
URL de Elasticsearch en el formato https://hostname:[port] o especificar el CloudID si usas Elastic Cloud. |
apiKey |
Clave de API codificada en Base64 que se usa para la autenticación. |
errorOutputTopic |
Tema de salida de Pub/Sub para publicar registros con errores en el formato projects/<project-id>/topics/<topic-name> |
dataset |
El tipo de registros enviados a través de Pub/Sub para los que tenemos un panel listo para usar (opcional). Los valores de tipos de registros conocidos son audit, vpcflow y firewall. Valor predeterminado: pubsub . |
namespace |
Una agrupación arbitraria, como un entorno (dev, prod, or qa), un equipo o una unidad de negocios estratégica (opcional). Valor predeterminado: default . |
batchSize |
El tamaño del lote en cantidad de documentos (opcional). Valor predeterminado: 1000 . |
batchSizeBytes |
El tamaño del lote en cantidad de bytes (opcional). Valor predeterminado: 5242880 (5 MB). |
maxRetryAttempts |
La cantidad máxima de reintentos debe ser mayor que 0 (opcional). Valor predeterminado: no retries . |
maxRetryDuration |
La duración máxima del reintento en milisegundos debe ser superior a 0 (opcional). Valor predeterminado: no retries . |
javascriptTextTransformGcsPath |
El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional).
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ } , el nombre de la función es myTransform . Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
|
propertyAsIndex |
Opcional: Una propiedad en el documento que se indexa, cuyo valor especificará los metadatos _index para incluir con el documento en la solicitud masiva (tiene prioridad sobre una UDF _index ). Valor predeterminado: none. |
propertyAsId |
Opcional: Una propiedad en el documento que se indexa, cuyo valor especificará los metadatos _id para incluir con el documento en la solicitud masiva (tiene prioridad sobre una UDF _id ). Valor predeterminado: none. |
javaScriptIndexFnGcsPath |
La ruta de Cloud Storage a la fuente de UDF de JavaScript para una función que especificará los metadatos _index a fin de incluir el documento en la solicitud masiva (opcional). Valor predeterminado: none. |
javaScriptIndexFnName |
Opcional: El nombre de función de JavaScript de la UDF para la función que especificará los metadatos _index que se incluirán en el documento en la solicitud masiva. Valor predeterminado: none. |
javaScriptIdFnGcsPath |
La ruta de Cloud Storage a la fuente de UDF de JavaScript para una función que especificará los metadatos _id a fin de incluir el documento en la solicitud masiva (opcional). Valor predeterminado: none. |
javaScriptIdFnName |
Opcional: El nombre de función de JavaScript de la UDF para la función que especificará los metadatos _id que se incluirán en el documento en la solicitud masiva. Valor predeterminado: none. |
javaScriptTypeFnGcsPath |
La ruta de Cloud Storage a la fuente de UDF de JavaScript para una función que especificará los metadatos _type a fin de incluir el documento en la solicitud masiva (opcional). Valor predeterminado: none. |
javaScriptTypeFnName |
Opcional: El nombre de función de JavaScript de la UDF para la función que especificará los metadatos _type que se incluirán en el documento en la solicitud masiva. Valor predeterminado: none. |
javaScriptIsDeleteFnGcsPath |
La ruta de acceso de Cloud Storage a la fuente de UDF de JavaScript para la función que determinará si el documento se debe borrar en lugar de insertar o actualizar (opcional). La función debe mostrar el valor de string "true" o "false" . Valor predeterminado: none. |
javaScriptIsDeleteFnName |
Opcional: El nombre de función de JavaScript de la UDF para la función que determinará si el documento se debe borrar en lugar de insertarse o actualizarse. La función debe mostrar el valor de string "true" o "false" . Valor predeterminado: none. |
usePartialUpdate |
Opcional: Indica si se deben usar actualizaciones parciales (actualizar en lugar de crear o indexar), que permite documentos parciales con solicitudes de Elasticsearch. Valor predeterminado: false . |
bulkInsertMethod |
Opcional: Si se debe usar INDEX (índice, permite inserción) o CREATE (creación, errores en _id duplicado) con solicitudes masivas de Elasticsearch. Valor predeterminado: CREATE . |
Ejecuta la plantilla de Pub/Sub a Elasticsearch
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Elasticsearch template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \ --parameters \ inputSubscription=SUBSCRIPTION_NAME,\ connectionUrl=CONNECTION_URL,\ dataset=DATASET,\ namespace=NAMESPACE,\ apiKey=APIKEY,\ errorOutputTopic=ERROR_OUTPUT_TOPIC
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
ERROR_OUTPUT_TOPIC
: Es tu tema de Pub/Sub para resultados de errorSUBSCRIPTION_NAME
: Es el nombre de la suscripción a Pub/Sub.CONNECTION_URL
: Es tu URL de ElasticsearchDATASET
: Es tu tipo de registroNAMESPACE
: Es el espacio de nombres para el conjunto de datosAPIKEY
: Es tu clave de API codificada en Base64 para la autenticación
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "SUBSCRIPTION_NAME", "connectionUrl": "CONNECTION_URL", "dataset": "DATASET", "namespace": "NAMESPACE", "apiKey": "APIKEY", "errorOutputTopic": "ERROR_OUTPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch", } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
ERROR_OUTPUT_TOPIC
: Es tu tema de Pub/Sub para resultados de errorSUBSCRIPTION_NAME
: Es el nombre de la suscripción a Pub/Sub.CONNECTION_URL
: Es tu URL de ElasticsearchDATASET
: Es tu tipo de registroNAMESPACE
: Es el espacio de nombres para el conjunto de datosAPIKEY
: Es tu clave de API codificada en Base64 para la autenticación
Datastream a Cloud Spanner
La plantilla de Datastream para Cloud Spanner es una canalización de transmisión que lee eventos de Datastream desde un bucket de Cloud Storage y los escribe en una base de datos de Cloud Spanner. Está diseñado para la migración de datos de fuentes de Datastream a Cloud Spanner.
Todas las tablas necesarias para la migración deben existir en la base de datos de destino de Cloud Spanner antes de la ejecución de la plantilla. Por lo tanto, la migración del esquema de una base de datos de origen a Cloud Spanner de destino debe completarse antes de la migración de datos. Los datos pueden existir en las tablas antes de la migración. Esta plantilla no propaga los cambios de esquema de Datastream a la base de datos de Cloud Spanner.
La coherencia de los datos solo está garantizada al final de la migración cuando todos los datos se escribieron en Cloud Spanner. A fin de almacenar información sobre el orden para cada registro escrito en Cloud Spanner, esta plantilla crea una tabla adicional (llamada tabla paralela) para cada tabla en la base de datos de Cloud Spanner. Esto se usa para garantizar la coherencia al final de la migración. Las tablas paralelas no se borran después de la migración y se pueden usar con fines de validación al final de la migración.
Cualquier error que ocurra durante la operación, como discrepancias de esquema, archivos JSON con formato incorrecto o errores resultantes de la ejecución de transformaciones, se registra en una cola de errores. La cola de errores es una carpeta de Cloud Storage que almacena todos los eventos de Datastream que encontraron errores junto con el motivo del error en formato de texto. Los errores pueden ser transitorios o permanentes, y se almacenan en las carpetas de Cloud Storage adecuadas en la cola de errores. Los errores transitorios se reintentan automáticamente, mientras que los errores permanentes no. En el caso de errores permanentes, tienes la opción de corregir los eventos de cambio y moverlos al bucket que se puede reintentar mientras se ejecuta la plantilla.
Requisitos para esta canalización:
- Una transmisión de Datastream en estado En ejecución o No iniciado
- Un bucket de Cloud Storage en el que se replican los eventos de Datastream.
- Una base de datos de Cloud Spanner con tablas existentes. Estas tablas pueden estar vacías o contener datos.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputFilePattern |
La ubicación de los archivos de Datastream en Cloud Storage para replicar. Por lo general, esta es la ruta de acceso raíz de una transmisión. |
streamName |
El nombre o la plantilla del flujo que se consultará para obtener la información del esquema y el tipo de fuente. |
instanceId |
La instancia de Cloud Spanner en la que se replican los cambios. |
databaseId |
La base de datos de Cloud Spanner en la que se replican los cambios. |
projectId |
El ID del proyecto de Cloud Spanner. |
deadLetterQueueDirectory |
Esta es ruta de acceso del archivo para almacenar el resultado de la cola de errores (opcional). El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. |
inputFileFormat |
El formato del archivo de salida que produce Datastream (opcional). Por ejemplo: avro,json . Valor predeterminado, avro . |
shadowTablePrefix |
El prefijo que se usa para nombrar las tablas de paralelas (opcional). Valor predeterminado: shadow_ . |
Ejecuta la plantilla de Datastream para Cloud Spanner
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Datastream to Spanner template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ streamName=STREAM_NAME,\ instanceId=CLOUDSPANNER_INSTANCE,\ databaseId=CLOUDSPANNER_DATABASE,\ deadLetterQueueDirectory=DLQ
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
GCS_FILE_PATH
: es la ruta de acceso de Cloud Storage que se usa para almacenar eventos de Datastream. Por ejemplo:gs://bucket/path/to/data/
CLOUDSPANNER_INSTANCE
: es la instancia de Cloud Spanner.CLOUDSPANNER_DATABASE
: es la base de datos de Cloud Spanner.DLQ
: es la ruta de acceso de Cloud Storage para el directorio de la cola de errores.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "streamName": "STREAM_NAME" "instanceId": "CLOUDSPANNER_INSTANCE" "databaseId": "CLOUDSPANNER_DATABASE" "deadLetterQueueDirectory": "DLQ" } } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
GCS_FILE_PATH
: es la ruta de acceso de Cloud Storage que se usa para almacenar eventos de Datastream. Por ejemplo:gs://bucket/path/to/data/
CLOUDSPANNER_INSTANCE
: es la instancia de Cloud Spanner.CLOUDSPANNER_DATABASE
: es la base de datos de Cloud Spanner.DLQ
: es la ruta de acceso de Cloud Storage para el directorio de la cola de errores.
Archivos de texto en Cloud Storage a BigQuery (transmisión)
La canalización de archivos de texto en Cloud Storage a BigQuery es una canalización de transmisión que te permite transmitir archivos de texto almacenados en Cloud Storage, transformarlos con una función definida por el usuario (UDF) de JavaScript que proporciones y adjuntar el resultado en BigQuery.
La canalización se ejecuta de forma indefinida, y deberás detenerla de forma manual a través de una cancelación y no una desviación, debido a su uso de la transformación Watch
, que es una DoFn
divisible que no admite el desvío.
Requisitos para esta canalización:
- Crea un archivo JSON que describa el esquema de tu tabla de salida en BigQuery.
Asegúrate de que haya un array JSON de nivel superior titulado
fields
y que su contenido siga el patrón{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
. Por ejemplo:{ "fields": [ { "name": "location", "type": "STRING" }, { "name": "name", "type": "STRING" }, { "name": "age", "type": "STRING" }, { "name": "color", "type": "STRING", "mode": "REQUIRED" }, { "name": "coffee", "type": "STRING", "mode": "REQUIRED" } ] }
- Crea un archivo JavaScript (
.js
) con tu función UDF que proporciona la lógica para transformar las líneas de texto. Ten en cuenta que tu función debe mostrar una string JSON.Por ejemplo, esta función divide cada línea de un archivo CSV y muestra una string JSON después de transformar los valores.
function transform(line) { var values = line.split(','); var obj = new Object(); obj.location = values[0]; obj.name = values[1]; obj.age = values[2]; obj.color = values[3]; obj.coffee = values[4]; var jsonString = JSON.stringify(obj); return jsonString; }
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
javascriptTextTransformGcsPath |
El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.js .
|
JSONPath |
Ubicación de Cloud Storage de tu archivo de esquema de BigQuery, descrito como un JSON.
Por ejemplo: gs://path/to/my/schema.json . |
javascriptTextTransformFunctionName |
es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ } , el nombre de la función es myTransform . Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
|
outputTable |
La tabla de BigQuery calificada por completo.
Por ejemplo: my-project:dataset.table |
inputFilePattern |
Ubicación en Cloud Storage del texto que quieres procesar.
Por ejemplo: gs://my-bucket/my-files/text.txt . |
bigQueryLoadingTemporaryDirectory |
Directorio temporal para el proceso de carga de BigQuery.
Por ejemplo: gs://my-bucket/my-files/temp_dir . |
outputDeadletterTable |
Tabla de mensajes que no llegaron a la tabla de resultados.
Por ejemplo: my-project:dataset.my-unprocessed-table . Si no existe, se crea durante la ejecución de la canalización.
Si no se especifica, se usa <outputTableSpec>_error_records en su lugar. |
Ejecuta la plantilla de Cloud Storage Text a BigQuery (transmisión)
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Text Files on Cloud Storage to BigQuery template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).JAVASCRIPT_FUNCTION
es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.Por ejemplo, si el código de tu función de JavaScript es
myTransform(inJson) { /*...do stuff...*/ }
, el nombre de la función esmyTransform
. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.PATH_TO_BIGQUERY_SCHEMA_JSON
: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.PATH_TO_JAVASCRIPT_UDF_FILE
: El URI de Cloud Storage de.js
archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.BIGQUERY_TABLE
: Es el nombre de la tabla de BigQuery.BIGQUERY_UNPROCESSED_TABLE
: Es el nombre de la tabla de BigQuery para los mensajes no procesados.PATH_TO_TEMP_DIR_ON_GCS
: Es la ruta de acceso de Cloud Storage al directorio temporal.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).JAVASCRIPT_FUNCTION
es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.Por ejemplo, si el código de tu función de JavaScript es
myTransform(inJson) { /*...do stuff...*/ }
, el nombre de la función esmyTransform
. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.PATH_TO_BIGQUERY_SCHEMA_JSON
: Es la ruta de acceso de Cloud Storage al archivo JSON que contiene la definición de esquema.PATH_TO_JAVASCRIPT_UDF_FILE
: El URI de Cloud Storage de.js
archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: Es la ruta de acceso de Cloud Storage al conjunto de datos de texto.BIGQUERY_TABLE
: Es el nombre de la tabla de BigQuery.BIGQUERY_UNPROCESSED_TABLE
: Es el nombre de la tabla de BigQuery para los mensajes no procesados.PATH_TO_TEMP_DIR_ON_GCS
: Es la ruta de acceso de Cloud Storage al directorio temporal.
Archivos de texto en Cloud Storage a Pub/Sub (transmisión)
Esta plantilla crea una canalización de transmisión que sondea de forma continua los archivos de texto nuevos subidos a Cloud Storage, lee cada archivo línea por línea y publica strings en un tema de Pub/Sub. La plantilla publica registros en un archivo delimitado por saltos de línea que contiene registros JSON o archivos CSV en un tema de Pub/Sub para su procesamiento en tiempo real. Puedes usar esta plantilla para reproducir datos en Pub/Sub.
La canalización se ejecuta de forma indefinida, y se debe finalizar de forma manual mediante una “cancelación” y no un “drain”, debido a su uso de la transformación “Watch”, que es un “SplittableDoFn” que no admite el desvío.
Actualmente, el intervalo de sondeo es fijo y configurado en 10 segundos. Esta plantilla no establece una marca de tiempo en los registros individuales. Es por eso que la hora del evento es la misma que la hora de publicación durante la ejecución. Si tu canalización depende de la hora precisa del evento para el procesamiento, no deberías usar esta canalización.
Requisitos para esta canalización:
- Los archivos de entrada deben tener el formato JSON delimitado por saltos de línea o CSV. Los registros que abarcan varias líneas en los archivos de origen pueden causar problemas de bajada, ya que cada línea dentro de los archivos se publica como un mensaje en Pub/Sub.
- El tema de Pub/Sub debe existir antes de la ejecución.
- La canalización se ejecuta de forma indefinida, y deberás detenerla de forma manual.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputFilePattern |
El patrón del archivo de entrada para leer. Por ejemplo, gs://bucket-name/files/*.json o gs://bucket-name/path/*.csv . |
outputTopic |
El tema de entrada de Pub/Sub en el que se desea escribir. El nombre debe tener el formato projects/<project-id>/topics/<topic-name> . |
Ejecuta la plantilla de archivos de texto en Cloud Storage a Pub/Sub (transmisión)
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Text Files on Cloud Storage to Pub/Sub (Stream) template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \ --region REGION_NAME\ --staging-location STAGING_LOCATION\ --parameters \ inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).TOPIC_NAME
: Es el nombre del tema de Pub/Sub.BUCKET_NAME
: Es el nombre de tu bucket de Cloud Storage.FILE_PATTERN
: Es el glob de patrón del archivo que se leerá en el bucket de Cloud Storage (por ejemplo,path/*.csv
).
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "gs://your-bucket/temp", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).TOPIC_NAME
: Es el nombre del tema de Pub/Sub.BUCKET_NAME
: Es el nombre de tu bucket de Cloud Storage.FILE_PATTERN
: Es el glob de patrón del archivo que se leerá en el bucket de Cloud Storage (por ejemplo,path/*.csv
).
Enmascaramiento de datos y asignación de token de Cloud Storage a BigQuery (con DLP de Cloud)
La plantilla de enmascaramiento de datos y asignación de tokens de Cloud Storage a BigQuery (con Cloud DLP) es una canalización de transmisión que lee archivos csv de un bucket de Cloud Storage, llama a la API de Cloud Data Loss Prevention (Cloud DLP) para la desidentificación y escribe los datos desidentificados en la tabla de BigQuery especificada. Esta plantilla admite el uso de una plantilla de inspección de Cloud DLP y una plantilla de desidentificación de Cloud DLP. Esto permite que los usuarios inspeccionen información que puede ser sensible y desidentificar datos estructurados en los que las columnas están especificadas para ser desidentificadas y no se necesita la inspección. También es importante tener en cuenta que esta plantilla no admite una ruta de acceso regional para la ubicación de la plantilla de desidentificación. Solo se admite una ruta global.
Requisitos para esta canalización:
- Los datos de entrada para la asignación de tokens deben existir
- Las plantillas de Cloud DLP deben existir (por ejemplo, InspectTemplate y DeidentifyTemplate). Consulta Plantillas de Cloud DLP para obtener más detalles.
- El conjunto de datos de BigQuery debe existir
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputFilePattern |
Los archivos csv desde los que se leen los registros de datos de entrada. También se acepta el comodín. Por ejemplo, gs://mybucket/my_csv_filename.csv o gs://mybucket/file-*.csv .
|
dlpProjectId |
ID del proyecto de Cloud DLP que posee el recurso de la API de Cloud DLP.
Este proyecto de Cloud DLP puede ser el mismo que posee las plantillas de Cloud DLP, o puede ser uno independiente.
Por ejemplo, my_dlp_api_project . |
deidentifyTemplateName |
Plantilla de desidentificación de Cloud DLP que se usa para las solicitudes a la API, especificada con el patrón projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} .
Por ejemplo, projects/my_project/deidentifyTemplates/100 . |
datasetName |
Conjunto de datos de BigQuery para enviar resultados con asignación de token. |
batchSize |
Tamaño de fragmentación o del lote para enviar datos a fin de inspeccionar o quitar la asignación de token. En el caso de un archivo csv, batchSize es la cantidad de filas en un lote. Los usuarios deben determinar el tamaño del lote según el tamaño de los registros y del archivo. Ten en cuenta que la API de Cloud DLP tiene un límite de tamaño de carga útil de 524 KB por llamada a la API. |
inspectTemplateName |
Plantilla de inspección de Cloud DLP que se usará para solicitudes a la API, especificada con el patrón projects/{template_project_id}/identifyTemplates/{idTemplateId} (opcional).
Por ejemplo, projects/my_project/identifyTemplates/100 |
Ejecuta la plantilla de enmascaramiento de datos y asignación de tokens de Cloud Storage a BigQuery (con Cloud DLP)
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputFilePattern=INPUT_DATA,\ datasetName=DATASET_NAME,\ batchSize=BATCH_SIZE_VALUE,\ dlpProjectId=DLP_API_PROJECT_ID,\ deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\ inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER
Reemplaza lo siguiente:
DLP_API_PROJECT_ID
: Es el ID del proyecto de la API de Cloud DLP.JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).INPUT_DATA
: Es la ruta de acceso del archivo de entrada.DEIDENTIFY_TEMPLATE
: Es el número de plantilla de Cloud DLPDeidentify.DATASET_NAME
: Es el nombre del conjunto de datos de BigQuery.INSPECT_TEMPLATE_NUMBER
: Es el número de plantilla de Cloud DLPInspect.BATCH_SIZE_VALUE
: Es el tamaño del lote (número de filas por API para csv).
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern":INPUT_DATA, "datasetName": "DATASET_NAME", "batchSize": "BATCH_SIZE_VALUE", "dlpProjectId": "DLP_API_PROJECT_ID", "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE", "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER" } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.DLP_API_PROJECT_ID
: Es el ID del proyecto de la API de Cloud DLP.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
STAGING_LOCATION
: la ubicación para los archivos locales de etapa de pruebas (por ejemplo,gs://your-bucket/staging
).TEMP_LOCATION
: Es la ubicación en la que se deben escribir archivos temporales (por ejemplo,gs://your-bucket/temp
).INPUT_DATA
: Es la ruta de acceso del archivo de entrada.DEIDENTIFY_TEMPLATE
: Es el número de plantilla de Cloud DLPDeidentify.DATASET_NAME
: Es el nombre del conjunto de datos de BigQuery.INSPECT_TEMPLATE_NUMBER
: Es el número de plantilla de Cloud DLPInspect.BATCH_SIZE_VALUE
: Es el tamaño del lote (número de filas por API para csv).
Captura de datos modificados desde MySQL hasta BigQuery mediante Debezium y Pub/Sub (transmisión)
La plantilla Captura de datos modificados desde MySQL hasta BigQuery mediante Debezium y Pub/Sub es una canalización de transmisión que lee mensajes de Pub/Sub con datos de modificación desde una base de datos de MySQL y escribe los registros en BigQuery. Un conector Debezium captura las modificaciones en la base de datos de MySQL y publica los datos modificados en Pub/Sub. Luego, la plantilla lee los mensajes de Pub/Sub y los escribe en BigQuery.
Puedes usar esta plantilla para sincronizar las bases de datos de MySQL y las tablas de BigQuery. La canalización escribe los datos modificados en una tabla de etapa de pruebas de BigQuery y actualiza de forma intermitente una tabla de BigQuery que replica la base de datos de MySQL.
Requisitos para esta canalización:
- El conector Debezium debe implementarse.
- Los mensajes de Pub/Sub deben serializarse en una Row de Beam.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputSubscriptions |
La lista separada por comas de las suscripciones de entrada de Pub/Sub desde la que se va a leer, en el formato <subscription>,<subscription>, ... |
changeLogDataset |
El conjunto de datos de BigQuery para almacenar las tablas de etapa de pruebas, en el formato <my-dataset> |
replicaDataset |
La ubicación del conjunto de datos de BigQuery para almacenar las tablas de réplica, en el formato <my-dataset> |
updateFrequencySecs |
(Opcional) el intervalo en el que la canalización actualiza la tabla de BigQuery y replica la base de datos de MySQL. |
Ejecuta la plantilla de Captura de datos modificados desde MySQL hasta BigQuery mediante Debezium y Pub/Sub
Para ejecutar esta plantilla, sigue estos pasos:
- En tu máquina local, clona el repositorio DataflowTemplates.
- Cambia al directorio
v2/cdc-parent
. - Asegúrate de que el conector Debezium esté implementado.
- Mediante Maven, ejecuta la plantilla de Dataflow:
mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \ --inputSubscriptions=SUBSCRIPTIONS \ --updateFrequencySecs=300 \ --changeLogDataset=CHANGELOG_DATASET \ --replicaDataset=REPLICA_DATASET \ --project=PROJECT_ID \ --region=REGION_NAME"
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.SUBSCRIPTIONS
: la lista separada por comas de los nombres de suscripción a Pub/SubCHANGELOG_DATASET
: el conjunto de datos de BigQuery para los datos de registro de cambiosREPLICA_DATASET
: el conjunto de datos de BigQuery para tablas de réplica
Apache Kafka a BigQuery
La plantilla de Apache Kafka a BigQuery es una canalización de transmisión que transfiere datos de texto de Apache Kafka, ejecuta una función definida por el usuario (UDF) y envía los registros resultantes a BigQuery. Cualquier error que ocurra en la transformación de los datos, la ejecución de la UDF o la inserción en la tabla de resultados se inserta en una tabla de errores independiente en BigQuery. Si la tabla de errores no existe antes de la ejecución, se creará.
Requisitos para esta canalización
- La tabla de salida de BigQuery debe existir.
- El servidor del agente de Apache Kafka debe estar en ejecución y se debe poder acceder a él desde las máquinas de trabajador de Dataflow.
- Los temas de Apache Kafka deben existir, y los mensajes deben estar codificados en un formato JSON válido.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
outputTableSpec |
La ubicación de la tabla de salida de BigQuery en la que se deben escribir los mensajes de Apache Kafka, en el formato my-project:dataset.table . |
inputTopics |
Los temas de entrada de Apache Kafka desde los que se debe leer en una lista separada por comas. Por ejemplo: messages . |
bootstrapServers |
La dirección del host de los servidores del agente de Apache Kafka en ejecución en una lista separada por comas, cada dirección de host tiene el formato 35.70.252.199:9092 . |
javascriptTextTransformGcsPath |
El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional).
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ } , el nombre de la función es myTransform . Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
|
outputDeadletterTable |
(Opcional) la tabla de BigQuery para los mensajes que no llegaron a la tabla de salida, en el formato my-project:dataset.my-deadletter-table . Si no existe, la tabla se crea durante la ejecución de la canalización.
Si no se especifica, se usa <outputTableSpec>_error_records en su lugar. |
Ejecuta la plantilla de Apache Kafka a BigQuery
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Kafka to BigQuery template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
BIGQUERY_TABLE
: Es el nombre de la tabla de BigQuery.KAFKA_TOPICS
: Es la lista de temas de Apache Kkafa. Si se proporcionan varios temas, sigue las instrucciones para escapar las comas.PATH_TO_JAVASCRIPT_UDF_FILE
: El URI de Cloud Storage de.js
archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.Por ejemplo, si el código de tu función de JavaScript es
myTransform(inJson) { /*...do stuff...*/ }
, el nombre de la función esmyTransform
. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.KAFKA_SERVER_ADDRESSES
: La lista de direcciones IP del servidor del agente de Apache Kafka. Cada dirección IP debe tener el número de puerto desde el que se puede acceder al servidor. Por ejemplo:35.70.252.199:9092
. Si se proporcionan varias direcciones, sigue las instrucciones para escapar las comas.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery", } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
BIGQUERY_TABLE
: Es el nombre de la tabla de BigQuery.KAFKA_TOPICS
: Es la lista de temas de Apache Kkafa. Si se proporcionan varios temas, sigue las instrucciones para escapar las comas.PATH_TO_JAVASCRIPT_UDF_FILE
: El URI de Cloud Storage de.js
archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.Por ejemplo, si el código de tu función de JavaScript es
myTransform(inJson) { /*...do stuff...*/ }
, el nombre de la función esmyTransform
. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.KAFKA_SERVER_ADDRESSES
: La lista de direcciones IP del servidor del agente de Apache Kafka. Cada dirección IP debe tener el número de puerto desde el que se puede acceder al servidor. Por ejemplo:35.70.252.199:9092
. Si se proporcionan varias direcciones, sigue las instrucciones para escapar las comas.
Para obtener más información, consulta Escribe datos de Kafka en BigQuery con Dataflow.
Datastream a BigQuery (transmisión)
La plantilla de Datastream a BigQuery es una canalización de transmisión que lee datos de Datastream y los replica en BigQuery. La plantilla lee los datos de Cloud Storage mediante notificaciones de Pub/Sub y los replica en una tabla de etapa de pruebas de BigQuery particionada por tiempo. Después de la replicación, la plantilla ejecuta una MERGE
en BigQuery para actualizar todos los cambios de captura de datos modificados (CDC) en una réplica de la tabla de origen.
La plantilla controla la creación y la actualización de las tablas de BigQuery que administra la replicación. Cuando se requiere un lenguaje de definición de datos (DDL), una devolución de llamada a Datastream extrae el esquema de la tabla de origen y lo traduce a los tipos de datos de BigQuery. Las operaciones admitidas incluyen las siguientes:
- Las tablas nuevas se crean a medida que se insertan los datos.
- Se agregan columnas nuevas a las tablas de BigQuery con valores iniciales nulos.
- Las columnas descartadas se ignoran en BigQuery y los valores futuros son nulos.
- Las columnas cuyos nombres se han cambiado se agregan a BigQuery como columnas nuevas.
- Los cambios de tipo no se propagan a BigQuery.
Requisitos para esta canalización:
- Una transmisión de Datastream que está lista para replicar los datos o ya los está replicando.
- Las notificaciones de Pub/Sub de Cloud Storage están habilitadas para los datos de Datastream.
- Se crean los conjuntos de datos de destino de BigQuery y se les otorgó acceso de administrador a la cuenta de servicio de Compute Engine.
- En la tabla de origen, se necesita una clave primaria para crear la réplica de destino.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputFilePattern |
La ubicación de los archivos de Datastream en Cloud Storage para replicar. Esta ubicación suele ser la ruta de acceso raíz de la transmisión. |
gcsPubSubSubscription |
La suscripción de Pub/Sub con las notificaciones de archivos de Datastream. Por ejemplo, projects/my-project-id/subscriptions/my-subscription-id . |
inputFileFormat |
El formato del archivo de salida que produce Datastream. Por ejemplo: avro,json . Valor predeterminado, avro . |
outputStagingDatasetTemplate |
El nombre de un conjunto de datos existente para contener tablas de etapa de pruebas. Puedes incluir la plantilla {_metadata_dataset} como marcador de posición que se reemplazará por el nombre del conjunto de datos o esquema de origen (p. ej., {_metadata_dataset}_log ). |
outputDatasetTemplate |
El nombre de un conjunto de datos existente que contiene tablas de réplica. Puedes incluir la plantilla {_metadata_dataset} como marcador de posición que se reemplazará por el nombre del conjunto de datos o esquema de origen (p. ej., {_metadata_dataset} ). |
deadLetterQueueDirectory |
La ruta de acceso del archivo para almacenar los mensajes no procesados con el motivo por el que no se pudieron procesar. El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. El valor predeterminado es suficiente en la mayoría de las condiciones. |
outputStagingTableNameTemplate |
La plantilla para el nombre de las tablas de etapa de pruebas (opcional). El valor predeterminado es {_metadata_table}_log. Si replicas varios esquemas, la sugerencia es {_metadata_schema}_{_metadata_table}_log . |
outputTableNameTemplate |
La plantilla para el nombre de las tablas de réplica (opcional). Valor predeterminado, {_metadata_table} . Si replicas varios esquemas, la sugerencia es {_metadata_schema}_{_metadata_table} . |
outputProjectId |
Proyecto para los conjuntos de datos de BigQuery en los que se deben generar datos (opcional). El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow. |
streamName |
El nombre o la plantilla del flujo que se consultará para obtener la información del esquema (opcional). Valor predeterminado, {_metadata_stream} . |
mergeFrequencyMinutes |
La cantidad de minutos entre combinaciones para una tabla determinada (opcional). Valor predeterminado, 5. |
dlqRetryMinutes |
La cantidad de minutos entre reintentos de la cola de mensajes no entregados (DLQ) (opcional). Valor predeterminado, 10. |
javascriptTextTransformGcsPath |
El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional).
Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ } , el nombre de la función es myTransform . Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
|
Ejecuta la plantilla de Datastream a BigQuery
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Datastream to BigQuery template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ outputStagingDatasetTemplate=BIGQUERY_DATASET,\ outputDatasetTemplate=BIGQUERY_DATASET,\ outputStagingTableNameTemplate=BIGQUERY_TABLE,\ outputTableNameTemplate=BIGQUERY_TABLE_log
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/- the version name, like
2021-09-20-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
GCS_FILE_PATH
: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo:projects/my-project-id/subscriptions/my-subscription-id
.BIGQUERY_DATASET
: es el nombre de tu conjunto de datos de BigQuery.BIGQUERY_TABLE
: es la plantilla de tabla de BigQuery. Por ejemplo,{_metadata_schema}_{_metadata_table}_log
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "outputStagingDatasetTemplate": "BIGQUERY_DATASET", "outputDatasetTemplate": "BIGQUERY_DATASET", "outputStagingTableNameTemplate": "BIGQUERY_TABLE", "outputTableNameTemplate": "BIGQUERY_TABLE_log" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery", } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/- the version name, like
2021-09-20-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
GCS_FILE_PATH
: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo:projects/my-project-id/subscriptions/my-subscription-id
.BIGQUERY_DATASET
: es el nombre de tu conjunto de datos de BigQuery.BIGQUERY_TABLE
: es la plantilla de tabla de BigQuery. Por ejemplo,{_metadata_schema}_{_metadata_table}_log
Transmisión de datos a MySQL o PostgreSQL (Transmisión)
La plantilla de transmisión de datos a SQL es una canalización de transmisión que lee datos de Datastream y los replica en cualquier base de datos de MySQL o PostgreSQL. La plantilla lee los datos de Cloud Storage mediante notificaciones de Pub/Sub y los replica en tablas de réplica de PostgreSQL.
La plantilla no es compatible con el lenguaje de definición de datos (DDL) y espera que todas las tablas ya existan en la base de datos. La replicación usa transformaciones con estado de Dataflow para filtrar los datos inactivos y garantizar la coherencia dentro de los datos desordenados. Por ejemplo, si ya se pasó una versión más reciente de una fila, se ignorará una versión tardía de esa fila. El lenguaje de manipulación de datos (DML) que se ejecuta es el mejor intento de replicar perfectamente los datos de origen o destino. Las declaraciones DML ejecutadas siguen las siguientes reglas:
- Si existe una clave primaria, las operaciones de inserción y actualización usan una sintaxis de upsert (es decir,
INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE
). - Si las claves primarias existen, las eliminaciones se replican como un DML borrado.
- Si no existe una clave primaria, se insertan las operaciones de inserción y actualización en la tabla.
- Si no existen claves primarias, se ignoran las eliminaciones.
Si usas las utilidades de Oracle para Postgres, agrega ROWID
en SQL como la clave primaria cuando no exista ninguna.
Los requisitos de esta canalización son los siguientes:
- Una transmisión de Datastream que está lista para replicar los datos o ya los está replicando.
- Las notificaciones de Pub/Sub de Cloud Storage están habilitadas para los datos de Datastream.
- Se propagó una base de datos de PostgreSQL con el esquema requerido.
- Se configura el acceso a la red entre los trabajadores de Dataflow y PostgreSQL.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
inputFilePattern |
La ubicación de los archivos de Datastream en Cloud Storage para replicar. Esta ubicación suele ser la ruta de acceso raíz de la transmisión. |
gcsPubSubSubscription |
La suscripción de Pub/Sub con las notificaciones de archivos de Datastream. Por ejemplo, projects/my-project-id/subscriptions/my-subscription-id . |
inputFileFormat |
El formato del archivo de salida que produce Datastream. Por ejemplo: avro,json . Valor predeterminado, avro . |
databaseHost |
El host de SQL para conectarte. |
databaseUser |
El usuario de SQL con todos los permisos necesarios para escribir en todas las tablas en la replicación. |
databasePassword |
La contraseña para el usuario de SQL especificado. |
databasePort |
(Opcional) El puerto de la base de datos de SQL al que se realizará la conexión. Valor predeterminado, 5432. |
databaseName |
(Opcional) El nombre de la base de datos de SQL a la que se realizará la conexión. Valor predeterminado, postgres. |
streamName |
El nombre o la plantilla del flujo que se consultará para obtener la información del esquema (opcional). Valor predeterminado, {_metadata_stream} . |
Ejecuta la plantilla de transmisión de datos a SQL
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Datastream to SQL template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ databaseHost=DATABASE_HOST,\ databaseUser=DATABASE_USER,\ databasePassword=DATABASE_PASSWORD
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/- the version name, like
2021-09-20-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
GCS_FILE_PATH
: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo:projects/my-project-id/subscriptions/my-subscription-id
.DATABASE_HOST
: Es la IP del host de SQL.DATABASE_USER
: Es tu usuario de SQL.DATABASE_PASSWORD
: Es tu contraseña de SQL.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "databaseHost": "DATABASE_HOST", "databaseUser": "DATABASE_USER", "databasePassword": "DATABASE_PASSWORD" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL", } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/- the version name, like
2021-09-20-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
GCS_FILE_PATH
: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo:projects/my-project-id/subscriptions/my-subscription-id
.DATABASE_HOST
: Es la IP del host de SQL.DATABASE_USER
: Es tu usuario de SQL.DATABASE_PASSWORD
: Es tu contraseña de SQL.
Conectividad a bases de datos de Java (JDBC) de Pub/Sub
La plantilla de conectividad de base de datos de Pub/Sub a Java (JDBC) es una canalización de transmisión que transfiere datos desde una suscripción de Cloud Pub/Sub preexistente como strings JSON y escribe los registros resultantes en JDBC.
Requisitos para esta canalización:
- La suscripción de Cloud Pub/Sub de origen debe existir antes de ejecutar la canalización.
- La fuente de JDBC debe existir antes de ejecutar la canalización.
- El tema no entregado de Cloud Pub/Sub debe existir antes de ejecutar la canalización.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
driverClassName |
El nombre de la clase del controlador de JDBC. Por ejemplo, com.mysql.jdbc.Driver |
connectionUrl |
La string de la URL de la conexión de JDBC. Por ejemplo, jdbc:mysql://some-host:3306/sampledb Se puede pasar como una string codificada en Base64 y, luego, encriptada con una clave de Cloud KMS. |
driverJars |
Rutas de Cloud Storage separadas por comas para controladores de JDBC. Por ejemplo, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar . |
username |
El nombre de usuario para usar en la conexión de JDBC (opcional). Se puede pasar como una string codificada en Base64 encriptada con una clave de Cloud KMS. |
password |
La contraseña para usar en la conexión de JDBC (opcional). Se puede pasar como una string codificada en Base64 encriptada con una clave de Cloud KMS. |
connectionProperties |
La string de propiedades para usar en la conexión de JDBC (opcional). El formato de la string debe ser [propertyName=property;]* . Por ejemplo, unicode=true;characterEncoding=UTF-8 |
statement |
Declaración que se ejecutará en la base de datos. La instrucción debe especificar los nombres de columna de la tabla en cualquier orden. Solo los valores de los nombres de columna especificados se leen del JSON y se agregan a la instrucción. Por ejemplo, INSERT INTO tableName (column1, column2) VALUES (?,?) |
inputSubscription |
Suscripción de entrada de Pub/Sub desde la que se va a leer, en el formato projects/<project>/subscriptions/<subscription> . |
outputDeadletterTopic |
El tema de Pub/Sub para reenviar mensajes que no se pueden entregar. Por ejemplo, projects/<project-id>/topics/<topic-name> . |
KMSEncryptionKey |
La clave de encriptación de Cloud KMS para desencriptar el nombre de usuario, la contraseña y la string de conexión (opcional). Si se pasa la clave de Cloud KMS, el nombre de usuario, la contraseña y la string de conexión deben pasarse encriptados. |
extraFilesToStage |
Rutas de Cloud Storage separadas por comas o secretos de Secret Manager para los archivos que se deben almacenar en etapa intermedia en el trabajador. Estos archivos se guardarán en el directorio /extra_files de cada trabajador. Por ejemplo, gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id> |
Ejecuta la plantilla de Pub/Sub a la base de datos de Java (JDBC)
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to JDBC template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/PubSub_to_Jdbc \ --region REGION_NAME \ --parameters \ driverClassName=DRIVER_CLASS_NAME,\ connectionURL=JDBC_CONNECTION_URL,\ driverJars=DRIVER_PATHS,\ username=CONNECTION_USERNAME,\ password=CONNECTION_PASSWORD,\ connectionProperties=CONNECTION_PROPERTIES,\ statement=SQL_STATEMENT,\ inputSubscription=INPUT_SUBSCRIPTION,\ outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\ KMSEncryptionKey=KMS_ENCRYPTION_KEY
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
REGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
DRIVER_CLASS_NAME
: Es el nombre de la clase del controlador.JDBC_CONNECTION_URL
: Es la URL de conexión de JDBC.DRIVER_PATHS
: Son las rutas de Cloud Storage separadas por comas de los controladores JDBC.CONNECTION_USERNAME
: Es el nombre de usuario de la conexión de JDBC.CONNECTION_PASSWORD
: Es la contraseña de la conexión de JDBC.CONNECTION_PROPERTIES
: Las propiedades de conexión de JDBC, si es necesarioSQL_STATEMENT
: Es la instrucción de SQL que se ejecutará en la base de datos.INPUT_SUBSCRIPTION
: Es la suscripción de entrada de Pub/Sub desde la que se desea leer.OUTPUT_DEADLETTER_TOPIC
: Es el tema de Pub/Sub para reenviar mensajes que no se pueden entregar.KMS_ENCRYPTION_KEY
: Es la clave de encriptación de Cloud KMS.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_Jdbc { "jobName": "JOB_NAME", "parameters": { "driverClassName": "DRIVER_CLASS_NAME", "connectionURL": "JDBC_CONNECTION_URL", "driverJars": "DRIVER_PATHS", "username": "CONNECTION_USERNAME", "password": "CONNECTION_PASSWORD", "connectionProperties": "CONNECTION_PROPERTIES", "statement": "SQL_STATEMENT", "inputSubscription": "INPUT_SUBSCRIPTION", "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC", "KMSEncryptionKey":"KMS_ENCRYPTION_KEY" }, "environment": { "zone": "us-central1-f" }, }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
LOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
DRIVER_CLASS_NAME
: Es el nombre de la clase del controlador.JDBC_CONNECTION_URL
: Es la URL de conexión de JDBC.DRIVER_PATHS
: Son las rutas de Cloud Storage separadas por comas de los controladores JDBC.CONNECTION_USERNAME
: Es el nombre de usuario de la conexión de JDBC.CONNECTION_PASSWORD
: Es la contraseña de la conexión de JDBC.CONNECTION_PROPERTIES
: Las propiedades de conexión de JDBC, si es necesarioSQL_STATEMENT
: Es la instrucción de SQL que se ejecutará en la base de datos.INPUT_SUBSCRIPTION
: Es la suscripción de entrada de Pub/Sub desde la que se desea leer.OUTPUT_DEADLETTER_TOPIC
: Es el tema de Pub/Sub para reenviar mensajes que no se pueden entregar.KMS_ENCRYPTION_KEY
: Es la clave de encriptación de Cloud KMS.
Flujos de cambio de Cloud Spanner a Cloud Storage
La plantilla de flujos de cambios de Cloud Spanner a Cloud Storage es una canalización de transmisión que transmite los registros de cambios de datos de Spanner y los escribe en un bucket de Cloud Storage mediante Dataflow Runner v2.
La canalización agrupa los registros de flujos de cambios de Spanner en períodos según su marca de tiempo, y cada período representa un intervalo de tiempo cuya duración puedes configurar con esta plantilla. Se garantiza que todos los registros con marcas de tiempo que pertenecen al período se encuentran en el período; no puede haber retrasos. También puedes definir una cantidad de fragmentos de salida. La canalización crea un archivo de salida de Cloud Storage por período por fragmento. Dentro de un archivo de salida, los registros no están ordenados. Se pueden escribir los archivos de salida en formato JSON o AVRO, según la configuración del usuario.
Ten en cuenta que puedes minimizar la latencia de la red y los costos de transporte de la red si ejecutas el trabajo de Dataflow desde la misma región que tu instancia de Cloud Spanner o tu bucket de Cloud Storage. Si usas fuentes, receptores, ubicaciones de archivos de etapa de pruebas o ubicaciones de archivos temporales que se encuentran fuera de la región del trabajo, es posible que los datos se envíen a través de diferentes regiones. Obtén más información sobre los extremos regionales de Dataflow.
Obtén más información sobre los flujos de cambios, cómo compilar canalizaciones de Dataflow de flujos de cambio y prácticas recomendadas.
Requisitos para esta canalización:
- La instancia de Cloud Spanner debe existir antes de ejecutar la canalización.
- La base de datos de Cloud Spanner debe existir antes de ejecutar la canalización.
- La instancia de metadatos de Cloud Spanner debe existir antes de ejecutar la canalización.
- La base de datos de metadatos de Cloud Spanner debe existir antes de ejecutar la canalización.
- El flujo de cambios de Cloud Spanner debe existir antes de ejecutar la canalización.
- El bucket de salida de Cloud Storage debe existir antes de ejecutar la canalización.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
spannerInstanceId |
El ID de instancia de Cloud Spanner desde el que se leerán los datos de flujos de cambios. |
spannerDatabase |
La base de datos de Cloud Spanner desde la que se leerán los datos de flujos de cambios. |
spannerMetadataInstanceId |
El ID de instancia de Cloud Spanner que se usará para la tabla de metadatos del conector de flujos de cambios. |
spannerMetadataDatabase |
La base de datos de Cloud Spanner que se usará para la tabla de metadatos del conector de flujos de cambios. |
spannerChangeStreamName |
El nombre del flujo de cambios de Cloud Spanner desde el que se leerá. |
gcsOutputDirectory |
La ubicación del archivo de salida de los flujos de cambios en Cloud Storage en el siguiente formato: '“s://${BUCKET}/${ROOT_PATH}/”. |
outputFilenamePrefix |
(Opcional). El prefijo de nombre de archivo de los archivos en los que se escribirá. El prefijo de archivo predeterminado está configurado como “output”. |
spannerProjectId |
(Opcional). Proyecto desde el que se leerán los flujos de cambio. Este es también el proyecto en el que se crea la tabla de metadatos del conector de flujos de cambios. El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow. |
startTimestamp |
(Opcional). La fecha y hora de inicio, inclusiva, que se usará para leer los flujos de cambios. Ej.: 2021-10-12T07:20:50.52Z. El valor predeterminado es la marca de tiempo del inicio de la canalización, es decir, la hora actual. |
endTimestamp |
(Opcional). La fecha y hora de finalización, inclusiva, que se usará para leer los flujos de cambios. Ej.: 2021-10-12T07:20:50.52Z. El valor predeterminado es un tiempo infinito en el futuro. |
outputFileFormat |
(Opcional). El formato del archivo de salida de Cloud Storage. Los formatos permitidos son TEXT y AVRO. El valor predeterminado es AVRO. |
windowDuration |
La duración de la ventana es el intervalo en el que se escriben los datos en el directorio de salida (opcional). Configura la duración en función de la capacidad de procesamiento de la canalización. Por ejemplo, una capacidad de procesamiento mayor puede requerir tamaños de ventana más pequeños para que los datos se ajusten a la memoria. La configuración predeterminada es de 5 min, con un mínimo de 1 s. Los formatos permitidos son: [nro. entero] s (para los segundos, por ejemplo, 5 s), [nro. entero] min (para los minutos, por ejemplo, 12 min) y [nro. entero] h (para las horas, por ejemplo, 2 h). |
rpcPriority |
(Opcional). La prioridad de solicitud para llamadas de Cloud Spanner. El valor debe ser uno de los siguientes: [HIGH, MEDIUM, LOW]. (Predeterminado: HIGH) |
numShards |
(Opcional). La cantidad máxima de fragmentos de salida que se produce con la escritura. La cantidad predeterminada es 20. Una mayor cantidad de fragmentos implica una mayor capacidad de procesamiento para la escritura en Cloud Storage, pero, también, un mayor costo de agregación de datos entre fragmentos cuando se procesan archivos de salida de Cloud Storage. |
spannerMetadataTableName |
(Opcional). El nombre de la tabla de metadatos del conector de flujos de cambios de Cloud Spanner que se usará. Si no se proporciona, se creará una tabla de metadatos de flujos de cambios de Cloud Spanner automáticamente durante el flujo de canalización. Este parámetro se debe proporcionar cuando se actualiza una canalización existente y no se debe proporcionar de otra manera. |
Ejecuta la plantilla de flujos de cambios de Cloud Spanner a Cloud Storage
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Spanner change streams to Google Cloud Storage template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud beta dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ gcsOutputDirectory=GCS_OUTPUT_DIRECTORY
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
REGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
SPANNER_INSTANCE_ID
: ID de la instancia de Cloud SpannerSPANNER_DATABASE
: Base de datos de Cloud SpannerSPANNER_METADATA_INSTANCE_ID
: ID de la instancia de metadatos de Cloud SpannerSPANNER_METADATA_DATABASE
: Base de datos de metadatos de Cloud SpannerSPANNER_CHANGE_STREAM
: Flujo de cambios de Cloud SpannerGCS_OUTPUT_DIRECTORY
: Ubicación del archivo de salida de los flujos de cambios
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage", } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
LOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
SPANNER_INSTANCE_ID
: ID de la instancia de Cloud SpannerSPANNER_DATABASE
: Base de datos de Cloud SpannerSPANNER_METADATA_INSTANCE_ID
: ID de la instancia de metadatos de Cloud SpannerSPANNER_METADATA_DATABASE
: Base de datos de metadatos de Cloud SpannerSPANNER_CHANGE_STREAM
: Flujo de cambios de Cloud SpannerGCS_OUTPUT_DIRECTORY
: Ubicación del archivo de salida de los flujos de cambios
Flujos de cambio de Cloud Spanner a BigQuery
La plantilla de flujos de cambios de Cloud Spanner a BigQuery es una canalización de transmisión que transmite los registros de cambios de datos de Cloud Spanner y los escribe en tablas de BigQuery mediante Dataflow Runner v2.
Si las tablas de BigQuery necesarias no existen, la canalización las crea. De lo contrario, se usan las tablas de BigQuery existentes. El esquema de las tablas de BigQuery existentes debe contener las columnas con seguimiento correspondientes de las tablas de Cloud Spanner y las columnas de metadatos adicionales (consulta la descripción de los campos de metadatos en la siguiente lista) que la opción “ignoreFields” no ignora de manera explícita. Cada fila nueva de BigQuery incluye todas las columnas observadas por el flujo de cambios de su fila correspondiente en tu tabla de Cloud Spanner en la marca de tiempo del registro de cambios.
Todas las columnas observadas por el flujo de cambios se incluyen en cada fila de la tabla de BigQuery, sin importar si una transacción de Cloud Spanner las modifica. Las columnas que no se observan no se incluyen en la fila de BigQuery. Cualquier cambio de Cloud Spanner que sea menor que la marca de agua de Dataflow se aplica de forma correcta a las tablas de BigQuery o se almacena en la cola de mensajes no entregados para reintentar aplicarlo. Las filas de BigQuery se insertan de forma desordenada en comparación con el orden original de las marcas de tiempo de confirmación de Cloud Spanner.
Los siguientes campos de metadatos se agregan a las tablas de BigQuery:
- _metadata_spanner_mod_type: Se extrae del registro de cambios de los datos de flujos de cambios.
- _metadata_spanner_table_name: Es el nombre de la tabla de Cloud Spanner. Ten en cuenta que este no es el nombre de la tabla de metadatos del conector.
- _metadata_spanner_commit_timestamp: Se extrae del registro de cambios de los datos de flujos de cambios.
- _metadata_spanner_server_transaction_id: Se extrae del registro de cambios de los datos de flujos de cambios.
- _metadata_spanner_record_sequence: Se extrae del registro de cambios de los datos de flujos de cambios.
- _metadata_spanner_is_last_record_in_transaction_in_partition: Se extrae del registro de cambios de los datos de flujos de cambios.
- _metadata_spanner_number_of_records_in_transaction: Se extrae del registro de cambios de los datos de flujos de cambios.
- _metadata_spanner_number_of_partitions_in_transaction: Se extrae del registro de cambios de los datos de flujos de cambios.
- _metadata_big_query_commit_timestamp: La marca de tiempo de confirmación del momento en que se inserta la fila en BigQuery.
Nota:
- Esta plantilla no propaga los cambios de esquema de Cloud Spanner a BigQuery. Debido a que realizar un cambio de esquema en Cloud Spanner probablemente dañe la canalización, es posible que debas volver a crear la canalización después del cambio de esquema.
- Para los tipos de captura de valor
OLD_AND_NEW_VALUES
yNEW_VALUES
, cuando el registro de cambios de datos contiene un cambio UPDATE, la plantilla debe realizar una lectura inactiva en Cloud Spanner en la marca de tiempo de confirmación del registro de cambios de datos a fin de recuperar las columnas que sí se observaron, pero no se modificaron. Asegúrate de configurar la base de datos “'version_retention_period” de forma adecuada para la lectura inactiva. Para el tipo de captura de valorNEW_ROW
, la plantilla es más eficiente, ya que el registro de cambios de datos captura la fila nueva completa, incluidas las columnas que no se actualizan en UPDATE y la plantilla no necesita llevar a cabo una lectura inactiva. - Para minimizar la latencia de la red y los costos de transporte de la red, puedes ejecutar el trabajo de Dataflow desde la misma región que tu instancia de Cloud Spanner o tus tablas de BigQuery. Si usas fuentes, receptores, ubicaciones de archivos de etapa de pruebas o ubicaciones de archivos temporales que se encuentran fuera de la región del trabajo, es posible que los datos se envíen a través de diferentes regiones. Obtén más información sobre los extremos regionales de Dataflow.
- Esta plantilla admite todos los tipos de datos válidos de Cloud Spanner, pero si el tipo de BigQuery es más preciso que el tipo de Cloud Spanner, podría producirse una pérdida precisión durante la transformación. Específicamente:
- Para el tipo JSON de Cloud Spanner, el orden de los miembros de un objeto se ordena de forma lexicográfica, pero no existe esa garantía para el tipo JSON de BigQuery.
- Cloud Spanner admite el tipo de TIMESTAMP de nanosegundos, BigQuery solo admite el tipo de TIMESTAMP de microsegundos.
Obtén más información sobre los flujos de cambios, cómo compilar canalizaciones de Dataflow de flujos de cambio y prácticas recomendadas.
Requisitos para esta canalización:
- La instancia de Cloud Spanner debe existir antes de ejecutar la canalización.
- La base de datos de Cloud Spanner debe existir antes de ejecutar la canalización.
- La instancia de metadatos de Cloud Spanner debe existir antes de ejecutar la canalización.
- La base de datos de metadatos de Cloud Spanner debe existir antes de ejecutar la canalización.
- El flujo de cambios de Cloud Spanner debe existir antes de ejecutar la canalización.
- El conjunto de datos de BigQuery debe existir antes de ejecutar la canalización.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
spannerInstanceId |
La instancia de Cloud Spanner desde la que se leerán los flujos de cambio. |
spannerDatabase |
La base de datos de Cloud Spanner desde la que se leerán los flujos de cambio. |
spannerMetadataInstanceId |
La instancia de Cloud Spanner que se usará para la tabla de metadatos del conector de flujos de cambios. |
spannerMetadataDatabase |
La base de datos de Cloud Spanner que se usará para la tabla de metadatos del conector de flujos de cambios. |
spannerChangeStreamName |
El nombre del flujo de cambios de Cloud Spanner desde el que se leerá. |
bigQueryDataSet |
El conjunto de datos de BigQuery de salida de los flujos de cambios. |
spannerProjectId |
(Opcional). Proyecto desde el que se leerán los flujos de cambio. Este es también el proyecto en el que se crea la tabla de metadatos del conector de flujos de cambios. El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow. |
spannerMetadataTableName |
(Opcional). El nombre de la tabla de metadatos del conector de flujos de cambios de Cloud Spanner que se usará. Si no se proporciona, una tabla de metadatos de conectores de transmisión de cambios de Cloud Spanner se crea de forma automática durante el flujo de la canalización. Este parámetro se debe proporcionar cuando se actualiza una canalización existente y no se debe proporcionar de otra manera. |
rpcPriority |
(Opcional). La prioridad de solicitud para llamadas de Cloud Spanner. El valor debe ser uno de los siguientes: [HIGH, MEDIUM, LOW]. (Predeterminado: HIGH) |
startTimestamp |
(Opcional). La fecha y hora de inicio, inclusiva, que se usará para leer los flujos de cambios. Ej.: 2021-10-12T07:20:50.52Z. El valor predeterminado es la marca de tiempo del inicio de la canalización, es decir, la hora actual. |
endTimestamp |
(Opcional). La fecha y hora de finalización, inclusiva, que se usará para leer los flujos de cambios. Ej.: 2021-10-12T07:20:50.52Z. El valor predeterminado es un tiempo infinito en el futuro. |
bigQueryProjectId |
(Opcional). El proyecto de BigQuery. El valor predeterminado es el proyecto para el trabajo de Dataflow. |
bigQueryChangelogTableNameTemplate |
(Opcional). La plantilla para el nombre de las tablas de registros de cambios de BigQuery. El valor predeterminado es {_metadata_spanner_table_name}_changelog. |
deadLetterQueueDirectory |
(Opcional). La ruta de acceso del archivo que almacenará los registros no procesados con el motivo por el que no se pudieron procesar. El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. El valor predeterminado es suficiente en la mayoría de las condiciones. |
dlqRetryMinutes |
(Opcional). La cantidad de minutos entre reintentos de la cola de mensajes no entregados. La configuración predeterminada es 10. |
ignoreFields |
(Opcional) Lista de campos separados por comas (con distinción entre mayúsculas y minúsculas) que se deben ignorar. Pueden ser campos de tablas observadas o campos de metadatos que agrega la canalización. Los campos ignorados no se insertarán en BigQuery. |
Ejecuta la plantilla de flujos de cambios de Cloud Spanner a BigQuery
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Spanner change streams to BigQuery template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud beta dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ bigQueryDataset=BIGQUERY_DATASET
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
REGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
SPANNER_INSTANCE_ID
: ID de la instancia de Cloud SpannerSPANNER_DATABASE
: Base de datos de Cloud SpannerSPANNER_METADATA_INSTANCE_ID
: ID de la instancia de metadatos de Cloud SpannerSPANNER_METADATA_DATABASE
: Base de datos de metadatos de Cloud SpannerSPANNER_CHANGE_STREAM
: Flujo de cambios de Cloud SpannerBIGQUERY_DATASET
: El conjunto de datos de BigQuery de salida de los flujos de cambios.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "bigQueryDataset": "BIGQUERY_DATASET" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery", } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
LOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
SPANNER_INSTANCE_ID
: ID de la instancia de Cloud SpannerSPANNER_DATABASE
: Base de datos de Cloud SpannerSPANNER_METADATA_INSTANCE_ID
: ID de la instancia de metadatos de Cloud SpannerSPANNER_METADATA_DATABASE
: Base de datos de metadatos de Cloud SpannerSPANNER_CHANGE_STREAM
: Flujo de cambios de Cloud SpannerBIGQUERY_DATASET
: El conjunto de datos de BigQuery de salida de los flujos de cambios.
Flujos de cambio de Cloud Spanner a Pub/Sub
La transmisión de cambios de Cloud Spanner a la plantilla de Pub/Sub es una canalización de transmisión en la que se transmiten los registros de cambios de datos de Cloud Spanner y se los escribe en temas de Pub/Sub mediante Dataflow Runner V2.
Para enviar tus datos a un tema nuevo de Pub/Sub, primero debes crear el tema. Después de crearlo, Pub/Sub genera y adjunta automáticamente una suscripción al tema nuevo. Si intentas generar datos en un tema de Pub/Sub inexistente, la canalización de Dataflow mostrará una excepción y esta se detendrá a medida que intente establecer una conexión de forma continua.
Si el tema de Pub/Sub necesario ya existe, puedes generar datos en él.
Para obtener más información, consulta Acerca de los flujos de cambios, Compila conexiones de flujos de cambios con Dataflow y Prácticas recomendadas de flujos de cambio.
Requisitos para esta canalización:
- La instancia de Cloud Spanner debe existir antes de ejecutar la canalización.
- La base de datos de Cloud Spanner debe existir antes de ejecutar la canalización.
- La instancia de metadatos de Cloud Spanner debe existir antes de ejecutar la canalización.
- La base de datos de metadatos de Cloud Spanner debe existir antes de ejecutar la canalización.
- El flujo de cambios de Cloud Spanner debe existir antes de ejecutar la canalización.
- El tema de Pub/Sub debe existir antes de ejecutar la canalización.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
spannerInstanceId |
La instancia de Cloud Spanner desde la que se leerán los flujos de cambio. |
spannerDatabase |
La base de datos de Cloud Spanner desde la que se leerán los flujos de cambio. |
spannerMetadataInstanceId |
La instancia de Cloud Spanner que se usará para la tabla de metadatos del conector de flujos de cambios. |
spannerMetadataDatabase |
La base de datos de Cloud Spanner que se usará para la tabla de metadatos del conector de flujos de cambios. |
spannerChangeStreamName |
El nombre del flujo de cambios de Cloud Spanner desde el que se leerá. |
pubsubTopic |
El tema de Pub/Sub para el resultado de los flujos de cambios. |
spannerProjectId |
(Opcional) El proyecto desde el que se leerán los flujos de cambio. Este es también el proyecto en el que se crea la tabla de metadatos del conector de flujos de cambios. El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow. |
spannerMetadataTableName |
(Opcional). El nombre de la tabla de metadatos del conector de flujos de cambios de Cloud Spanner que se usará. Si no se proporciona la tabla de metadatos del conector de transmisión durante el cambio de flujo de canalización, Cloud Spanner la crea automáticamente. Debes proporcionar este parámetro cuando actualices una canalización existente. No uses este parámetro para otros casos. |
rpcPriority |
(Opcional). La prioridad de solicitud para llamadas de Cloud Spanner. El valor debe ser uno de los siguientes: [HIGH, MEDIUM, LOW]. (Predeterminado: HIGH) |
startTimestamp |
(Opcional). La fecha y hora de inicio, inclusiva, que se usará para leer los flujos de cambios. Por ejemplo, ex-2021-10-12T07:20:50.52Z. El valor predeterminado es la marca de tiempo del inicio de la canalización, es decir, la hora actual. |
endTimestamp |
(Opcional). La fecha y hora de finalización, inclusiva, que se usará para leer los flujos de cambios. Por ejemplo, ex-2021-10-12T07:20:50.52Z. El valor predeterminado es un tiempo infinito en el futuro. |
outputFileFormat |
(Opcional) El formato del resultado. El resultado se une a muchos PubsubMessages y se envía a un tema de Pub/Sub. Los formatos permitidos son JSON y AVRO. El valor predeterminado es JSON. |
pubsubAPI |
(Opcional) La API de Pub/Sub que se usa para implementar la canalización. Las APIs permitidas son pubsubio y native_client . Para una pequeña cantidad de consultas por segundo (QPS), native_client tiene menos latencia. Para una gran cantidad de QPS, pubsubio proporciona un rendimiento mejor y más estable. El valor predeterminado es pubsubio . |
Ejecuta los flujos de cambios de Cloud Spanner en la plantilla de Pub/Sub
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Spanner change streams to Pub/Sub template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud beta dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ pubsubTopic=PUBSUB_TOPIC
Reemplaza lo siguiente:
JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
REGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
SPANNER_INSTANCE_ID
: ID de la instancia de Cloud SpannerSPANNER_DATABASE
: Base de datos de Cloud SpannerSPANNER_METADATA_INSTANCE_ID
: ID de la instancia de metadatos de Cloud SpannerSPANNER_METADATA_DATABASE
: Base de datos de metadatos de Cloud SpannerSPANNER_CHANGE_STREAM
: Flujo de cambios de Cloud SpannerPUBSUB_TOPIC
: Es el tema de Pub/Sub para la salida de los flujos de cambios.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "pubsubTopic": "PUBSUB_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub", } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasVERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
LOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
SPANNER_INSTANCE_ID
: ID de la instancia de Cloud SpannerSPANNER_DATABASE
: Base de datos de Cloud SpannerSPANNER_METADATA_INSTANCE_ID
: ID de la instancia de metadatos de Cloud SpannerSPANNER_METADATA_DATABASE
: Base de datos de metadatos de Cloud SpannerSPANNER_CHANGE_STREAM
: Flujo de cambios de Cloud SpannerPUBSUB_TOPIC
: Es el tema de Pub/Sub para la salida de los flujos de cambios.
De MongoDB a BigQuery (CDC)
La plantilla de datos de MongoDB a BigQuery CDC (captura de datos modificados) es una canalización de transmisión que funciona junto con los flujos de cambios de MongoDB.
La canalización lee los registros JSON enviados a Pub/Sub a través de un flujo de cambios de MongoDB y los escribe en BigQuery según lo especificado por el parámetro userOption
.
Requisitos para esta canalización
- El conjunto de datos de destino de BigQuery debe existir.
- Se debe poder acceder a la instancia de origen de MongoDB desde las máquinas de trabajador de Dataflow.
- Los flujos de cambios que envían los cambios de MongoDB a Pub/Sub debería ejecutarse.
Parámetros de la plantilla
Parámetro | Descripción |
---|---|
mongoDbUri |
Es el URI de conexión de MongoDB con el formato mongodb+srv://:@ . |
database |
La base de datos en MongoDB en la que se debe leer la colección. Por ejemplo: my-db . |
collection |
Nombre de la colección dentro de la base de datos de MongoDB. Por ejemplo: my-collection . |
outputTableSpec |
La tabla de BigQuery en la que se escribirá. Por ejemplo, bigquery-project:dataset.output_table . |
userOption |
FLATTEN o NONE . FLATTEN aplana los documentos al primer nivel. NONE almacena todo el documento como una string JSON. |
inputTopic |
El tema de entrada de Pub/Sub desde el que se va a leer, en el formato projects/<project>/topics/<topic> . |
Ejecuta la plantilla de MongoDB a BigQuery (CDC)
Consola
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. El extremo regional predeterminado es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the MongoDB to BigQuery (CDC) template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC \ --parameters \ outputTableSpec=OUTPUT_TABLE_SPEC,\ mongoDbUri=MONGO_DB_URI,\ database=DATABASE,\ collection=COLLECTION,\ userOption=USER_OPTION,\ inputTopic=INPUT_TOPIC
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
OUTPUT_TABLE_SPEC
: Es el nombre de la tabla de BigQuery de destino.MONGO_DB_URI
: Es el URI de MongoDB.DATABASE
: Es tu base de datos de MongoDB.COLLECTION
: Es tu colección de MongoDB.USER_OPTION
: FLATTEN o NINGUNO.INPUT_TOPIC
: Es el tema de entrada de Pub/Sub.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "mongoDbUri": "MONGO_DB_URI", "database": "DATABASE", "collection": "COLLECTION", "userOption": "USER_OPTION", "inputTopic": "INPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC", } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: El extremo regional en el que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION
: Es la versión de la plantilla que deseas usar.Puedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket: gs://dataflow-templates/latest/- el nombre de la versión, como
2021-09-20-00_RC00
, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket: gs://dataflow-templates/
OUTPUT_TABLE_SPEC
: Es el nombre de la tabla de BigQuery de destino.MONGO_DB_URI
: Es el URI de MongoDB.DATABASE
: Es tu base de datos de MongoDB.COLLECTION
: Es tu colección de MongoDB.USER_OPTION
: FLATTEN o NINGUNO.INPUT_TOPIC
: Es el tema de entrada de Pub/Sub.