Plantillas de flujos de cambios de Spanner a BigQuery

La plantilla de flujos de cambios de Spanner a BigQuery es una canalización de transmisión que transmite los registros de cambios de datos de Spanner y los escribe en tablas de BigQuery a través de Dataflow Runner v2.

Todas las columnas observadas por el flujo de cambios se incluyen en cada fila de la tabla de BigQuery, sin importar si una transacción de Spanner las modifica. Las columnas que no se observan no se incluyen en la fila de BigQuery. Cualquier cambio de Spanner que sea menor que la marca de agua de Dataflow se aplica de forma correcta a las tablas de BigQuery o se almacena en la cola de mensajes no entregados para reintentar aplicarlo. Las filas de BigQuery se insertan de forma desordenada en comparación con el orden original de las marcas de tiempo de confirmación de Spanner.

Si las tablas de BigQuery necesarias no existen, la canalización las crea. De lo contrario, se usan las tablas de BigQuery existentes. El esquema de las tablas de BigQuery existentes debe contener las columnas con seguimiento correspondientes de las tablas de Spanner y cualquier columna de metadatos adicionales que la opción ignoreFields no ignore de forma explícita. Consulta la descripción de los campos de metadatos en la siguiente lista. Cada fila nueva de BigQuery incluye todas las columnas observadas por el flujo de cambios de su fila correspondiente en tu tabla de Spanner en la marca de tiempo del registro de cambios.

Los siguientes campos de metadatos se agregan a las tablas de BigQuery. Para obtener más detalles sobre estos campos, consulta Registros de cambios de datos en la sección “Cambia particiones, registros y consultas”.

Cuando uses esta plantilla, ten en cuenta los siguientes detalles:

  • Esta plantilla no propaga los cambios de esquema de Spanner a BigQuery. Debido a que realizar un cambio de esquema en Spanner probablemente dañe la canalización, es posible que debas volver a crear la canalización después del cambio de esquema.
  • Para los tipos de captura de valor OLD_AND_NEW_VALUES y NEW_VALUES, cuando el registro de cambios de datos contiene un cambio UPDATE, la plantilla debe realizar una lectura inactiva en Spanner en la marca de tiempo de confirmación del registro de cambios de datos para recuperar las columnas que sí se observaron, pero no se modificaron. Asegúrate de configurar la base de datos “'version_retention_period” de forma adecuada para la lectura inactiva. Para el tipo de captura del valor NEW_ROW, la plantilla es más eficiente, porque el registro de cambios de datos captura la fila nueva completa, incluidas las columnas que no se actualizan en las solicitudes UPDATE, y la plantilla no necesita realizar una lectura inactiva.
  • Para minimizar la latencia de la red y los costos de transporte de la red, ejecuta el trabajo de Dataflow desde la misma región que tu instancia de Spanner o tus tablas de BigQuery. Si usas fuentes, receptores, ubicaciones de archivos de etapa de pruebas o ubicaciones de archivos temporales que se encuentran fuera de la región del trabajo, es posible que los datos se envíen a través de diferentes regiones. Para obtener más información, consulta Regiones de Dataflow.
  • Esta plantilla admite todos los tipos de datos válidos de Spanner. Si el tipo de BigQuery es más preciso que el tipo de Spanner, podría producirse una pérdida precisión durante la transformación. Específicamente:
    • Para el tipo JSON de Spanner, el orden de los miembros de un objeto se ordena de forma lexicográfica, pero no existe esa garantía para el tipo JSON de BigQuery.
    • Spanner admite el tipo de TIMESTAMP de nanosegundos, pero BigQuery solo admite el tipo de TIMESTAMP de microsegundos.
  • Esta plantilla no admite el uso de la API de BigQuery Storage Write en modo del tipo “exactamente una vez”.

Obtén más información sobre los flujos de cambios, cómo compilar canalizaciones de Dataflow de flujos de cambio y prácticas recomendadas.

Requisitos de la canalización

  • La instancia de Spanner debe existir antes de ejecutar la canalización.
  • La base de datos de Spanner debe existir antes de ejecutar la canalización.
  • La instancia de metadatos de Spanner debe existir antes de ejecutar la canalización.
  • La base de datos de metadatos de Spanner debe existir antes de ejecutar la canalización.
  • El flujo de cambios de Spanner debe existir antes de ejecutar la canalización.
  • El conjunto de datos de BigQuery debe existir antes de ejecutar la canalización.

Parámetros de la plantilla

Parámetros obligatorios

  • spannerInstanceId: La instancia de Spanner desde la que se leerán los flujos de cambios.
  • spannerDatabase: La base de datos de Spanner desde la que se leerán los flujos de cambios.
  • spannerMetadataInstanceId: La instancia de Spanner que se usará para la tabla de metadatos del conector de flujos de cambios.
  • spannerMetadataDatabase: La base de datos de Spanner que se usará para la tabla de metadatos del conector de flujos de cambios. Para los flujos de cambios que realizan un seguimiento de todas las tablas en una base de datos, te recomendamos que coloques la tabla de metadatos en una base de datos separada.
  • spannerChangeStreamName: El nombre del flujo de cambios de Spanner desde el que se leerá.
  • bigQueryDataset: el conjunto de datos de BigQuery de salida de los flujos de cambios. Se aceptan tanto el dataSetName como el dataSetId completo (es decir, bigQueryProjectId.dataSetName).

Parámetros opcionales

  • spannerProjectId: El proyecto desde el que se leerán los flujos de cambio. El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow.
  • spannerDatabaseRole: El rol de la base de datos que el usuario asume mientras lee desde el flujo de cambios. El rol de base de datos debe tener los privilegios necesarios para leer desde el flujo de cambios. Si no se especifica un rol de la base de datos, el usuario debe tener los permisos de IAM necesarios para leer desde la base de datos.
  • spannerMetadataTableName: el nombre de la tabla de metadatos del conector de flujos de cambios de Cloud Spanner que se usará. Si no se proporciona, se creará una tabla de metadatos de conectores de flujos de cambios de Cloud Spanner automáticamente durante el flujo de canalización. Este parámetro se debe proporcionar cuando se actualiza una canalización existente y no se debe proporcionar de otra manera.
  • rpcPriority: La prioridad de solicitud para llamadas de Cloud Spanner. El valor debe ser uno de los siguientes: [HIGH, MEDIUM, LOW]. La configuración predeterminada es: HIGH.
  • spannerHost: El extremo de Cloud Spanner al que se llamará en la plantilla. Solo se usa para pruebas. (Ejemplo: https://batch-spanner.googleapis.com).
  • startTimestamp: La fecha y hora de inicio, inclusive, que se usará para leer los flujos de cambios (https://tools.ietf.org/html/rfc3339). Por ejemplo, 2022-05-05T07:59:59Z. El valor predeterminado es la marca de tiempo del inicio de la canalización.
  • endTimestamp: La fecha y hora de finalización, inclusive, que se usará para leer los flujos de cambios (https://tools.ietf.org/html/rfc3339). Ej.: 2022-05-05T07:59:59Z. El valor predeterminado es un tiempo infinito en el futuro.
  • bigQueryProjectId: el proyecto de BigQuery El valor predeterminado es el proyecto para el trabajo de Dataflow.
  • bigQueryChangelogTableNameTemplate : la plantilla para el nombre de la tabla de BigQuery que contiene el registro de cambios. El valor predeterminado es: {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory: la ruta de acceso del archivo que almacenará los registros no procesados con el motivo por el que no se pudieron procesar. El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. El valor predeterminado es suficiente en la mayoría de las condiciones.
  • dlqRetryMinutes: la cantidad de minutos entre reintentos de la cola de mensajes no entregados. La configuración predeterminada es 10.
  • ignoreFields: lista separada por comas de los campos que se ignorarán, que podrían ser campos de tablas con seguimiento o campos de metadatos que son _metadata_spanner_mod_type, _metadata_spanner_table_name, _metadata_spanner_commit_timestamp, _metadata_spanner_server_transaction_id, _metadata_spanner_record_sequence, _metadata_spanner_is_last_record_in_transaction_in_partition, _metadata_spanner_number_of_records_in_transaction, _metadata_spanner_number_of_partitions_in_transaction, _metadata_big_query_commit_timestamp. La configuración predeterminada es vacía.
  • disableDlqRetries: indica si se deben inhabilitar o no los reintentos para la DLQ. La configuración predeterminada es "false".
  • useStorageWriteApi: si es verdadero, la canalización usa la API de Storage Write cuando escribe los datos en BigQuery (consulta https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). El valor predeterminado es falso. Cuando usas la API de Storage Write en modo “exactamente una vez”, debes establecer los siguientes parámetros: “Cantidad de transmisiones para la API de BigQuery Storage Write” y “Frecuencia de activación en segundos para la API de BigQuery Storage Write”. Si habilitas el modo de al menos una vez de Dataflow o configuras el parámetro useStorageWriteApiAtLeastOnce como verdadero, no es necesario que establezcas la cantidad de transmisiones ni la frecuencia de activación.
  • useStorageWriteApiAtLeastOnce: Este parámetro solo se aplica si “Usar la API de BigQuery Storage Write” está habilitada. Si se habilita, se usará la semántica de “al menos una vez” para la API de Storage Write; de lo contrario, se usará la semántica de “exactamente una vez”. La configuración predeterminada es "false".
  • numStorageWriteApiStreams: la cantidad de transmisiones define el paralelismo de la transformación de escritura de BigQueryIO y corresponde aproximadamente a la cantidad de transmisiones de la API de Storage Write que usará la canalización. Consulta https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api para conocer los valores recomendados. La configuración predeterminada es 0.
  • storageWriteApiTriggeringFrequencySec : la frecuencia de activación determinará qué tan pronto serán visibles los datos para las consultas en BigQuery. Consulta https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api para conocer los valores recomendados.

Ejecuta la plantilla

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Spanner change streams to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • SPANNER_INSTANCE_ID: ID de la instancia de Spanner
  • SPANNER_DATABASE: base de datos de Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID de la instancia de metadatos de Spanner
  • SPANNER_METADATA_DATABASE: base de datos de metadatos de Spanner
  • SPANNER_CHANGE_STREAM: flujo de cambios de Spanner
  • BIGQUERY_DATASET: El conjunto de datos de BigQuery de salida de los flujos de cambios.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • SPANNER_INSTANCE_ID: ID de la instancia de Spanner
  • SPANNER_DATABASE: base de datos de Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID de la instancia de metadatos de Spanner
  • SPANNER_METADATA_DATABASE: base de datos de metadatos de Spanner
  • SPANNER_CHANGE_STREAM: flujo de cambios de Spanner
  • BIGQUERY_DATASET: El conjunto de datos de BigQuery de salida de los flujos de cambios.

¿Qué sigue?