Plantilla de Datastream a Spanner

La plantilla de Datastream a Spanner es un flujo de procesamiento en streaming que lee eventos de Datastream de un segmento de Cloud Storage y los escribe en una base de datos de Spanner. Está diseñada para migrar datos de fuentes de Datastream a Spanner. Especifica el parámetro gcsPubSubSubscription para leer datos de las notificaciones de Pub/Sub O proporciona el parámetro inputFilePattern para leer datos directamente de los archivos de Cloud Storage.

Todas las tablas necesarias para la migración deben existir en la base de datos de Spanner de destino antes de ejecutar la plantilla. Por lo tanto, la migración del esquema de una base de datos de origen a Spanner de destino debe completarse antes de la migración de datos. Los datos pueden estar en las tablas antes de la migración. Esta plantilla no propaga los cambios de esquema de Datastream a la base de datos de Spanner.

La coherencia de los datos solo se garantiza al final de la migración, cuando todos los datos se hayan escrito en Spanner. Para almacenar la información de orden de cada registro escrito en Spanner, esta plantilla crea una tabla adicional (denominada tabla de sombra) para cada tabla de la base de datos de Spanner. Se usa para asegurar la coherencia al final de la migración. Las tablas de sombra no se eliminan después de la migración y se pueden usar para validar al final de la migración.

Todos los errores que se produzcan durante el funcionamiento (por ejemplo, esquemas que no coinciden, archivos JSON que no son válidos o errores derivados de la ejecución de transformaciones) se registran en una cola de errores. La cola de errores es una carpeta de Cloud Storage que almacena todos los eventos de Datastream que han tenido errores, junto con el motivo del error en formato de texto. Los errores pueden ser transitorios o permanentes y se almacenan en las carpetas de Cloud Storage correspondientes de la cola de errores. Los errores transitorios se vuelven a intentar automáticamente, mientras que los permanentes no. En caso de errores permanentes, tienes la opción de corregir los eventos de cambio y moverlos al contenedor de reintentos mientras se ejecuta la plantilla.

Requisitos del flujo de procesamiento

  • Una secuencia de Datastream en estado En ejecución o No iniciada.
  • Un segmento de Cloud Storage en el que se replican los eventos de Datastream.
  • Una base de datos de Spanner con tablas. Estas tablas pueden estar vacías o contener datos.

Parámetros de plantilla

Parámetros obligatorios

  • instanceId la instancia de Spanner en la que se replican los cambios.
  • databaseId la base de datos de Spanner en la que se replican los cambios.

Parámetros opcionales

  • inputFilePattern la ubicación del archivo de Cloud Storage que contiene los archivos de Datastream que se van a replicar. Normalmente, se trata de la ruta raíz de un flujo. Se ha inhabilitado la compatibilidad con esta función. Utiliza esta función solo para reintentar las entradas que lleguen a la cola de mensajes fallidos graves.
  • inputFileFormat formato del archivo de salida generado por Datastream. Por ejemplo, avro,json. El valor predeterminado es avro.
  • sessionFilePath ruta del archivo de sesión en Cloud Storage que contiene información de asignación de HarbourBridge.
  • projectId el ID del proyecto de Spanner.
  • spannerHost el endpoint de Cloud Spanner al que se llama en la plantilla. Por ejemplo, https://batch-spanner.googleapis.com. El valor predeterminado es https://batch-spanner.googleapis.com.
  • gcsPubSubSubscription la suscripción de Pub/Sub que se usa en una política de notificaciones de Cloud Storage. En el nombre, usa el formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • streamName el nombre o la plantilla de la emisión de la que se va a obtener información del esquema y el tipo de fuente.
  • shadowTablePrefix prefijo que se usa para asignar nombres a las tablas de sombra. Valor predeterminado: shadow_.
  • shouldCreateShadowTables esta marca indica si se deben crear tablas de sombra en la base de datos de Cloud Spanner. El valor predeterminado es true.
  • rfcStartDateTime la fecha y hora de inicio que se usa para obtener datos de Cloud Storage (https://tools.ietf.org/html/rfc3339). El valor predeterminado es 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency número de archivos de DataStream simultáneos que se van a leer. Valor predeterminado: 30.
  • deadLetterQueueDirectory la ruta de archivo que se usa para almacenar la salida de la cola de errores. La ruta de archivo predeterminada es un directorio de la ubicación temporal de la tarea de Dataflow.
  • dlqRetryMinutes número de minutos entre reintentos de la cola de mensajes fallidos. El valor predeterminado es 10.
  • dlqMaxRetryCount número máximo de veces que se pueden reintentar los errores temporales a través de DLQ. El valor predeterminado es 500.
  • dataStreamRootUrl URL raíz de la API Datastream. El valor predeterminado es https://datastream.googleapis.com/.
  • datastreamSourceType es el tipo de base de datos de origen a la que se conecta Datastream. Ejemplo: mysql/oracle. Se debe definir al hacer pruebas sin un flujo de datos en ejecución.
  • roundJsonDecimals si se define esta marca, se redondean los valores decimales de las columnas JSON a un número que se pueda almacenar sin pérdida de precisión. Valor predeterminado: false.
  • runMode: es el tipo de modo de ejecución, ya sea normal o con retryDLQ. El valor predeterminado es regular.
  • transformationContextFilePath ruta del archivo de contexto de transformación en Cloud Storage que se usa para rellenar los datos que se utilizan en las transformaciones realizadas durante las migraciones. Por ejemplo, el ID de fragmento al nombre de la base de datos para identificar la base de datos desde la que se migró una fila.
  • directoryWatchDurationInMinutes duración durante la que la canalización debe seguir sondeando un directorio en GCS. Los archivos de salida de Datastream se organizan en una estructura de directorios que muestra la marca de tiempo del evento agrupada por minutos. Este parámetro debe ser aproximadamente igual al retraso máximo que podría producirse entre el evento que se produce en la base de datos de origen y el mismo evento que escribe Datastream en GCS. Percentil 99,9 = 10 minutos. El valor predeterminado es 10.
  • spannerPriority la prioridad de la solicitud para las llamadas de Cloud Spanner. El valor debe ser uno de los siguientes: [HIGH,MEDIUM,LOW]. El valor predeterminado es HIGH.
  • dlqGcsPubSubSubscription la suscripción de Pub/Sub que se usa en una política de notificaciones de Cloud Storage para el directorio de reintentos de DLQ cuando se ejecuta en modo normal. En el nombre, usa el formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. Si se define, se ignoran deadLetterQueueDirectory y dlqRetryMinutes.
  • transformationJarPath: ubicación del archivo JAR personalizado en Cloud Storage para el archivo que contiene la lógica de transformación personalizada para procesar registros en la migración directa. El valor predeterminado es una cadena vacía.
  • transformationClassName nombre de clase completo que tiene la lógica de transformación personalizada. Es un campo obligatorio si se especifica transformationJarPath. El valor predeterminado es una cadena vacía.
  • transformationCustomParameters cadena que contiene los parámetros personalizados que se van a transferir a la clase de transformación personalizada. El valor predeterminado es una cadena vacía.
  • filteredEventsDirectory es la ruta del archivo donde se almacenan los eventos filtrados mediante una transformación personalizada. El valor predeterminado es un directorio de la ubicación temporal de la tarea de Dataflow. El valor predeterminado es suficiente en la mayoría de las condiciones.
  • shardingContextFilePath la ruta del archivo de contexto de partición en Cloud Storage se usa para rellenar el ID de partición en la base de datos de Spanner de cada partición de origen.Tiene el formato Map<stream_name, Map<db_name, shard_id>>.
  • tableOverrides son las sustituciones de nombres de tabla de origen a Spanner. Se escriben con el siguiente formato: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]. En este ejemplo, se asigna la tabla Singers a Vocalists y la tabla Albums a Records. Por ejemplo, [{Singers, Vocalists}, {Albums, Records}]. El valor predeterminado es una cadena vacía.
  • columnOverrides son las sustituciones de nombres de columnas de origen a Spanner. Se escriben con el siguiente formato: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]. Ten en cuenta que SourceTableName debe ser el mismo en el par de origen y en el de Spanner. Para anular los nombres de las tablas, usa tableOverrides.En el ejemplo se muestra cómo asignar SingerName a TalentName y AlbumName a RecordName en las tablas Singers y Albums, respectivamente. Por ejemplo, [{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]. El valor predeterminado es una cadena vacía.
  • schemaOverridesFilePath: un archivo que especifica la tabla y los nombres de columna que se van a sustituir de la fuente a Spanner. El valor predeterminado es una cadena vacía.
  • shadowTableSpannerDatabaseId base de datos independiente opcional para las tablas de sombra. Si no se especifica, las tablas de sombra se crearán en la base de datos principal. Si se especifica, asegúrate de que también se especifique shadowTableSpannerInstanceId. El valor predeterminado es una cadena vacía.
  • shadowTableSpannerInstanceId instancia independiente opcional para las tablas de sombra. Si no se especifica ninguna, las tablas de sombra se crearán en la instancia principal. Si se especifica, asegúrate de que también se especifique shadowTableSpannerDatabaseId. El valor predeterminado es una cadena vacía.
  • failureInjectionParameter parámetro de inyección de fallos. Solo se usa para hacer pruebas. El valor predeterminado es una cadena vacía.

Ejecutar la plantilla

Consola

  1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
  2. Ir a Crear tarea a partir de plantilla
  3. En el campo Nombre de la tarea, introduce un nombre único.
  4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

    Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de flujo de datos, seleccione the Cloud Datastream to Spanner template.
  6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
  7. Haz clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • JOB_NAME: un nombre de trabajo único que elijas
  • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • GCS_FILE_PATH: la ruta de Cloud Storage que se usa para almacenar eventos de flujo de datos. Por ejemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: tu instancia de Spanner.
  • CLOUDSPANNER_DATABASE: tu base de datos de Spanner.
  • DLQ: la ruta de Cloud Storage del directorio de la cola de errores.

API

Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • JOB_NAME: un nombre de trabajo único que elijas
  • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • GCS_FILE_PATH: la ruta de Cloud Storage que se usa para almacenar eventos de flujo de datos. Por ejemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: tu instancia de Spanner.
  • CLOUDSPANNER_DATABASE: tu base de datos de Spanner.
  • DLQ: la ruta de Cloud Storage del directorio de la cola de errores.

Siguientes pasos