Plantilla de flujos de cambio de Spanner a Cloud Storage

La plantilla de flujos de cambios de Spanner a Cloud Storage es una canalización de transmisión que transmite los registros de cambios de datos de Spanner y los escribe en un bucket de Cloud Storage con Dataflow Runner v2.

La canalización agrupa los registros de flujos de cambios de Spanner en períodos según su marca de tiempo, y cada período representa un intervalo de tiempo cuya duración puedes configurar con esta plantilla. Se garantiza que todos los registros con marcas de tiempo que pertenecen al período se encuentran en el período; no puede haber retrasos. También puedes definir una cantidad de fragmentos de salida. La canalización crea un archivo de salida de Cloud Storage por período por fragmento. Dentro de un archivo de salida, los registros no están ordenados. Se pueden escribir los archivos de salida en formato JSON o AVRO, según la configuración del usuario.

Ten en cuenta que puedes minimizar la latencia de la red y los costos de transporte de la red si ejecutas el trabajo de Dataflow desde la misma región que tu instancia de Spanner o tu bucket de Cloud Storage. Si usas fuentes, receptores, ubicaciones de archivos de etapa de pruebas o ubicaciones de archivos temporales que se encuentran fuera de la región del trabajo, es posible que los datos se envíen a través de diferentes regiones. Obtén más información sobre las regiones de Dataflow.

Obtén más información sobre los flujos de cambios, cómo compilar canalizaciones de Dataflow de flujos de cambio y prácticas recomendadas.

Requisitos de la canalización

  • La instancia de Spanner debe existir antes de ejecutar la canalización.
  • La base de datos de Spanner debe existir antes de ejecutar la canalización.
  • La instancia de metadatos de Spanner debe existir antes de ejecutar la canalización.
  • La base de datos de metadatos de Spanner debe existir antes de ejecutar la canalización.
  • El flujo de cambios de Spanner debe existir antes de ejecutar la canalización.
  • El bucket de salida de Cloud Storage debe existir antes de ejecutar la canalización.

Parámetros de la plantilla

Parámetros obligatorios

  • spannerInstanceId: El ID de instancia de Spanner desde el que se leerán los datos de flujos de cambios.
  • spannerDatabase: La base de datos de Spanner desde la que se leerán los datos de flujos de cambios.
  • spannerMetadataInstanceId: El ID de instancia de Spanner que se usará para la tabla de metadatos del conector de flujos de cambios.
  • spannerMetadataDatabase: La base de datos de Spanner que se usará para la tabla de metadatos del conector de flujos de cambios.
  • spannerChangeStreamName: El nombre del flujo de cambios de Spanner desde el que se leerá.
  • gcsOutputDirectory: La ruta de acceso y el prefijo del nombre de archivo para escribir los archivos de salida. Debe terminar con una barra. El formato de fecha y hora se usa a fin de analizar la ruta del directorio para los formateadores de fecha y hora. Por ejemplo, gs://your-bucket/your-path

Parámetros opcionales

  • spannerProjectId: El ID del proyecto de Google Cloud que contiene la base de datos de Spanner desde la que se leerán los flujos de cambios. En este proyecto, también se crea la tabla de metadatos del conector de flujos de cambios. El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow.
  • spannerDatabaseRole: El rol de base de datos de Spanner que se usará cuando se ejecute la plantilla. Este parámetro solo es necesario cuando el principal de IAM que ejecuta la plantilla es un usuario de control de acceso detallado. El rol de la base de datos debe tener el privilegio SELECT en la transmisión de cambios y el privilegio EXECUTE en la función de lectura de la transmisión de cambios. Para obtener más información, consulta Control de acceso detallado para los flujos de cambios (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: El nombre de la tabla de metadatos del conector de flujos de cambios de Spanner que se usará. Si no se proporciona, una tabla de metadatos de flujos de cambios de Spanner se crea de forma automática durante la ejecución de la canalización. Debes proporcionar un valor para este parámetro cuando actualices una canalización existente. De lo contrario, no uses este parámetro.
  • startTimestamp: La fecha y hora de inicio, inclusiva, que se usará para leer los flujos de cambios, en el formato Ex-2021-10-12T07:20:50.52Z. El valor predeterminado es la marca de tiempo del inicio de la canalización, es decir, la hora actual.
  • endTimestamp: El valor de DateTime de finalización, inclusivo, que se usará para leer los flujos de cambios. Por ejemplo, Ex-2021-10-12T07:20:50.52Z El valor predeterminado es un tiempo infinito en el futuro.
  • spannerHost: Es el extremo de Cloud Spanner al que se llamará en la plantilla. Solo se usa para pruebas. Por ejemplo, https://spanner.googleapis.com La configuración predeterminada es https://spanner.googleapis.com.
  • outputFileFormat: Es el formato del archivo de salida de Cloud Storage. Los formatos permitidos son TEXT y AVRO. La configuración predeterminada es AVRO.
  • windowDuration: La duración de la ventana es el intervalo en el que se escriben los datos en el directorio de salida. Configura la duración en función de la capacidad de procesamiento de la canalización. Por ejemplo, una capacidad de procesamiento mayor puede requerir tamaños de ventana más pequeños para que los datos se ajusten a la memoria. La configuración predeterminada es de 5 m (cinco minutos), con un mínimo de 1 s (un segundo). Los formatos permitidos son: [nro. entero] s (para los segundos, por ejemplo, 5 s), [nro. entero] min (para los minutos, por ejemplo, 12 min) y [nro. entero] h (para las horas, por ejemplo, 2 h). Por ejemplo, 5m
  • rpcPriority: La prioridad de solicitud para llamadas de Spanner. El valor debe ser HIGH, MEDIUM o LOW. El valor predeterminado es HIGH.
  • outputFilenamePrefix: Es el prefijo para colocar en cada archivo con ventanas. Por ejemplo, output- La configuración predeterminada es: output.
  • numShards: Es la cantidad máxima de fragmentos de salida que se produce con la escritura. Una mayor cantidad de fragmentos implica una mayor capacidad de procesamiento para la escritura en Cloud Storage, pero, también, un mayor costo de agregación de datos entre fragmentos cuando se procesan archivos de salida de Cloud Storage. La configuración predeterminada es 20.

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Spanner change streams to Google Cloud Storage template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • SPANNER_INSTANCE_ID: ID de la instancia de Cloud Spanner
  • SPANNER_DATABASE: Base de datos de Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID de la instancia de metadatos de Cloud Spanner
  • SPANNER_METADATA_DATABASE: Base de datos de metadatos de Cloud Spanner
  • SPANNER_CHANGE_STREAM: Flujo de cambios de Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: Ubicación del archivo de salida de los flujos de cambios

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • SPANNER_INSTANCE_ID: ID de la instancia de Cloud Spanner
  • SPANNER_DATABASE: Base de datos de Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID de la instancia de metadatos de Cloud Spanner
  • SPANNER_METADATA_DATABASE: Base de datos de metadatos de Cloud Spanner
  • SPANNER_CHANGE_STREAM: Flujo de cambios de Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: Ubicación del archivo de salida de los flujos de cambios
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToGcsOptions;
import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreams;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link SpannerChangeStreamsToGcs} pipeline streams change stream record(s) and stores to
 * Google Cloud Storage bucket in user specified format. The sink data can be stored in a Text or
 * Avro file format.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Spanner_Change_Streams_to_Google_Cloud_Storage.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Spanner_Change_Streams_to_Google_Cloud_Storage",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Spanner change streams to Cloud Storage",
    description = {
      "The Cloud Spanner change streams to Cloud Storage template is a streaming pipeline that streams Spanner data change records and writes them into a Cloud Storage bucket using Dataflow Runner V2.\n",
      "The pipeline groups Spanner change stream records into windows based on their timestamp, with each window representing a time duration whose length you can configure with this template. "
          + "All records with timestamps belonging to the window are guaranteed to be in the window; there can be no late arrivals. "
          + "You can also define a number of output shards; the pipeline creates one Cloud Storage output file per window per shard. "
          + "Within an output file, records are unordered. Output files can be written in either JSON or AVRO format, depending on the user configuration.\n",
      "Note that you can minimize network latency and network transport costs by running the Dataflow job from the same region as your Cloud Spanner instance or Cloud Storage bucket. "
          + "If you use sources, sinks, staging file locations, or temporary file locations that are located outside of your job's region, your data might be sent across regions. "
          + "See more about <a href=\"https://cloud.google.com/dataflow/docs/concepts/regional-endpoints\">Dataflow regional endpoints</a>.\n",
      "Learn more about <a href=\"https://cloud.google.com/spanner/docs/change-streams\">change streams</a>, <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow\">how to build change streams Dataflow pipelines</a>, and <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow#best_practices\">best practices</a>."
    },
    optionsClass = SpannerChangeStreamsToGcsOptions.class,
    flexContainerName = "spanner-changestreams-to-gcs",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-cloud-storage",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner instance must exist prior to running the pipeline.",
      "The Cloud Spanner database must exist prior to running the pipeline.",
      "The Cloud Spanner metadata instance must exist prior to running the pipeline.",
      "The Cloud Spanner metadata database must exist prior to running the pipeline.",
      "The Cloud Spanner change stream must exist prior to running the pipeline.",
      "The Cloud Storage output bucket must exist prior to running the pipeline."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class SpannerChangeStreamsToGcs {
  private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamsToGcs.class);
  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Input Files to GCS");

    SpannerChangeStreamsToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SpannerChangeStreamsToGcsOptions.class);

    run(options);
  }

  private static String getProjectId(SpannerChangeStreamsToGcsOptions options) {
    return options.getSpannerProjectId().isEmpty()
        ? options.getProject()
        : options.getSpannerProjectId();
  }

  public static PipelineResult run(SpannerChangeStreamsToGcsOptions options) {
    LOG.info("Requested File Format is " + options.getOutputFileFormat());
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    final Pipeline pipeline = Pipeline.create(options);

    // Get the Spanner project, instance, database, and change stream parameters.
    String projectId = getProjectId(options);
    String instanceId = options.getSpannerInstanceId();
    String databaseId = options.getSpannerDatabase();
    String metadataInstanceId = options.getSpannerMetadataInstanceId();
    String metadataDatabaseId = options.getSpannerMetadataDatabase();
    String changeStreamName = options.getSpannerChangeStreamName();

    // Retrieve and parse the start / end timestamps.
    Timestamp startTimestamp =
        options.getStartTimestamp().isEmpty()
            ? Timestamp.now()
            : Timestamp.parseTimestamp(options.getStartTimestamp());
    Timestamp endTimestamp =
        options.getEndTimestamp().isEmpty()
            ? Timestamp.MAX_VALUE
            : Timestamp.parseTimestamp(options.getEndTimestamp());

    // Add use_runner_v2 to the experiments option, since Change Streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    if (!experiments.contains(USE_RUNNER_V2_EXPERIMENT)) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);

    String metadataTableName =
        options.getSpannerMetadataTableName() == null
            ? null
            : options.getSpannerMetadataTableName();

    final RpcPriority rpcPriority = options.getRpcPriority();
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withProjectId(projectId)
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId);
    // Propagate database role for fine-grained access control on change stream.
    if (options.getSpannerDatabaseRole() != null) {
      LOG.info("Setting database role on SpannerConfig: " + options.getSpannerDatabaseRole());
      spannerConfig =
          spannerConfig.withDatabaseRole(
              ValueProvider.StaticValueProvider.of(options.getSpannerDatabaseRole()));
    }
    LOG.info("Created SpannerConfig: " + spannerConfig);
    pipeline
        .apply(
            SpannerIO.readChangeStream()
                .withSpannerConfig(spannerConfig)
                .withMetadataInstance(metadataInstanceId)
                .withMetadataDatabase(metadataDatabaseId)
                .withChangeStreamName(changeStreamName)
                .withInclusiveStartAt(startTimestamp)
                .withInclusiveEndAt(endTimestamp)
                .withRpcPriority(rpcPriority)
                .withMetadataTable(metadataTableName))
        .apply(
            "Creating " + options.getWindowDuration() + " Window",
            Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
        .apply(
            "Write To GCS",
            FileFormatFactorySpannerChangeStreams.newBuilder().setOptions(options).build());

    return pipeline.run();
  }
}

¿Qué sigue?