Plantillas de flujos de cambios de Spanner a BigQuery

La plantilla de flujos de cambios de Spanner a BigQuery es una canalización de transmisión que transmite los registros de cambios de datos de Spanner y los escribe en tablas de BigQuery a través de Dataflow Runner v2.

Todas las columnas observadas por el flujo de cambios se incluyen en cada fila de la tabla de BigQuery, sin importar si una transacción de Spanner las modifica. Las columnas que no se observan no se incluyen en la fila de BigQuery. Cualquier cambio de Spanner que sea menor que la marca de agua de Dataflow se aplica de forma correcta a las tablas de BigQuery o se almacena en la cola de mensajes no entregados para reintentar aplicarlo. Las filas de BigQuery se insertan de forma desordenada en comparación con el orden original de las marcas de tiempo de confirmación de Spanner.

Si las tablas de BigQuery necesarias no existen, la canalización las crea. De lo contrario, se usan las tablas de BigQuery existentes. El esquema de las tablas de BigQuery existentes debe contener las columnas con seguimiento correspondientes de las tablas de Spanner y cualquier columna de metadatos adicionales que la opción ignoreFields no ignore de forma explícita. Consulta la descripción de los campos de metadatos en la siguiente lista. Cada fila nueva de BigQuery incluye todas las columnas observadas por el flujo de cambios de su fila correspondiente en tu tabla de Spanner en la marca de tiempo del registro de cambios.

Los siguientes campos de metadatos se agregan a las tablas de BigQuery. Para obtener más detalles sobre estos campos, consulta Registros de cambios de datos en la sección “Cambia particiones, registros y consultas”.

  • _metadata_spanner_mod_type: el tipo de modificación (inserción, actualización o eliminación) de la transacción de Spanner. Se extrae del registro de cambios de los datos de flujos de cambios.
  • _metadata_spanner_table_name: el nombre de la tabla de Spanner. Este campo no es el nombre de la tabla de metadatos del conector.
  • _metadata_spanner_commit_timestamp: la marca de tiempo de confirmación de Spanner, que es el momento en que se confirma un cambio. Este valor se extrae del registro de cambios de los datos de flujos de cambios.
  • _metadata_spanner_server_transaction_id: Una cadena única a nivel global que representa la transacción de Spanner en la que se confirmó el cambio. Usa este valor solo en el contexto del procesamiento de registros de transmisión de cambios. No está correlacionado con el ID de transacción en la API de Spanner. Este valor se extrae del registro de cambios de los datos de flujos de cambios.
  • _metadata_spanner_record_sequence: el número de secuencia para el registro dentro de la transacción de Spanner. Se garantiza que los números de secuencia son únicos y aumentan monótonamente, pero no necesariamente contiguos, dentro de una transacción. Este valor se extrae del registro de cambios de los datos de flujos de cambios.
  • _metadata_spanner_is_last_record_in_transaction_in_partition: indica si el registro es el último de una transacción de Spanner en la partición actual. Este valor se extrae del registro de cambios de los datos de flujos de cambios.
  • _metadata_spanner_number_of_records_in_transaction: la cantidad de registros de cambios de datos que forman parte de la transacción de Spanner en todas las particiones de transmisión de cambios. Este valor se extrae del registro de cambios de los datos de flujos de cambios.
  • _metadata_spanner_number_of_partitions_in_transaction: La cantidad de particiones que muestran registros de cambios de datos para la transacción de Spanner. Este valor se extrae del registro de cambios de los datos de flujos de cambios.
  • _metadata_big_query_commit_timestamp: la marca de tiempo de confirmación del momento en que se inserta la fila en BigQuery. Si useStorageWriteApi es true, la canalización no crea esta columna de forma automática en la tabla de registros de cambios. En ese caso, debes agregar esta columna de forma manual en la tabla de registro de cambios si es necesario.

Cuando uses esta plantilla, ten en cuenta los siguientes detalles:

  • Puedes usar esta plantilla para propagar columnas nuevas en tablas existentes o tablas nuevas de Spanner a BigQuery. Para obtener más información, consulta Controla la adición de tablas o columnas de seguimiento.
  • Para los tipos de captura de valor OLD_AND_NEW_VALUES y NEW_VALUES, cuando el registro de cambios de datos contiene un cambio UPDATE, la plantilla debe realizar una lectura inactiva en Spanner en la marca de tiempo de confirmación del registro de cambios de datos para recuperar las columnas que sí se observaron, pero no se modificaron. Asegúrate de configurar la base de datos “'version_retention_period” de forma adecuada para la lectura inactiva. Para el tipo de captura del valor NEW_ROW, la plantilla es más eficiente, porque el registro de cambios de datos captura la fila nueva completa, incluidas las columnas que no se actualizan en las solicitudes UPDATE, y la plantilla no necesita realizar una lectura inactiva.
  • Para minimizar la latencia de la red y los costos de transporte de la red, ejecuta el trabajo de Dataflow desde la misma región que tu instancia de Spanner o tus tablas de BigQuery. Si usas fuentes, receptores, ubicaciones de archivos de etapa de pruebas o ubicaciones de archivos temporales que se encuentran fuera de la región del trabajo, es posible que los datos se envíen a través de diferentes regiones. Para obtener más información, consulta Regiones de Dataflow.
  • Esta plantilla admite todos los tipos de datos válidos de Spanner. Si el tipo de BigQuery es más preciso que el tipo de Spanner, podría producirse una pérdida precisión durante la transformación. Específicamente:
    • Para el tipo JSON de Spanner, el orden de los miembros de un objeto se ordena de forma lexicográfica, pero no existe esa garantía para el tipo JSON de BigQuery.
    • Spanner admite el tipo de TIMESTAMP de nanosegundos, pero BigQuery solo admite el tipo de TIMESTAMP de microsegundos.

Obtén más información sobre los flujos de cambios, cómo compilar canalizaciones de Dataflow de flujos de cambio y prácticas recomendadas.

Requisitos de la canalización

  • La instancia de Spanner debe existir antes de ejecutar la canalización.
  • La base de datos de Spanner debe existir antes de ejecutar la canalización.
  • La instancia de metadatos de Spanner debe existir antes de ejecutar la canalización.
  • La base de datos de metadatos de Spanner debe existir antes de ejecutar la canalización.
  • El flujo de cambios de Spanner debe existir antes de ejecutar la canalización.
  • El conjunto de datos de BigQuery debe existir antes de ejecutar la canalización.

Controla la adición de tablas o columnas de seguimiento

En esta sección, se describen las prácticas recomendadas para agregar tablas y columnas de seguimiento de Spanner mientras se ejecuta la canalización. La versión más antigua de la plantilla compatible para esta función es 2024-09-19-00_RC00.

  • Antes de agregar una columna nueva a un alcance de flujo de cambios de Spanner, primero agrégala a la tabla de registro de cambios de BigQuery. La columna agregada debe tener un tipo de datos coincidente y ser NULLABLE. Espera al menos 10 minutos antes de continuar con la creación de la columna o tabla nueva en Spanner. Si escribes en la columna nueva sin esperar, es posible que se genere un registro sin procesar con un código de error no válido en el directorio de la cola de mensajes no entregados.
  • Para agregar una tabla nueva, primero agrégala a la base de datos de Spanner. La tabla se crea automáticamente en BigQuery cuando la canalización recibe un registro para la tabla nueva.
  • Después de agregar las columnas o tablas nuevas a la base de datos de Spanner, asegúrate de alterar tu flujo de cambios para hacer un seguimiento de las columnas o tablas nuevas que deseas si aún no se les hace un seguimiento implícito.
  • La plantilla no descarta tablas ni columnas de BigQuery. Si se quita una columna de la tabla de Spanner, los valores nulos se propagan a las columnas del registro de cambios de BigQuery para los registros generados después de que se quiten las columnas de la tabla de Spanner, a menos que quites la columna de BigQuery de forma manual.
  • La plantilla no admite actualizaciones de tipo de columna. Aunque Spanner admite cambiar una columna STRING a una BYTES o una columna BYTES a una STRING, no puedes modificar el tipo de datos de una columna existente ni usar el mismo nombre de columna con diferentes tipos de datos en BigQuery. Si sueltas y vuelves a crear una columna con el mismo nombre, pero con un tipo diferente en Spanner, es posible que los datos se escriban en la columna de BigQuery existente, pero el tipo no cambiará.
  • Esta plantilla no admite actualizaciones del modo de columna. Las columnas de metadatos replicadas en BigQuery se configuran en el modo REQUIRED. Todas las demás columnas replicadas en BigQuery se establecen en NULLABLE, independientemente de si se definen como NOT NULL en la tabla de Spanner. No puedes actualizar las columnas NULLABLE al modo REQUIRED en BigQuery.
  • No se admite cambiar el tipo de captura de valor de un flujo de cambios para ejecutar canalizaciones.

Parámetros de la plantilla

Parámetros obligatorios

  • spannerInstanceId: La instancia de Spanner desde la que se leerán los flujos de cambios.
  • spannerDatabase: La base de datos de Spanner desde la que se leerán los flujos de cambios.
  • spannerMetadataInstanceId: La instancia de Spanner que se usará para la tabla de metadatos del conector de flujos de cambios.
  • spannerMetadataDatabase: La base de datos de Spanner que se usará para la tabla de metadatos del conector de flujos de cambios.
  • spannerChangeStreamName: El nombre del flujo de cambios de Spanner desde el que se leerá.
  • bigQueryDataset: Es el conjunto de datos de BigQuery de salida de los flujos de cambios.

Parámetros opcionales

  • spannerProjectId: El proyecto desde el que se leerán los flujos de cambios. Este valor es también el proyecto en el que se crea la tabla de metadatos del conector de flujos de cambios. El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow.
  • spannerDatabaseRole: El rol de base de datos de Spanner que se usará cuando se ejecute la plantilla. Este parámetro solo es necesario cuando el principal de IAM que ejecuta la plantilla es un usuario de control de acceso detallado. El rol de la base de datos debe tener el privilegio SELECT en la transmisión de cambios y el privilegio EXECUTE en la función de lectura de la transmisión de cambios. Para obtener más información, consulta Control de acceso detallado para los flujos de cambios (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: El nombre de la tabla de metadatos del conector de flujos de cambios de Spanner que se usará. Si no se proporciona, una tabla de metadatos de conectores de flujos de cambios de Spanner se crea de forma automática durante el flujo de la canalización. Debes proporcionar este parámetro cuando actualices una canalización existente. De lo contrario, no proporciones este parámetro.
  • rpcPriority: La prioridad de solicitud para llamadas de Spanner. El valor debe ser uno de estos valores: HIGH, MEDIUM o LOW. El valor predeterminado es HIGH.
  • spannerHost: Es el extremo de Cloud Spanner al que se llamará en la plantilla. Solo se usa para pruebas. Por ejemplo, https://batch-spanner.googleapis.com
  • startTimestamp: La fecha y hora de inicio (https://datatracker.ietf.org/doc/html/rfc3339), inclusiva, que se usará para leer los flujos de cambios. Ej.: 2021-10-12T07:20:50.52Z. El valor predeterminado es la marca de tiempo del inicio de la canalización, es decir, la hora actual.
  • endTimestamp: La fecha y hora de finalización (https://datatracker.ietf.org/doc/html/rfc3339), inclusiva, que se usará para leer los flujos de cambios.Ex-2021-10-12T07:20:50.52Z. El valor predeterminado es un tiempo infinito en el futuro.
  • bigQueryProjectId: Es el proyecto de BigQuery. El valor predeterminado es el proyecto para el trabajo de Dataflow.
  • bigQueryChangelogTableNameTemplate: Es la plantilla para el nombre de la tabla de BigQuery que contiene el registro de cambios. El valor predeterminado es: {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory: Es la ruta para almacenar los registros no procesados. La ruta predeterminada es un directorio en la ubicación temporal del trabajo de Dataflow. Por lo general, el valor predeterminado es suficiente.
  • dlqRetryMinutes: La cantidad de minutos entre reintentos de la cola de mensajes no entregados. El valor predeterminado es 10.
  • ignoreFields: Es una lista de campos separados por comas (distingue mayúsculas de minúsculas) que se deben ignorar. Estos campos pueden ser de tablas observadas o campos de metadatos que agregan la canalización. Los campos ignorados no se insertan en BigQuery. Cuando ignoras el campo _metadata_spanner_table_name, también se ignora el parámetro bigQueryChangelogTableNameTemplate. La configuración predeterminada es vacía.
  • disableDlqRetries: Indica si se deben inhabilitar o no los reintentos para la DLQ. La configuración predeterminada es "false".
  • useStorageWriteApi: Si es verdadero, la canalización usa la API de BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es false. Para obtener más información, consulta Usa la API de Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: Cuando usas la API de Storage Write, se especifica la semántica de escritura. Para usar una semántica de al menos una vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), configura este parámetro en true. Para usar una semántica de una y solo una vez, configura el parámetro en false. Este parámetro se aplica solo cuando useStorageWriteApi es true. El valor predeterminado es false.
  • numStorageWriteApiStreams: Cuando usas la API de Storage Write, se especifica la cantidad de transmisiones de escritura. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debes configurar este parámetro. La configuración predeterminada es 0.
  • storageWriteApiTriggeringFrequencySec: Cuando se usa la API de Storage Write, se especifica la frecuencia de activación en segundos. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debes configurar este parámetro.

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Spanner change streams to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • SPANNER_INSTANCE_ID: ID de la instancia de Spanner
  • SPANNER_DATABASE: base de datos de Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID de la instancia de metadatos de Spanner
  • SPANNER_METADATA_DATABASE: base de datos de metadatos de Spanner
  • SPANNER_CHANGE_STREAM: flujo de cambios de Spanner
  • BIGQUERY_DATASET: El conjunto de datos de BigQuery de salida de los flujos de cambios.

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • SPANNER_INSTANCE_ID: ID de la instancia de Spanner
  • SPANNER_DATABASE: base de datos de Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID de la instancia de metadatos de Spanner
  • SPANNER_METADATA_DATABASE: base de datos de metadatos de Spanner
  • SPANNER_CHANGE_STREAM: flujo de cambios de Spanner
  • BIGQUERY_DATASET: El conjunto de datos de BigQuery de salida de los flujos de cambios.
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.Timestamp;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToBigQueryOptions;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.Mod;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.ModColumnType;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.OptionsUtils;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO(haikuo-google): Add integration test.
// TODO(haikuo-google): Add README.
// TODO(haikuo-google): Add stackdriver metrics.
// TODO(haikuo-google): Ideally side input should be used to store schema information and shared
// accross DoFns, but since side input fix is not yet deployed at the moment, we read schema
// information in the beginning of the DoFn as a work around. We should use side input instead when
// it's available.
// TODO(haikuo-google): Test the case where tables or columns are added while the pipeline is
// running.
/**
 * This pipeline ingests {@link DataChangeRecord} from Spanner change stream. The {@link
 * DataChangeRecord} is then broken into {@link Mod}, which converted into {@link TableRow} and
 * inserted into BigQuery table.
 */
@Template(
    name = "Spanner_Change_Streams_to_BigQuery",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Spanner change streams to BigQuery",
    description = {
      "The Cloud Spanner change streams to BigQuery template is a streaming pipeline that streams"
          + " Cloud Spanner data change records and writes them into BigQuery tables using Dataflow"
          + " Runner V2.\n",
      "All change stream watched columns are included in each BigQuery table row, regardless of"
          + " whether they are modified by a Cloud Spanner transaction. Columns not watched are not"
          + " included in the BigQuery row. Any Cloud Spanner change less than the Dataflow"
          + " watermark are either successfully applied to the BigQuery tables or are stored in the"
          + " dead-letter queue for retry. BigQuery rows are inserted out of order compared to the"
          + " original Cloud Spanner commit timestamp ordering.\n",
      "If the necessary BigQuery tables don't exist, the pipeline creates them. Otherwise, existing"
          + " BigQuery tables are used. The schema of existing BigQuery tables must contain the"
          + " corresponding tracked columns of the Cloud Spanner tables and any additional metadata"
          + " columns that are not ignored explicitly by the ignoreFields option. See the"
          + " description of the metadata fields in the following list. Each new BigQuery row"
          + " includes all columns watched by the change stream from its corresponding row in your"
          + " Cloud Spanner table at the change record's timestamp.\n",
      "The following metadata fields are added to BigQuery tables. For more details about these"
          + " fields, see Data change records in \"Change streams partitions, records, and"
          + " queries.\"\n"
          + "- _metadata_spanner_mod_type: The modification type (insert, update, or delete) of the"
          + " Cloud Spanner transaction. Extracted from change stream data change record.\n"
          + "- _metadata_spanner_table_name: The Cloud Spanner table name. Note this field is not"
          + " the metadata table name of the connector.\n"
          + "- _metadata_spanner_commit_timestamp: The Spanner commit timestamp, which is the time"
          + " when a change is committed. Extracted from change stream data change record.\n"
          + "- _metadata_spanner_server_transaction_id: A globally unique string that represents"
          + " the Spanner transaction in which the change was committed. Only use this value in the"
          + " context of processing change stream records. It isn't correlated with the transaction"
          + " ID in Spanner's API. Extracted from change stream data change record.\n"
          + "- _metadata_spanner_record_sequence: The sequence number for the record within the"
          + " Spanner transaction. Sequence numbers are guaranteed to be unique and monotonically"
          + " increasing (but not necessarily contiguous) within a transaction. Extracted from"
          + " change stream data change record.\n"
          + "- _metadata_spanner_is_last_record_in_transaction_in_partition: Indicates whether the"
          + " record is the last record for a Spanner transaction in the current partition."
          + " Extracted from change stream data change record.\n"
          + "- _metadata_spanner_number_of_records_in_transaction: The number of data change"
          + " records that are part of the Spanner transaction across all change stream partitions."
          + " Extracted from change stream data change record.\n"
          + "- _metadata_spanner_number_of_partitions_in_transaction: The number of partitions that"
          + " return data change records for the Spanner transaction. Extracted from change stream"
          + " data change record.\n"
          + "- _metadata_big_query_commit_timestamp: The commit timestamp of when the row is"
          + " inserted into BigQuery.\n",
      "Notes:\n"
          + "- This template does not propagate schema changes from Cloud Spanner to BigQuery."
          + " Because performing a schema change in Cloud Spanner is likely going to break the"
          + " pipeline, you might need to recreate the pipeline after the schema change.\n"
          + "- For OLD_AND_NEW_VALUES and NEW_VALUES value capture types, when the data change"
          + " record contains an UPDATE change, the template needs to do a stale read to Cloud"
          + " Spanner at the commit timestamp of the data change record to retrieve the unchanged"
          + " but watched columns. Make sure to configure your database 'version_retention_period'"
          + " properly for the stale read. For the NEW_ROW value capture type, the template is more"
          + " efficient, because the data change record captures the full new row including columns"
          + " that are not updated in UPDATEs, and the template does not need to do a stale read.\n"
          + "- You can minimize network latency and network transport costs by running the Dataflow"
          + " job from the same region as your Cloud Spanner instance or BigQuery tables. If you"
          + " use sources, sinks, staging file locations, or temporary file locations that are"
          + " located outside of your job's region, your data might be sent across regions. See"
          + " more about Dataflow regional endpoints.\n"
          + "- This template supports all valid Cloud Spanner data types, but if the BigQuery type"
          + " is more precise than the Cloud Spanner type, precision loss might occur during the"
          + " transformation. Specifically:\n"
          + "  - For Cloud Spanner JSON type, the order of the members of an object is"
          + " lexicographically ordered, but there is no such guarantee for BigQuery JSON type.\n"
          + "  - Cloud Spanner supports nanoseconds TIMESTAMP type, BigQuery only supports"
          + " microseconds TIMESTAMP type.\n",
      "Learn more about <a href=\"https://cloud.google.com/spanner/docs/change-streams\">change"
          + " streams</a>, <a"
          + " href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow\">how to"
          + " build change streams Dataflow pipelines</a>, and <a"
          + " href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow#best_practices\">best"
          + " practices</a>."
    },
    optionsClass = SpannerChangeStreamsToBigQueryOptions.class,
    flexContainerName = "spanner-changestreams-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner instance must exist prior to running the pipeline.",
      "The Cloud Spanner database must exist prior to running the pipeline.",
      "The Cloud Spanner metadata instance must exist prior to running the pipeline.",
      "The Cloud Spanner metadata database must exist prior to running the pipeline.",
      "The Cloud Spanner change stream must exist prior to running the pipeline.",
      "The BigQuery dataset must exist prior to running the pipeline."
    },
    streaming = true,
    supportsExactlyOnce = true,
    supportsAtLeastOnce = true)
public final class SpannerChangeStreamsToBigQuery {

  /** String/String Coder for {@link FailsafeElement}. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamsToBigQuery.class);

  // Max number of deadletter queue retries.
  private static final int DLQ_MAX_RETRIES = 5;

  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting to replicate change records from Spanner change streams to BigQuery");

    SpannerChangeStreamsToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(SpannerChangeStreamsToBigQueryOptions.class);

    run(options);
  }

  private static void validateOptions(SpannerChangeStreamsToBigQueryOptions options) {
    if (options.getDlqRetryMinutes() <= 0) {
      throw new IllegalArgumentException("dlqRetryMinutes must be positive.");
    }
    if (options
        .getBigQueryChangelogTableNameTemplate()
        .equals(BigQueryUtils.BQ_CHANGELOG_FIELD_NAME_TABLE_NAME)) {
      throw new IllegalArgumentException(
          String.format(
              "bigQueryChangelogTableNameTemplate cannot be set to '{%s}'. This value is reserved"
                  + " for the Cloud Spanner table name.",
              BigQueryUtils.BQ_CHANGELOG_FIELD_NAME_TABLE_NAME));
    }

    BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
  }

  private static void setOptions(SpannerChangeStreamsToBigQueryOptions options) {
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    // Add use_runner_v2 to the experiments option, since change streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    if (!experiments.contains(USE_RUNNER_V2_EXPERIMENT)) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options) {
    setOptions(options);
    validateOptions(options);

    /**
     * Stages: 1) Read {@link DataChangeRecord} from change stream. 2) Create {@link
     * FailsafeElement} of {@link Mod} JSON and merge from: - {@link DataChangeRecord}. - GCS Dead
     * letter queue. 3) Convert {@link Mod} JSON into {@link TableRow} by reading from Spanner at
     * commit timestamp. 4) Append {@link TableRow} to BigQuery. 5) Write Failures from 2), 3) and
     * 4) to GCS dead letter queue.
     */
    Pipeline pipeline = Pipeline.create(options);
    DeadLetterQueueManager dlqManager = buildDlqManager(options);
    String spannerProjectId = OptionsUtils.getSpannerProjectId(options);

    String dlqDirectory = dlqManager.getRetryDlqDirectoryWithDateTime();
    String tempDlqDirectory = dlqManager.getRetryDlqDirectory() + "tmp/";

    /**
     * There are two types of errors that can occur in this pipeline:
     *
     * <p>1) Error originating from modJsonStringToTableRow. Errors here are either due to pk values
     * missing, a spanner table / column missing in the in-memory map, or some Spanner read error
     * happening in readSpannerRow. We already retry the Spanner read error inline 3 times. Th other
     * types of errors are more likely to be un-retriable.
     *
     * <p>2) Error originating from BigQueryIO.write. BigQuery storage write API already retries all
     * transient errors and outputs more permanent errors.
     *
     * <p>As a result, it is reasonable to write all errors happening in the pipeline directly into
     * the permanent DLQ, since most of the errors are likely to be non-transient.
     */
    if (options.getDisableDlqRetries()) {
      LOG.info(
          "Disabling retries for the DLQ, directly writing into severe DLQ: {}",
          dlqManager.getSevereDlqDirectoryWithDateTime());
      dlqDirectory = dlqManager.getSevereDlqDirectoryWithDateTime();
      tempDlqDirectory = dlqManager.getSevereDlqDirectory() + "tmp/";
    }

    // Retrieve and parse the startTimestamp and endTimestamp.
    Timestamp startTimestamp =
        options.getStartTimestamp().isEmpty()
            ? Timestamp.now()
            : Timestamp.parseTimestamp(options.getStartTimestamp());
    Timestamp endTimestamp =
        options.getEndTimestamp().isEmpty()
            ? Timestamp.MAX_VALUE
            : Timestamp.parseTimestamp(options.getEndTimestamp());

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withProjectId(spannerProjectId)
            .withInstanceId(options.getSpannerInstanceId())
            .withDatabaseId(options.getSpannerDatabase())
            .withRpcPriority(options.getRpcPriority());
    // Propagate database role for fine-grained access control on change stream.
    if (options.getSpannerDatabaseRole() != null) {
      spannerConfig =
          spannerConfig.withDatabaseRole(
              ValueProvider.StaticValueProvider.of(options.getSpannerDatabaseRole()));
    }

    SpannerIO.ReadChangeStream readChangeStream =
        SpannerIO.readChangeStream()
            .withSpannerConfig(spannerConfig)
            .withMetadataInstance(options.getSpannerMetadataInstanceId())
            .withMetadataDatabase(options.getSpannerMetadataDatabase())
            .withChangeStreamName(options.getSpannerChangeStreamName())
            .withInclusiveStartAt(startTimestamp)
            .withInclusiveEndAt(endTimestamp)
            .withRpcPriority(options.getRpcPriority());

    String spannerMetadataTableName = options.getSpannerMetadataTableName();
    if (spannerMetadataTableName != null) {
      readChangeStream = readChangeStream.withMetadataTable(spannerMetadataTableName);
    }

    PCollection<DataChangeRecord> dataChangeRecord =
        pipeline
            .apply("Read from Spanner Change Streams", readChangeStream)
            .apply("Reshuffle DataChangeRecord", Reshuffle.viaRandomKey());

    PCollection<FailsafeElement<String, String>> sourceFailsafeModJson =
        dataChangeRecord
            .apply("DataChangeRecord To Mod JSON", ParDo.of(new DataChangeRecordToModJsonFn()))
            .apply(
                "Wrap Mod JSON In FailsafeElement",
                ParDo.of(
                    new DoFn<String, FailsafeElement<String, String>>() {
                      @ProcessElement
                      public void process(
                          @Element String input,
                          OutputReceiver<FailsafeElement<String, String>> receiver) {
                        receiver.output(FailsafeElement.of(input, input));
                      }
                    }))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    PCollectionTuple dlqModJson =
        dlqManager.getReconsumerDataTransform(
            pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
    PCollection<FailsafeElement<String, String>> retryableDlqFailsafeModJson =
        dlqModJson.get(DeadLetterQueueManager.RETRYABLE_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);

    PCollection<FailsafeElement<String, String>> failsafeModJson =
        PCollectionList.of(sourceFailsafeModJson)
            .and(retryableDlqFailsafeModJson)
            .apply("Merge Source And DLQ Mod JSON", Flatten.pCollections());

    ImmutableSet.Builder<String> ignoreFieldsBuilder = ImmutableSet.builder();
    for (String ignoreField : options.getIgnoreFields().split(",")) {
      ignoreFieldsBuilder.add(ignoreField);
    }
    ImmutableSet<String> ignoreFields = ignoreFieldsBuilder.build();
    FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRowOptions
        failsafeModJsonToTableRowOptions =
            FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRowOptions.builder()
                .setSpannerConfig(spannerConfig)
                .setSpannerChangeStream(options.getSpannerChangeStreamName())
                .setIgnoreFields(ignoreFields)
                .setCoder(FAILSAFE_ELEMENT_CODER)
                .setUseStorageWriteApi(options.getUseStorageWriteApi())
                .build();
    FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRow failsafeModJsonToTableRow =
        new FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRow(
            failsafeModJsonToTableRowOptions);

    PCollectionTuple tableRowTuple =
        failsafeModJson.apply("Mod JSON To TableRow", failsafeModJsonToTableRow);
    // If users pass in the full BigQuery dataset ID (projectId.datasetName), extract the dataset
    // name for the setBigQueryDataset parameter.
    List<String> results = OptionsUtils.processBigQueryProjectAndDataset(options);
    String bigqueryProject = results.get(0);
    String bigqueryDataset = results.get(1);

    BigQueryDynamicDestinations.BigQueryDynamicDestinationsOptions
        bigQueryDynamicDestinationsOptions =
            BigQueryDynamicDestinations.BigQueryDynamicDestinationsOptions.builder()
                .setSpannerConfig(spannerConfig)
                .setChangeStreamName(options.getSpannerChangeStreamName())
                .setIgnoreFields(ignoreFields)
                .setBigQueryProject(bigqueryProject)
                .setBigQueryDataset(bigqueryDataset)
                .setBigQueryTableTemplate(options.getBigQueryChangelogTableNameTemplate())
                .setUseStorageWriteApi(options.getUseStorageWriteApi())
                .build();
    WriteResult writeResult;
    if (!options.getUseStorageWriteApi()) {
      writeResult =
          tableRowTuple
              .get(failsafeModJsonToTableRow.transformOut)
              .apply(
                  "Write To BigQuery",
                  BigQueryIO.<TableRow>write()
                      .to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
                      .withFormatFunction(element -> removeIntermediateMetadataFields(element))
                      .withFormatRecordOnFailureFunction(element -> element)
                      .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                      .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
                      .withExtendedErrorInfo()
                      .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
    } else {
      writeResult =
          tableRowTuple
              .get(failsafeModJsonToTableRow.transformOut)
              .apply(
                  "Write To BigQuery",
                  BigQueryIO.<TableRow>write()
                      .to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
                      .withFormatFunction(element -> removeIntermediateMetadataFields(element))
                      .withFormatRecordOnFailureFunction(element -> element)
                      .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                      .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
                      .ignoreUnknownValues()
                      .withAutoSchemaUpdate(true) // only supported when using STORAGE_WRITE_API or
                      // STORAGE_API_AT_LEAST_ONCE.
                      .withExtendedErrorInfo()
                      .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
    }

    PCollection<String> transformDlqJson =
        tableRowTuple
            .get(failsafeModJsonToTableRow.transformDeadLetterOut)
            .apply(
                "Failed Mod JSON During Table Row Transformation",
                MapElements.via(new StringDeadLetterQueueSanitizer()));

    PCollection<String> bqWriteDlqJson =
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply(
                "Failed Mod JSON During BigQuery Writes",
                MapElements.via(new BigQueryDeadLetterQueueSanitizer()));

    PCollectionList.of(transformDlqJson)
        // Generally BigQueryIO storage write retries transient errors, and only more
        // persistent errors make it into DLQ.
        .and(bqWriteDlqJson)
        .apply("Merge Failed Mod JSON From Transform And BigQuery", Flatten.pCollections())
        .apply(
            "Write Failed Mod JSON To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqDirectory)
                .withTmpDirectory(tempDlqDirectory)
                .setIncludePaneInfo(true)
                .build());

    PCollection<FailsafeElement<String, String>> nonRetryableDlqModJsonFailsafe =
        dlqModJson.get(DeadLetterQueueManager.PERMANENT_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);

    nonRetryableDlqModJsonFailsafe
        .apply(
            "Write Mod JSON With Non-retryable Error To DLQ",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  private static DeadLetterQueueManager buildDlqManager(
      SpannerChangeStreamsToBigQueryOptions options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDeadLetterQueueDirectory().isEmpty()
            ? tempLocation + "dlq/"
            : options.getDeadLetterQueueDirectory();

    LOG.info("Dead letter queue directory: {}", dlqDirectory);
    return DeadLetterQueueManager.create(dlqDirectory, DLQ_MAX_RETRIES);
  }

  /**
   * Remove the following intermediate metadata fields that are not user data from {@link TableRow}:
   * _metadata_error, _metadata_retry_count, _metadata_spanner_original_payload_json.
   */
  private static TableRow removeIntermediateMetadataFields(TableRow tableRow) {
    TableRow cleanTableRow = tableRow.clone();
    Set<String> rowKeys = tableRow.keySet();
    Set<String> metadataFields = BigQueryUtils.getBigQueryIntermediateMetadataFieldNames();

    for (String rowKey : rowKeys) {
      if (metadataFields.contains(rowKey)) {
        cleanTableRow.remove(rowKey);
      } else if (rowKeys.contains("_type_" + rowKey)) {
        cleanTableRow.remove("_type_" + rowKey);
      }
    }

    return cleanTableRow;
  }

  /**
   * DoFn that converts a {@link DataChangeRecord} to multiple {@link Mod} in serialized JSON
   * format.
   */
  static class DataChangeRecordToModJsonFn extends DoFn<DataChangeRecord, String> {

    @ProcessElement
    public void process(@Element DataChangeRecord input, OutputReceiver<String> receiver) {
      for (org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod changeStreamsMod :
          input.getMods()) {
        Mod mod =
            new Mod(
                changeStreamsMod.getKeysJson(),
                changeStreamsMod.getNewValuesJson(),
                input.getCommitTimestamp(),
                input.getServerTransactionId(),
                input.isLastRecordInTransactionInPartition(),
                input.getRecordSequence(),
                input.getTableName(),
                input.getRowType().stream().map(ModColumnType::new).collect(Collectors.toList()),
                input.getModType(),
                input.getValueCaptureType(),
                input.getNumberOfRecordsInTransaction(),
                input.getNumberOfPartitionsInTransaction());

        String modJsonString;

        try {
          modJsonString = mod.toJson();
        } catch (IOException e) {
          // Ignore exception and print bad format.
          modJsonString = String.format("\"%s\"", input);
        }
        receiver.output(modJsonString);
      }
    }
  }
}

¿Qué sigue?