Modèle de flux de modifications Spanner vers Cloud Storage

Le modèle de flux de modification Spanner vers Cloud Storage est un pipeline de streaming qui diffuse les enregistrements de modification des données Spanner et les écrit dans un bucket Cloud Storage à l'aide de l'exécuteur Dataflow v2.

Le pipeline regroupe les enregistrements de flux de modification dans des fenêtres en fonction de leur horodatage, chaque fenêtre représentant une durée dont vous pouvez configurer la durée avec ce modèle. Tous les enregistrements dont les horodatages appartiennent à la fenêtre sont garantis dans la fenêtre. Il ne peut pas y avoir d'arrivées tardives. Vous pouvez également définir un certain nombre de partitions de sortie. Le pipeline crée un fichier de sortie Cloud Storage par fenêtre et par partition. Dans un fichier de sortie, les enregistrements ne sont pas ordonnés. Les fichiers de sortie peuvent être au format JSON ou AVRO, en fonction de la configuration utilisateur.

Notez que vous pouvez réduire la latence du réseau et les coûts de transport réseau en exécutant la tâche Dataflow à partir de la même région que votre instance Spanner ou bucket Cloud Storage. Si vous utilisez des sources, des récepteurs, des emplacements de fichiers de préproduction ou des emplacements de fichiers temporaires situés en dehors de la région associée à votre tâche, vos données peuvent être envoyées d'une région à l'autre. En savoir plus sur les régions Dataflow.

En savoir plus sur les flux de modification, la création de pipelines Dataflow de flux de modification et les bonnes pratiques.

Conditions requises pour ce pipeline

  • L'instance Spanner doit exister avant l'exécution du pipeline.
  • La base de données Spanner doit exister avant l'exécution du pipeline.
  • L'instance de métadonnées Spanner doit exister avant l'exécution du pipeline.
  • La base de données de métadonnées Spanner doit exister avant l'exécution du pipeline.
  • Le flux de modifications Spanner doit exister avant l'exécution du pipeline.
  • Le bucket de sortie Cloud Storage doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres obligatoires

  • spannerInstanceId: ID d'instance Spanner à partir duquel lire les flux de modifications.
  • spannerDatabase: base de données Spanner à partir de laquelle lire les données des flux de modifications.
  • spannerMetadataInstanceId: ID d'instance Spanner à utiliser pour la table de métadonnées du connecteur de flux de modification.
  • spannerMetadataDatabase: base de données Spanner à utiliser pour la table de métadonnées du connecteur de flux de modification.
  • spannerChangeStreamName: nom du flux de modifications Spanner à lire.
  • gcsOutputDirectory: chemin d'accès et préfixe de nom de fichier pour l'écriture des fichiers de sortie. Doit se terminer par une barre oblique. Le format DateTime permet d'analyser le chemin d'accès au répertoire pour les outils de mise en forme de date et d'heure. Exemple :gs://your-bucket/your-path

Paramètres facultatifs

  • spannerProjectId: ID du projet Google Cloud qui contient la base de données Spanner à partir de laquelle lire les flux de modifications. Il s'agit également du projet dans lequel la table de métadonnées du connecteur de flux de modifications est créée. La valeur par défaut de ce paramètre correspond au projet dans lequel le pipeline Dataflow est exécuté.
  • spannerDatabaseRole: rôle de base de données Spanner à utiliser lors de l'exécution du modèle. Ce paramètre n'est requis que lorsque le compte principal IAM qui exécute le modèle est un utilisateur de contrôle d'accès précis. Le rôle de base de données doit disposer du droit SELECT sur le flux de modifications et du droit EXECUTE sur la fonction de lecture du flux de modifications. Pour en savoir plus, consultez la section "Contrôle des accès précis pour les flux de modifications" (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: Nom de la table des métadonnées du connecteur de flux de modifications Cloud Spanner à utiliser. Si aucune valeur n'est fournie, une table de métadonnées de flux de modifications Spanner est automatiquement créée pendant l'exécution du pipeline. Vous devez fournir une valeur pour ce paramètre lorsque vous mettez à jour un pipeline existant. Sinon, n'utilisez pas ce paramètre.
  • startTimestamp: date et heure de début (incluses) à utiliser pour lire les flux de modifications, au format Ex-2021-10-12T07:20:50.52Z. La valeur par défaut est l'horodatage du démarrage du pipeline, c'est-à-dire l'heure actuelle.
  • endTimestamp: date et heure de fin (incluses) à utiliser pour lire les flux de modifications. Exemple :Ex-2021-10-12T07:20:50.52Z Elle est définie par défaut sur une période infinie dans le futur.
  • spannerHost: point de terminaison Cloud Spanner à appeler dans le modèle. Utilisé uniquement pour les tests. Exemple :https://spanner.googleapis.com La valeur par défaut est https://spanner.googleapis.com.
  • outputFileFormat: format du fichier Cloud Storage de sortie. Les formats autorisés sont TEXT et AVRO. La valeur par défaut est AVRO.
  • windowDuration: la durée de la fenêtre correspond à l'intervalle au cours duquel les données sont écrites dans le répertoire de sortie. Configurez la durée en fonction du débit du pipeline. Par exemple, un débit plus élevé peut nécessiter des tailles de fenêtre plus petites pour que les données s'intègrent à la mémoire. La valeur par défaut est "5m" (cinq minutes), avec une durée minimale de 1s (une seconde). Les formats autorisés sont les suivants : [int]s (pour les secondes, exemple : 5s), [int]m (pour les minutes, exemple : 12m), [int]h (pour les heures, exemple : 2h). Exemple :5m
  • rpcPriority: priorité des requêtes pour les appels Spanner. La valeur doit être HIGH, MEDIUM ou LOW. La valeur par défaut est HIGH.
  • outputFilenamePrefix: préfixe à placer sur chaque fichier ciblé sur une fenêtre. Exemple :output- La valeur par défaut est : "output".
  • numShards: nombre maximal de partitions de sortie générées lors de l'écriture. Un nombre plus élevé de segments entraîne un débit plus élevé pour l'écriture dans Cloud Storage, mais potentiellement un coût d'agrégation de données plus élevé entre les partitions lors du traitement des fichiers Cloud Storage de sortie. La valeur par défaut est 20.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Spanner change streams to Google Cloud Storage template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • SPANNER_INSTANCE_ID : ID de l'instance Cloud Spanner
  • SPANNER_DATABASE : base de données Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID : ID d'instance de métadonnées Cloud Spanner
  • SPANNER_METADATA_DATABASE : base de données de métadonnées Cloud Spanner
  • SPANNER_CHANGE_STREAM : flux de modifications Cloud Spanner
  • GCS_OUTPUT_DIRECTORY : emplacement du fichier de sortie des flux de modifications

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • SPANNER_INSTANCE_ID : ID de l'instance Cloud Spanner
  • SPANNER_DATABASE : base de données Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID : ID d'instance de métadonnées Cloud Spanner
  • SPANNER_METADATA_DATABASE : base de données de métadonnées Cloud Spanner
  • SPANNER_CHANGE_STREAM : flux de modifications Cloud Spanner
  • GCS_OUTPUT_DIRECTORY : emplacement du fichier de sortie des flux de modifications
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToGcsOptions;
import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreams;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link SpannerChangeStreamsToGcs} pipeline streams change stream record(s) and stores to
 * Google Cloud Storage bucket in user specified format. The sink data can be stored in a Text or
 * Avro file format.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Spanner_Change_Streams_to_Google_Cloud_Storage.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Spanner_Change_Streams_to_Google_Cloud_Storage",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Spanner change streams to Cloud Storage",
    description = {
      "The Cloud Spanner change streams to Cloud Storage template is a streaming pipeline that streams Spanner data change records and writes them into a Cloud Storage bucket using Dataflow Runner V2.\n",
      "The pipeline groups Spanner change stream records into windows based on their timestamp, with each window representing a time duration whose length you can configure with this template. "
          + "All records with timestamps belonging to the window are guaranteed to be in the window; there can be no late arrivals. "
          + "You can also define a number of output shards; the pipeline creates one Cloud Storage output file per window per shard. "
          + "Within an output file, records are unordered. Output files can be written in either JSON or AVRO format, depending on the user configuration.\n",
      "Note that you can minimize network latency and network transport costs by running the Dataflow job from the same region as your Cloud Spanner instance or Cloud Storage bucket. "
          + "If you use sources, sinks, staging file locations, or temporary file locations that are located outside of your job's region, your data might be sent across regions. "
          + "See more about <a href=\"https://cloud.google.com/dataflow/docs/concepts/regional-endpoints\">Dataflow regional endpoints</a>.\n",
      "Learn more about <a href=\"https://cloud.google.com/spanner/docs/change-streams\">change streams</a>, <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow\">how to build change streams Dataflow pipelines</a>, and <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow#best_practices\">best practices</a>."
    },
    optionsClass = SpannerChangeStreamsToGcsOptions.class,
    flexContainerName = "spanner-changestreams-to-gcs",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-cloud-storage",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner instance must exist prior to running the pipeline.",
      "The Cloud Spanner database must exist prior to running the pipeline.",
      "The Cloud Spanner metadata instance must exist prior to running the pipeline.",
      "The Cloud Spanner metadata database must exist prior to running the pipeline.",
      "The Cloud Spanner change stream must exist prior to running the pipeline.",
      "The Cloud Storage output bucket must exist prior to running the pipeline."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class SpannerChangeStreamsToGcs {
  private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamsToGcs.class);
  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Input Files to GCS");

    SpannerChangeStreamsToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SpannerChangeStreamsToGcsOptions.class);

    run(options);
  }

  private static String getProjectId(SpannerChangeStreamsToGcsOptions options) {
    return options.getSpannerProjectId().isEmpty()
        ? options.getProject()
        : options.getSpannerProjectId();
  }

  public static PipelineResult run(SpannerChangeStreamsToGcsOptions options) {
    LOG.info("Requested File Format is " + options.getOutputFileFormat());
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    final Pipeline pipeline = Pipeline.create(options);

    // Get the Spanner project, instance, database, and change stream parameters.
    String projectId = getProjectId(options);
    String instanceId = options.getSpannerInstanceId();
    String databaseId = options.getSpannerDatabase();
    String metadataInstanceId = options.getSpannerMetadataInstanceId();
    String metadataDatabaseId = options.getSpannerMetadataDatabase();
    String changeStreamName = options.getSpannerChangeStreamName();

    // Retrieve and parse the start / end timestamps.
    Timestamp startTimestamp =
        options.getStartTimestamp().isEmpty()
            ? Timestamp.now()
            : Timestamp.parseTimestamp(options.getStartTimestamp());
    Timestamp endTimestamp =
        options.getEndTimestamp().isEmpty()
            ? Timestamp.MAX_VALUE
            : Timestamp.parseTimestamp(options.getEndTimestamp());

    // Add use_runner_v2 to the experiments option, since Change Streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    if (!experiments.contains(USE_RUNNER_V2_EXPERIMENT)) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);

    String metadataTableName =
        options.getSpannerMetadataTableName() == null
            ? null
            : options.getSpannerMetadataTableName();

    final RpcPriority rpcPriority = options.getRpcPriority();
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withProjectId(projectId)
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId);
    // Propagate database role for fine-grained access control on change stream.
    if (options.getSpannerDatabaseRole() != null) {
      LOG.info("Setting database role on SpannerConfig: " + options.getSpannerDatabaseRole());
      spannerConfig =
          spannerConfig.withDatabaseRole(
              ValueProvider.StaticValueProvider.of(options.getSpannerDatabaseRole()));
    }
    LOG.info("Created SpannerConfig: " + spannerConfig);
    pipeline
        .apply(
            SpannerIO.readChangeStream()
                .withSpannerConfig(spannerConfig)
                .withMetadataInstance(metadataInstanceId)
                .withMetadataDatabase(metadataDatabaseId)
                .withChangeStreamName(changeStreamName)
                .withInclusiveStartAt(startTimestamp)
                .withInclusiveEndAt(endTimestamp)
                .withRpcPriority(rpcPriority)
                .withMetadataTable(metadataTableName))
        .apply(
            "Creating " + options.getWindowDuration() + " Window",
            Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
        .apply(
            "Write To GCS",
            FileFormatFactorySpannerChangeStreams.newBuilder().setOptions(options).build());

    return pipeline.run();
  }
}

Étape suivante