Plantilla de flujos de cambios de Spanner a base de datos de origen

Canalización de transmisión. Lee datos de los flujos de cambios de Spanner y los escribe en una fuente.

Parámetros de la plantilla

Parámetro Descripción
changeStreamName El nombre del flujo de cambios de Spanner del que lee la canalización.
instanceId Es el nombre de la instancia de Spanner en la que se encuentra el flujo de cambios.
databaseId Es el nombre de la base de datos de Spanner que supervisa el flujo de cambios.
spannerProjectId Es el nombre del proyecto de Spanner.
metadataInstance Es la instancia para almacenar los metadatos que usa el conector para controlar el consumo de los datos de la API del flujo de cambios.
metadataDatabase Es la base de datos para almacenar los metadatos que usa el conector para controlar el consumo de los datos de la API del flujo de cambios.
sourceShardsFilePath Es la ruta de acceso a un archivo de Cloud Storage que contiene información del perfil de conexión para los fragmentos de origen.
startTimestamp Opcional: La marca de tiempo de inicio para leer los cambios. La configuración predeterminada es vacía.
endTimestamp Opcional: La marca de tiempo de finalización para leer los cambios. Si no se proporciona una marca de tiempo, se lee de forma indefinida. La configuración predeterminada es vacía.
shadowTablePrefix Opcional: El prefijo que se usa para nombrar las tablas de paralelas. Valor predeterminado: shadow_.
sessionFilePath Opcional: Ruta de acceso de la sesión en Cloud Storage que contiene información de asignación de HarbourBridge.
filtrationMode Opcional: Es el modo de filtrado. Especifica cómo descartar ciertos registros según un criterio. Los modos admitidos son: none (no filtrar nada), forward_migration (filtrar los registros escritos con la canalización de migración directa). La configuración predeterminada es forward_migration.
shardingCustomJarPath Opcional: La ubicación del archivo JAR personalizado en Cloud Storage que contiene la lógica de personalización para recuperar el ID del fragmento. Si estableces este parámetro, establece el parámetro shardingCustomJarPath. La configuración predeterminada es vacía.
shardingCustomClassName Opcional: El nombre de clase completamente calificado que tiene la implementación del ID de fragmento personalizado. Si se especifica shardingCustomJarPath, este parámetro es obligatorio. La configuración predeterminada es vacía.
shardingCustomParameters Opcional: La cadena que contiene cualquier parámetro personalizado que se pasará a la clase de fragmentación personalizada. La configuración predeterminada es vacía.
sourceDbTimezoneOffset Opcional: Es la compensación de zona horaria desde UTC para la base de datos de origen. Valor de ejemplo: +10:00. La configuración predeterminada es +00:00.
dlqGcsPubSubSubscription Opcional: La suscripción a Pub/Sub que se usa en una política de notificaciones de Cloud Storage para el directorio de reintentos de DLQ cuando se ejecuta en modo normal. El nombre debe tener el formato projects/<project-id>/subscriptions/<subscription-name>. Cuando se establece, se ignoran deadLetterQueueDirectory y dlqRetryMinutes.
skipDirectoryName Opcional: Los registros omitidos de la replicación inversa se escriben en este directorio. El nombre del directorio predeterminado es skip.
maxShardConnections Opcional: Es la cantidad máxima de conexiones que puede aceptar un fragmento determinado. La configuración predeterminada es 10000.
deadLetterQueueDirectory Opcional: Es la ruta de acceso que se usa cuando se almacena el resultado de la cola de errores. La ruta predeterminada es un directorio en la ubicación temporal del trabajo de Dataflow.
dlqMaxRetryCount Opcional: Es la cantidad máxima de veces que se pueden reintentar los errores temporales a través de la cola de mensajes no entregados. La configuración predeterminada es 500.
runMode Opcional: Es el tipo de modo de ejecución. Valores admitidos: regular, retryDLQ. Valor predeterminado: regular. Especifica que retryDLQ solo vuelve a intentar los registros graves de la cola de mensajes no entregados.
dlqRetryMinutes Opcional: La cantidad de minutos entre reintentos de la cola de mensajes no entregados. La configuración predeterminada es 10.

Ejecuta la plantilla

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Spanner Change Streams to Source Database template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haz clic en Ejecutar trabajo.

gcloud CLI

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       changeStreamName=CHANGE_STREAM_NAME,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       spannerProjectId=SPANNER_PROJECT_ID,\
       metadataInstance=METADATA_INSTANCE,\
       metadataDatabase=METADATA_DATABASE,\
       sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH,\

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • CHANGE_STREAM_NAME: Es el nombre del flujo de cambios del que se leerá.
  • INSTANCE_ID: El ID de la instancia de Cloud Spanner.
  • DATABASE_ID: El ID de la base de datos de Cloud Spanner.
  • SPANNER_PROJECT_ID: El ID del proyecto de Cloud Spanner.
  • METADATA_INSTANCE: Es la instancia de Cloud Spanner para almacenar metadatos cuando se lee de flujos de cambios.
  • METADATA_DATABASE: La base de datos de Cloud Spanner para almacenar metadatos cuando se lee de flujos de cambios
  • SOURCE_SHARDS_FILE_PATH: Es la ruta de acceso al archivo de GCS que contiene los detalles del fragmento de origen.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "changeStreamName": "CHANGE_STREAM_NAME",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "metadataInstance": "METADATA_INSTANCE",
       "metadataDatabase": "METADATA_DATABASE",
       "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
     "environment": { "maxWorkers": "10" }
  }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • CHANGE_STREAM_NAME: Es el nombre del flujo de cambios del que se leerá.
  • INSTANCE_ID: El ID de la instancia de Cloud Spanner.
  • DATABASE_ID: El ID de la base de datos de Cloud Spanner.
  • SPANNER_PROJECT_ID: El ID del proyecto de Cloud Spanner.
  • METADATA_INSTANCE: Es la instancia de Cloud Spanner para almacenar metadatos cuando se lee de flujos de cambios.
  • METADATA_DATABASE: La base de datos de Cloud Spanner para almacenar metadatos cuando se lee de flujos de cambios
  • SOURCE_SHARDS_FILE_PATH: Es la ruta de acceso al archivo de GCS que contiene los detalles del fragmento de origen.