La plantilla de flujos de cambios de Spanner a BigQuery es una canalización de streaming que transmite registros de cambios de datos de Spanner y los escribe en tablas de BigQuery mediante Dataflow Runner V2.
Todas las columnas de flujo de cambios monitorizadas se incluyen en cada fila de la tabla de BigQuery, independientemente de si se modifican mediante una transacción de Spanner. Las columnas no monitorizadas no se incluyen en la fila de BigQuery. Los cambios de Spanner que sean inferiores a la marca de agua de Dataflow se aplican correctamente a las tablas de BigQuery o se almacenan en la cola de mensajes fallidos para volver a intentarlo. Las filas de BigQuery se insertan en un orden diferente al orden de las marcas de tiempo de confirmación originales de Spanner.
Si no existen las tablas de BigQuery necesarias, la canalización las crea. De lo contrario, se usarán las tablas de BigQuery que ya tengas. El esquema de las tablas de BigQuery debe contener las columnas de las tablas de Spanner que se están monitorizando y las columnas de metadatos adicionales que no se hayan ignorado explícitamente con la opción ignoreFields
.
Consulta la descripción de los campos de metadatos en la siguiente lista.
Cada nueva fila de BigQuery incluye todas las columnas que monitoriza el flujo de cambios de su fila correspondiente en la tabla de Spanner en la marca de tiempo del registro de cambios.
Los siguientes campos de metadatos se añaden a las tablas de BigQuery. Para obtener más información sobre estos campos, consulta Registros de cambios de datos en "Particiones, registros y consultas de flujos de cambios".
_metadata_spanner_mod_type
: el tipo de modificación (insertar, actualizar o eliminar) de la transacción de Spanner. Extraído del registro de cambios de datos de la secuencia de cambios._metadata_spanner_table_name
: nombre de la tabla de Spanner. Este campo no es el nombre de la tabla de metadatos del conector._metadata_spanner_commit_timestamp
: la marca de tiempo de confirmación de Spanner, que es el momento en el que se confirma un cambio. Este valor se extrae del registro de cambios de datos de la secuencia de cambios._metadata_spanner_server_transaction_id
: cadena única a nivel global que representa la transacción de Spanner en la que se ha confirmado el cambio. Solo debe usar este valor en el contexto del procesamiento de registros de flujo de cambios. No se corresponde con el ID de transacción de la API de Spanner. Este valor se extrae del registro de cambios de datos de la secuencia de cambios._metadata_spanner_record_sequence
: número de secuencia del registro en la transacción de Spanner. Se garantiza que los números de secuencia son únicos y aumentan de forma monótona, pero no necesariamente de forma contigua, en una transacción. Este valor se extrae del registro de cambios de datos de la secuencia de cambios._metadata_spanner_is_last_record_in_transaction_in_partition
: indica si el registro es el último de una transacción de Spanner en la partición actual. Este valor se extrae del registro de cambios de datos de la secuencia de cambios._metadata_spanner_number_of_records_in_transaction
: número de registros de cambios de datos que forman parte de la transacción de Spanner en todas las particiones del flujo de cambios. Este valor se extrae del registro de cambios de datos de la secuencia de cambios._metadata_spanner_number_of_partitions_in_transaction
: número de particiones que devuelven registros de cambios de datos de la transacción de Spanner. Este valor se extrae del registro de cambios de datos de la secuencia de cambios._metadata_big_query_commit_timestamp
: la marca de tiempo de confirmación cuando se inserta la fila en BigQuery. SiuseStorageWriteApi
estrue
, la columna no se crea automáticamente en la tabla de registro de cambios mediante la canalización. En ese caso, debe añadir manualmente esta columna a la tabla del registro de cambios y definirCURRENT_TIMESTAMP
como valor predeterminado si es necesario.
Cuando uses esta plantilla, ten en cuenta los siguientes detalles:
- Puedes usar esta plantilla para propagar nuevas columnas en tablas o tablas nuevas de Spanner a BigQuery. Para obtener más información, consulta Gestionar la adición de tablas o columnas de seguimiento.
- En el caso de los tipos de captura de valores
OLD_AND_NEW_VALUES
yNEW_VALUES
, cuando el registro de cambios de datos contiene un cambio UPDATE, la plantilla debe hacer una lectura obsoleta en Spanner en la marca de tiempo de confirmación del registro de cambios de datos para recuperar las columnas sin cambios, pero monitorizadas. Asegúrate de configurar correctamente el parámetro "version_retention_period" de tu base de datos para la lectura obsoleta. En el caso del tipo de captura de valorNEW_ROW
, la plantilla es más eficiente, ya que el registro de cambios de datos captura toda la fila nueva, incluidas las columnas que no se actualizan en las solicitudes UPDATE, y la plantilla no necesita hacer una lectura obsoleta. - Para minimizar la latencia de la red y los costes de transporte de la red, ejecuta la tarea de Dataflow desde la misma región que tu instancia de Spanner o tus tablas de BigQuery. Si usas fuentes, receptores, ubicaciones de archivos de almacenamiento provisional o ubicaciones de archivos temporales que se encuentran fuera de la región de tu trabajo, es posible que tus datos se envíen entre regiones. Para obtener más información, consulta Regiones de Dataflow.
- Esta plantilla admite todos los tipos de datos válidos de Spanner. Si el tipo de BigQuery
es más preciso que el tipo de Spanner, puede que se pierda precisión durante la
transformación. En concreto:
- En el caso del tipo JSON de Spanner, los miembros de un objeto se ordenan lexicográficamente, pero no hay ninguna garantía de este tipo en el caso del tipo JSON de BigQuery.
- Spanner admite el tipo TIMESTAMP de nanosegundos, pero BigQuery solo admite el tipo TIMESTAMP de microsegundos.
Consulte más información sobre los flujos de cambios, cómo crear flujos de procesamiento de datos de flujos de cambios y las prácticas recomendadas.
Requisitos del flujo de procesamiento
- La instancia de Spanner debe existir antes de ejecutar el flujo de procesamiento.
- La base de datos de Spanner debe existir antes de ejecutar el flujo de procesamiento.
- La instancia de metadatos de Spanner debe existir antes de ejecutar el flujo de procesamiento.
- La base de datos de metadatos de Spanner debe existir antes de ejecutar el flujo de procesamiento.
- El flujo de cambios de Spanner debe existir antes de ejecutar el flujo de procesamiento.
- El conjunto de datos de BigQuery debe existir antes de ejecutar el flujo de procesamiento.
Gestionar la adición de tablas o columnas de seguimiento
En esta sección se describen las prácticas recomendadas para añadir tablas y columnas de seguimiento de Spanner mientras se ejecuta la canalización. La versión de plantilla más antigua compatible con esta función es 2024-09-19-00_RC00
.
- Antes de añadir una columna a un ámbito de flujo de cambios de Spanner, primero debes añadirla a la tabla de registro de cambios de BigQuery. La columna añadida debe tener un tipo de datos coincidente y ser
NULLABLE
. Espere al menos 10 minutos antes de continuar creando la nueva columna o tabla en Spanner. Si escribes en la nueva columna sin esperar, es posible que se genere un registro sin procesar con un código de error no válido en el directorio de la cola de mensajes fallidos. - Para añadir una tabla, primero debe añadirla a la base de datos de Spanner. La tabla se crea automáticamente en BigQuery cuando la canalización recibe un registro de la nueva tabla.
- Después de añadir las nuevas columnas o tablas a la base de datos de Spanner, asegúrate de modificar tu flujo de cambios para monitorizar las nuevas columnas o tablas que quieras si aún no se monitorizan de forma implícita.
- La plantilla no elimina tablas ni columnas de BigQuery. Si se elimina una columna de la tabla de Spanner, se rellenarán con valores nulos las columnas del registro de cambios de BigQuery de los registros generados después de que se eliminen las columnas de la tabla de Spanner, a menos que elimines manualmente la columna de BigQuery.
- La plantilla no admite actualizaciones del tipo de columna. Aunque Spanner permite cambiar una columna
STRING
a una columnaBYTES
o una columnaBYTES
a una columnaSTRING
, no puedes modificar el tipo de datos de una columna ya creada ni usar el mismo nombre de columna con diferentes tipos de datos en BigQuery. Si eliminas y vuelves a crear una columna con el mismo nombre, pero con un tipo diferente en Spanner, los datos se pueden escribir en la columna de BigQuery, pero el tipo no cambia. - Esta plantilla no admite actualizaciones en modo de columna. Las columnas de metadatos
replicadas en BigQuery se definen en el modo
REQUIRED
. El resto de las columnas replicadas en BigQuery se definen comoNULLABLE
, independientemente de si se definen comoNOT NULL
en la tabla de Spanner. No puedes actualizar las columnasNULLABLE
aREQUIRED
en BigQuery. - No se puede cambiar el tipo de captura de valor de un flujo de cambios en las canalizaciones en ejecución.
Parámetros de plantilla
Parámetros obligatorios
- spannerInstanceId la instancia de Spanner de la que se van a leer los flujos de cambios.
- spannerDatabase la base de datos de Spanner de la que se van a leer los flujos de cambios.
- spannerMetadataInstanceId: la instancia de Spanner que se va a usar para la tabla de metadatos del conector de flujos de cambios.
- spannerMetadataDatabase la base de datos de Spanner que se va a usar para la tabla de metadatos del conector de flujos de cambios.
- spannerChangeStreamName el nombre del flujo de cambios de Spanner del que se van a leer los datos.
- bigQueryDataset el conjunto de datos de BigQuery para la salida de los flujos de cambios.
Parámetros opcionales
- spannerProjectId el proyecto del que se van a leer los flujos de cambios. Este valor también es el proyecto en el que se crea la tabla de metadatos del conector de flujos de cambios. El valor predeterminado de este parámetro es el proyecto en el que se ejecuta el flujo de procesamiento de Dataflow.
- spannerDatabaseRole el rol de base de datos de Spanner que se usará al ejecutar la plantilla. Este parámetro solo es obligatorio cuando la entidad de IAM que ejecuta la plantilla es un usuario con control de acceso detallado. El rol de la base de datos debe tener el privilegio
SELECT
en el flujo de cambios y el privilegioEXECUTE
en la función de lectura del flujo de cambios. Para obtener más información, consulta Control de acceso granular para secuencias de cambios (https://cloud.google.com/spanner/docs/fgac-change-streams). - spannerMetadataTableName nombre de la tabla de metadatos del conector de cambios en tiempo real de Spanner que se va a usar. Si no se proporciona, se crea automáticamente una tabla de metadatos del conector de flujos de cambios de Spanner durante el flujo de la canalización. Debe proporcionar este parámetro al actualizar una canalización. De lo contrario, no proporciones este parámetro.
- rpcPriority la prioridad de la solicitud para las llamadas de Spanner. El valor debe ser uno de los siguientes:
HIGH
,MEDIUM
oLOW
. El valor predeterminado esHIGH
. - spannerHost el endpoint de Cloud Spanner al que se llama en la plantilla. Solo se usa para hacer pruebas. Por ejemplo,
https://batch-spanner.googleapis.com
. - startTimestamp la fecha y hora de inicio (https://datatracker.ietf.org/doc/html/rfc3339) que se va a usar para leer los flujos de cambios (incluida). Ex-2021-10-12T07:20:50.52Z. El valor predeterminado es la marca de tiempo en la que se inicia la canalización, es decir, la hora actual.
- endTimestamp la fecha y hora de finalización (https://datatracker.ietf.org/doc/html/rfc3339), incluida, que se usará para leer los flujos de cambios.Por ejemplo, 2021-10-12T07:20:50.52Z. El valor predeterminado es un tiempo infinito en el futuro.
- bigQueryProjectId el proyecto de BigQuery. El valor predeterminado es el proyecto del trabajo de Dataflow.
- bigQueryChangelogTableNameTemplate plantilla del nombre de la tabla de BigQuery que contiene el registro de cambios. El valor predeterminado es: {_metadata_spanner_table_name}_changelog.
- deadLetterQueueDirectory la ruta para almacenar los registros sin procesar. La ruta predeterminada es un directorio de la ubicación temporal de la tarea de Dataflow. El valor predeterminado suele ser suficiente.
- dlqRetryMinutes el número de minutos que transcurren entre los reintentos de la cola de mensajes fallidos. El valor predeterminado es
10
. - ignoreFields lista de campos separados por comas (se distingue entre mayúsculas y minúsculas) que se deben ignorar. Estos campos pueden ser campos de tablas monitorizadas o campos de metadatos añadidos por la canalización. Los campos ignorados no se insertan en BigQuery. Si ignora el campo _metadata_spanner_table_name, también se ignorará el parámetro bigQueryChangelogTableNameTemplate. El valor predeterminado es una cadena vacía.
- disableDlqRetries indica si se deben inhabilitar los reintentos de la cola de mensajes fallidos. Valor predeterminado: false.
- useStorageWriteApi si es true, la canalización usa la API Storage Write de BigQuery (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es
false
. Para obtener más información, consulta el artículo sobre cómo usar la API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - useStorageWriteApiAtLeastOnce al usar la API Storage Write, especifica la semántica de escritura. Para usar la semántica de al menos una vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), asigna el valor
true
a este parámetro. Para usar la semántica de entrega única, asigna el valorfalse
al parámetro. Este parámetro solo se aplica cuandouseStorageWriteApi
estrue
. El valor predeterminado esfalse
. - numStorageWriteApiStreams cuando se usa la API Storage Write, especifica el número de flujos de escritura. Si
useStorageWriteApi
estrue
yuseStorageWriteApiAtLeastOnce
esfalse
, debe definir este parámetro. El valor predeterminado es 0. - storageWriteApiTriggeringFrequencySec cuando se usa la API Storage Write, especifica la frecuencia de activación en segundos. Si
useStorageWriteApi
estrue
yuseStorageWriteApiAtLeastOnce
esfalse
, debe definir este parámetro.
Ejecutar la plantilla
Consola
- Ve a la página Crear tarea a partir de plantilla de Dataflow. Ir a Crear tarea a partir de plantilla
- En el campo Nombre de la tarea, introduce un nombre único.
- Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es
us-central1
.Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de flujo de datos, seleccione the Cloud Spanner change streams to BigQuery template.
- En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
- Haz clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ bigQueryDataset=BIGQUERY_DATASET
Haz los cambios siguientes:
JOB_NAME
: un nombre de trabajo único que elijasVERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
SPANNER_INSTANCE_ID
: ID de instancia de SpannerSPANNER_DATABASE
: base de datos de SpannerSPANNER_METADATA_INSTANCE_ID
: ID de instancia de metadatos de SpannerSPANNER_METADATA_DATABASE
: base de datos de metadatos de SpannerSPANNER_CHANGE_STREAM
: flujo de cambios de SpannerBIGQUERY_DATASET
: el conjunto de datos de BigQuery para la salida de los flujos de cambios
API
Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "bigQueryDataset": "BIGQUERY_DATASET" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery", } }
Haz los cambios siguientes:
PROJECT_ID
: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de DataflowJOB_NAME
: un nombre de trabajo único que elijasVERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
SPANNER_INSTANCE_ID
: ID de instancia de SpannerSPANNER_DATABASE
: base de datos de SpannerSPANNER_METADATA_INSTANCE_ID
: ID de instancia de metadatos de SpannerSPANNER_METADATA_DATABASE
: base de datos de metadatos de SpannerSPANNER_CHANGE_STREAM
: flujo de cambios de SpannerBIGQUERY_DATASET
: el conjunto de datos de BigQuery para la salida de los flujos de cambios
Siguientes pasos
- Consulta información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas proporcionadas por Google.