La plantilla de Datastream a BigQuery es una canalización de streaming que lee datos de Datastream y los replica en BigQuery. La plantilla lee datos de Cloud Storage mediante notificaciones de Pub/Sub y los replica en una tabla de almacenamiento provisional de BigQuery con particiones por tiempo. Después de la replicación, la plantilla ejecuta un MERGE
en BigQuery para insertar o actualizar todos los cambios de la captura de datos de cambios (CDC) en una réplica de la tabla de origen. Especifica el parámetro gcsPubSubSubscription
para leer datos de las notificaciones de Pub/Sub O proporciona el parámetro inputFilePattern
para leer datos directamente de los archivos de Cloud Storage.
La plantilla se encarga de crear y actualizar las tablas de BigQuery gestionadas por la replicación. Cuando se requiere el lenguaje de definición de datos (DDL), una retrollamada a Datastream extrae el esquema de la tabla de origen y lo traduce a tipos de datos de BigQuery. Entre las operaciones admitidas se incluyen las siguientes:
- Las tablas se crean a medida que se insertan los datos.
- Se añaden nuevas columnas a las tablas de BigQuery con valores iniciales nulos.
- Las columnas eliminadas se ignoran en BigQuery y los valores futuros son nulos.
- Las columnas renombradas se añaden a BigQuery como columnas nuevas.
- Los cambios de tipo no se propagan a BigQuery.
Se recomienda ejecutar este flujo de procesamiento con el modo de streaming al menos una vez, ya que la plantilla elimina los duplicados cuando combina los datos de una tabla temporal de BigQuery con la tabla principal de BigQuery. Este paso de la canalización significa que no hay ninguna ventaja adicional en usar el modo de streaming exactamente una vez.
Requisitos del flujo de procesamiento
- Un flujo de Datastream que esté listo para replicar datos o que ya lo esté haciendo.
- Las notificaciones de Pub/Sub de Cloud Storage están habilitadas para los datos de Datastream.
- Se crean conjuntos de datos de destino de BigQuery y se concede acceso de administrador a la cuenta de servicio de Compute Engine.
- Es necesario que la tabla de origen tenga una clave principal para que se cree la tabla de réplica de destino.
- Una base de datos de origen MySQL u Oracle. Las bases de datos PostgreSQL y SQL Server no son compatibles.
Parámetros de plantilla
Parámetros obligatorios
- inputFilePattern la ubicación del archivo de salida de Datastream en Cloud Storage, con el formato
gs://<BUCKET_NAME>/<ROOT_PATH>/
. - inputFileFormat formato de los archivos de salida que genera Datastream. Los valores permitidos son
avro
yjson
. El valor predeterminado esavro
. - gcsPubSubSubscription la suscripción de Pub/Sub que usa Cloud Storage para notificar a Dataflow que hay archivos nuevos disponibles para procesar, con el formato
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>
. - outputStagingDatasetTemplate nombre del conjunto de datos que contiene las tablas de almacenamiento provisional. Este parámetro admite plantillas, como
{_metadata_dataset}_log
omy_dataset_log
. Normalmente, este parámetro es el nombre de un conjunto de datos. El valor predeterminado es{_metadata_dataset}
. Nota: En el caso de las fuentes de MySQL, el nombre de la base de datos se asigna a{_metadata_schema}
en lugar de a{_metadata_dataset}
. - outputDatasetTemplate nombre del conjunto de datos que contiene las tablas de réplica. Este parámetro admite plantillas, como
{_metadata_dataset}
omy_dataset
. Normalmente, este parámetro es el nombre de un conjunto de datos. El valor predeterminado es{_metadata_dataset}
. Nota: En el caso de las fuentes de MySQL, el nombre de la base de datos se asigna a{_metadata_schema}
en lugar de a{_metadata_dataset}
. - deadLetterQueueDirectory la ruta que usa Dataflow para escribir la salida de la cola de mensajes fallidos. Esta ruta no debe ser la misma que la de la salida del archivo de flujo de datos. El valor predeterminado es
empty
.
Parámetros opcionales
- streamName: el nombre o la plantilla de la emisión de la que se va a obtener la información del esquema. El valor predeterminado es {_metadata_stream}. El valor predeterminado suele ser suficiente.
- rfcStartDateTime la fecha y hora de inicio que se usará para obtener datos de Cloud Storage (https://tools.ietf.org/html/rfc3339). El valor predeterminado es
1970-01-01T00:00:00.00Z
. - fileReadConcurrency número de archivos de DataStream simultáneos que se van a leer. El valor predeterminado es
10
. - outputProjectId el ID del proyecto de Google Cloud que contiene los conjuntos de datos de BigQuery en los que se van a generar los datos. El valor predeterminado de este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow.
- outputStagingTableNameTemplate la plantilla que se va a usar para asignar nombres a las tablas de almacenamiento provisional. Por ejemplo,
{_metadata_table}
. El valor predeterminado es{_metadata_table}_log
. - outputTableNameTemplate la plantilla que se va a usar para el nombre de las tablas de réplica. Por ejemplo,
{_metadata_table}
. El valor predeterminado es{_metadata_table}
. - ignoreFields campos separados por comas que se deben ignorar en BigQuery. El valor predeterminado es
_metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count
. Por ejemplo,_metadata_stream,_metadata_schema
. - mergeFrequencyMinutes número de minutos entre las combinaciones de una tabla determinada. El valor predeterminado es
5
. - dlqRetryMinutes número de minutos entre reintentos de DLQ. El valor predeterminado es
10
. - dataStreamRootUrl la URL raíz de la API DataStream. El valor predeterminado es https://datastream.googleapis.com/.
- applyMerge indica si se deben inhabilitar las consultas MERGE en el trabajo. El valor predeterminado es
true
. - mergeConcurrency el número de consultas MERGE simultáneas de BigQuery. Solo tiene efecto cuando applyMerge se define como true. El valor predeterminado es
30
. - partitionRetentionDays el número de días que se usará para la conservación de particiones al ejecutar combinaciones de BigQuery. El valor predeterminado es
1
. - useStorageWriteApiAtLeastOnce este parámetro solo tiene efecto si
Use BigQuery Storage Write API
está habilitado. Si estrue
, se usa la semántica de al menos una vez para la API Storage Write. De lo contrario, se usa la semántica de entrega única. El valor predeterminado esfalse
. - javascriptTextTransformGcsPath el URI de Cloud Storage del archivo .js que define la función de JavaScript definida por el usuario (UDF) que se va a usar. Por ejemplo,
gs://my-bucket/my-udfs/my_file.js
. - javascriptTextTransformFunctionName nombre de la función definida por el usuario (UDF) de JavaScript que se va a usar. Por ejemplo, si el código de la función de JavaScript es
myTransform(inJson) { /*...do stuff...*/ }
, el nombre de la función esmyTransform
. Para ver ejemplos de UDFs de JavaScript, consulta Ejemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). - javascriptTextTransformReloadIntervalMinutes especifica la frecuencia con la que se vuelve a cargar la FDU, en minutos. Si el valor es superior a 0, Dataflow comprueba periódicamente el archivo de la función definida por el usuario en Cloud Storage y vuelve a cargar la función si se modifica el archivo. Este parámetro le permite actualizar la función definida por el usuario mientras se ejecuta la canalización, sin necesidad de reiniciar el trabajo. Si el valor es
0
, se inhabilita la recarga de las funciones definidas por el usuario. El valor predeterminado es0
. - pythonTextTransformGcsPath patrón de ruta de Cloud Storage del código Python que contiene las funciones definidas por el usuario. Por ejemplo,
gs://your-bucket/your-transforms/*.py
. - pythonRuntimeVersion la versión del tiempo de ejecución que se va a usar en esta función definida por el usuario de Python.
- pythonTextTransformFunctionName el nombre de la función a la que se llamará desde tu archivo JavaScript. Usa solo letras, dígitos y guiones bajos. Por ejemplo,
transform_udf1
. - runtimeRetries el número de veces que se volverá a intentar ejecutar un tiempo de ejecución antes de que falle. El valor predeterminado es 5.
- useStorageWriteApi si es true, la canalización usa la API Storage Write de BigQuery (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es
false
. Para obtener más información, consulta el artículo sobre cómo usar la API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams cuando se usa la API Storage Write, especifica el número de flujos de escritura. Si
useStorageWriteApi
estrue
yuseStorageWriteApiAtLeastOnce
esfalse
, debe definir este parámetro. El valor predeterminado es 0. - storageWriteApiTriggeringFrequencySec cuando se usa la API Storage Write, especifica la frecuencia de activación en segundos. Si
useStorageWriteApi
estrue
yuseStorageWriteApiAtLeastOnce
esfalse
, debe definir este parámetro.
Función definida por el usuario
También puedes ampliar esta plantilla escribiendo una función definida por el usuario (UDF). La plantilla llama a la función definida por el usuario para cada elemento de entrada. Las cargas útiles de los elementos se serializan como cadenas JSON. Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.
Especificación de la función
La función definida por el usuario tiene las siguientes especificaciones:
Ejecutar la plantilla
Consola
- Ve a la página Crear tarea a partir de plantilla de Dataflow. Ir a Crear tarea a partir de plantilla
- En el campo Nombre de la tarea, introduce un nombre único.
- Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es
us-central1
.Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de flujo de datos, seleccione the Datastream to BigQuery template.
- En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
- Opcional: Para cambiar del procesamiento una sola vez al modo de streaming al menos una vez, selecciona Al menos una vez.
- Haz clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ outputStagingDatasetTemplate=BIGQUERY_DATASET,\ outputDatasetTemplate=BIGQUERY_DATASET,\ outputStagingTableNameTemplate=BIGQUERY_TABLE,\ outputTableNameTemplate=BIGQUERY_TABLE_log
Haz los cambios siguientes:
PROJECT_ID
: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de DataflowJOB_NAME
: un nombre de trabajo único que elijasREGION_NAME
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: la ruta de Cloud Storage a los datos de Datastream. Por ejemplo:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: la suscripción de Pub/Sub para leer los archivos modificados. Por ejemplo:projects/my-project-id/subscriptions/my-subscription-id
.BIGQUERY_DATASET
: el nombre del conjunto de datos de BigQuery.BIGQUERY_TABLE
: tu plantilla de tabla de BigQuery. Por ejemplo,{_metadata_schema}_{_metadata_table}_log
API
Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "outputStagingDatasetTemplate": "BIGQUERY_DATASET", "outputDatasetTemplate": "BIGQUERY_DATASET", "outputStagingTableNameTemplate": "BIGQUERY_TABLE", "outputTableNameTemplate": "BIGQUERY_TABLE_log" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery", } }
Haz los cambios siguientes:
PROJECT_ID
: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de DataflowJOB_NAME
: un nombre de trabajo único que elijasLOCATION
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: la ruta de Cloud Storage a los datos de Datastream. Por ejemplo:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: la suscripción de Pub/Sub para leer los archivos modificados. Por ejemplo:projects/my-project-id/subscriptions/my-subscription-id
.BIGQUERY_DATASET
: el nombre del conjunto de datos de BigQuery.BIGQUERY_TABLE
: tu plantilla de tabla de BigQuery. Por ejemplo,{_metadata_schema}_{_metadata_table}_log
Siguientes pasos
- Consulte cómo implementar DataStream y Dataflow para analíticas.
- Consulta información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas proporcionadas por Google.