Modèle de flux de modifications de Spanner vers Pub/Sub

Les flux de modification Spanner vers le modèle Pub/Sub sont un pipeline de streaming qui diffuse les enregistrements de modification des données Spanner et les écrit dans des sujets Pub/Sub à l'aide de l'exécuteur Dataflow V2.

Pour générer vos données dans un nouveau sujet Pub/Sub, vous devez d'abord créer le sujet. Après la création, Pub/Sub génère et associe automatiquement un abonnement au nouveau sujet. Si vous essayez de générer des données dans un sujet Pub/Sub qui n'existe pas, le pipeline Dataflow génère une exception, et le pipeline se bloque car il tente en boucle d'établir une connexion.

Si le sujet Pub/Sub nécessaire existe déjà, vous pouvez générer des données dans ce sujet.

Pour en savoir plus, consultez les pages À propos des flux de modifications, Créer des connexions de flux de modification avec Dataflow et Bonnes pratiques concernant les flux de modifications.

Conditions requises pour ce pipeline

  • L'instance Spanner doit exister avant l'exécution du pipeline.
  • La base de données Spanner doit exister avant l'exécution du pipeline.
  • L'instance de métadonnées Spanner doit exister avant l'exécution du pipeline.
  • La base de données de métadonnées Spanner doit exister avant l'exécution du pipeline.
  • Le flux de modifications Spanner doit exister avant l'exécution du pipeline.
  • Le sujet Pub/Sub doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres obligatoires

  • spannerInstanceId: Instance Spanner à partir de laquelle lire les flux de modifications.
  • spannerDatabase: base de données Spanner à partir de laquelle lire les flux de modifications.
  • spannerMetadataInstanceId: instance Spanner à utiliser pour la table de métadonnées du connecteur de flux de modification.
  • spannerMetadataDatabase: base de données Spanner à utiliser pour la table de métadonnées du connecteur de flux de modification.
  • spannerChangeStreamName: nom du flux de modifications Spanner à lire.
  • pubsubTopic: sujet Pub/Sub dans lequel inscrire le résultat des flux de modifications.

Paramètres facultatifs

  • spannerProjectId: projet à partir duquel lire les flux de modifications. Il s'agit également du projet dans lequel la table de métadonnées du connecteur de flux de modifications est créée. La valeur par défaut de ce paramètre correspond au projet dans lequel le pipeline Dataflow est exécuté.
  • spannerDatabaseRole: rôle de base de données Spanner à utiliser lors de l'exécution du modèle. Ce paramètre n'est requis que lorsque le compte principal IAM qui exécute le modèle est un utilisateur de contrôle d'accès précis. Le rôle de base de données doit disposer du droit SELECT sur le flux de modifications et du droit EXECUTE sur la fonction de lecture du flux de modifications. Pour en savoir plus, consultez la section "Contrôle des accès précis pour les flux de modifications" (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: Nom de la table des métadonnées du connecteur de flux de modifications Cloud Spanner à utiliser. Si aucune valeur n'est fournie, Spanner crée automatiquement la table de métadonnées du connecteur de flux lors de la modification du flux du pipeline. Vous devez fournir ce paramètre lorsque vous mettez à jour un pipeline existant. N'utilisez pas ce paramètre dans d'autres cas.
  • startTimestamp: date et heure de début (incluses) (https://tools.ietf.org/html/rfc3339) à utiliser pour lire les flux de modifications. Par exemple : ex-2021-10-12T07:20:50.52Z. La valeur par défaut est l'horodatage du démarrage du pipeline, c'est-à-dire l'heure actuelle.
  • endTimestamp: date et heure de fin (incluses) (https://tools.ietf.org/html/rfc3339) à utiliser pour lire les flux de modifications. Par exemple : ex-2021-10-12T07:20:50.52Z. Elle est définie par défaut sur une période infinie dans le futur.
  • spannerHost: point de terminaison Cloud Spanner à appeler dans le modèle. Utilisé uniquement pour les tests. Exemple :https://spanner.googleapis.com La valeur par défaut est https://spanner.googleapis.com.
  • outputDataFormat: format de la sortie. Le résultat est encapsulé dans de nombreux messages PubsubMessages et envoyé à un sujet Pub/Sub. Les formats autorisés sont JSON et AVRO. La valeur par défaut est JSON.
  • pubsubAPI: API Pub/Sub utilisée pour mettre en œuvre le pipeline. Les API autorisées sont pubsubio et native_client. Pour un petit nombre de requêtes par seconde (RPS), native_client a moins de latence. Pour un grand nombre de RPS, pubsubio fournit des performances supérieures et plus stables. La valeur par défaut est pubsubio.
  • pubsubProjectId: projet du sujet Pub/Sub. La valeur par défaut de ce paramètre correspond au projet dans lequel le pipeline Dataflow est exécuté.
  • rpcPriority: priorité des requêtes pour les appels Spanner. Les valeurs autorisées sont "HIGH", "MEDIUM" et "LOW". La valeur par défaut est "HIGH".
  • includeSpannerSource: indique si l'ID de la base de données Spanner et l'ID d'instance à partir desquels lire le flux de modifications doivent être inclus dans les données du message de sortie. La valeur par défaut est "false".
  • outputMessageMetadata: valeur de chaîne pour le champ personnalisé outputMessageMetadata dans le message Pub/Sub de sortie. La valeur par défaut est vide, et le champ "outputMessageMetadata" n'est renseigné que si cette valeur n'est pas vide. Veuillez échapper les caractères spéciaux lorsque vous saisissez la valeur ici(par exemple, les guillemets doubles).

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Spanner change streams to Pub/Sub template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • SPANNER_INSTANCE_ID : ID de l'instance Spanner
  • SPANNER_DATABASE: Base de données Spanner
  • SPANNER_METADATA_INSTANCE_ID : ID d'instance de métadonnées Spanner
  • SPANNER_METADATA_DATABASE : base de données de métadonnées Spanner
  • SPANNER_CHANGE_STREAM : flux de modifications Spanner
  • PUBSUB_TOPIC : sujet Pub/Sub dans lequel inscrire le résultat des flux de modifications

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • SPANNER_INSTANCE_ID : ID de l'instance Spanner
  • SPANNER_DATABASE: Base de données Spanner
  • SPANNER_METADATA_INSTANCE_ID : ID d'instance de métadonnées Spanner
  • SPANNER_METADATA_DATABASE : base de données de métadonnées Spanner
  • SPANNER_CHANGE_STREAM : flux de modifications Spanner
  • PUBSUB_TOPIC : sujet Pub/Sub dans lequel inscrire le résultat des flux de modifications
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToPubSubOptions;
import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreamsToPubSub;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link SpannerChangeStreamsToPubSub} pipeline streams change stream record(s) and stores to
 * pubsub topic in user specified format. The sink data can be stored in a JSON Text or Avro data
 * format.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Spanner_Change_Streams_to_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Spanner_Change_Streams_to_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Spanner change streams to Pub/Sub",
    description = {
      "The Cloud Spanner change streams to the Pub/Sub template is a streaming pipeline that streams Cloud Spanner data change records and writes them into Pub/Sub topics using Dataflow Runner V2.\n",
      "To output your data to a new Pub/Sub topic, you need to first create the topic. After creation, Pub/Sub automatically generates and attaches a subscription to the new topic. "
          + "If you try to output data to a Pub/Sub topic that doesn't exist, the dataflow pipeline throws an exception, and the pipeline gets stuck as it continuously tries to make a connection.\n",
      "If the necessary Pub/Sub topic already exists, you can output data to that topic.",
      "Learn more about <a href=\"https://cloud.google.com/spanner/docs/change-streams\">change streams</a>, <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow\">how to build change streams Dataflow pipelines</a>, and <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow#best_practices\">best practices</a>."
    },
    optionsClass = SpannerChangeStreamsToPubSubOptions.class,
    flexContainerName = "spanner-changestreams-to-pubsub",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner instance must exist before running the pipeline.",
      "The Cloud Spanner database must exist prior to running the pipeline.",
      "The Cloud Spanner metadata instance must exist prior to running the pipeline.",
      "The Cloud Spanner metadata database must exist prior to running the pipeline.",
      "The Cloud Spanner change stream must exist prior to running the pipeline.",
      "The Pub/Sub topic must exist prior to running the pipeline."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class SpannerChangeStreamsToPubSub {
  private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamsToPubSub.class);
  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Input Messages to Pub/Sub");

    SpannerChangeStreamsToPubSubOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SpannerChangeStreamsToPubSubOptions.class);

    run(options);
  }

  private static String getSpannerProjectId(SpannerChangeStreamsToPubSubOptions options) {
    return options.getSpannerProjectId().isEmpty()
        ? options.getProject()
        : options.getSpannerProjectId();
  }

  private static String getPubsubProjectId(SpannerChangeStreamsToPubSubOptions options) {
    return options.getPubsubProjectId().isEmpty()
        ? options.getProject()
        : options.getPubsubProjectId();
  }

  public static boolean isValidAsciiString(String outputMessageMetadata) {
    if (outputMessageMetadata != null
        && !StandardCharsets.US_ASCII.newEncoder().canEncode(outputMessageMetadata)) {
      return false;
    }
    return true;
  }

  public static PipelineResult run(SpannerChangeStreamsToPubSubOptions options) {
    LOG.info("Requested Message Format is " + options.getOutputDataFormat());
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    final Pipeline pipeline = Pipeline.create(options);
    // Get the Spanner project, instance, database, metadata instance, metadata database
    // change stream, pubsub topic, and pubsub api parameters.
    String spannerProjectId = getSpannerProjectId(options);
    String instanceId = options.getSpannerInstanceId();
    String databaseId = options.getSpannerDatabase();
    String metadataInstanceId = options.getSpannerMetadataInstanceId();
    String metadataDatabaseId = options.getSpannerMetadataDatabase();
    String changeStreamName = options.getSpannerChangeStreamName();
    String pubsubProjectId = getPubsubProjectId(options);
    String pubsubTopicName = options.getPubsubTopic();
    String pubsubAPI = options.getPubsubAPI();
    Boolean includeSpannerSource = options.getIncludeSpannerSource();
    String outputMessageMetadata = options.getOutputMessageMetadata();

    // Ensure outputMessageMetadata only contains valid ascii characters
    if (!isValidAsciiString(outputMessageMetadata)) {
      throw new RuntimeException("outputMessageMetadata contains non ascii characters.");
    }

    // Retrieve and parse the start / end timestamps.
    Timestamp startTimestamp =
        options.getStartTimestamp().isEmpty()
            ? Timestamp.now()
            : Timestamp.parseTimestamp(options.getStartTimestamp());
    Timestamp endTimestamp =
        options.getEndTimestamp().isEmpty()
            ? Timestamp.MAX_VALUE
            : Timestamp.parseTimestamp(options.getEndTimestamp());

    // Add use_runner_v2 to the experiments option, since Change Streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    if (!experiments.contains(USE_RUNNER_V2_EXPERIMENT)) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);

    String metadataTableName =
        options.getSpannerMetadataTableName() == null
            ? null
            : options.getSpannerMetadataTableName();

    final RpcPriority rpcPriority = options.getRpcPriority();
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withProjectId(spannerProjectId)
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId);
    // Propagate database role for fine-grained access control on change stream.
    if (options.getSpannerDatabaseRole() != null) {
      spannerConfig =
          spannerConfig.withDatabaseRole(
              ValueProvider.StaticValueProvider.of(options.getSpannerDatabaseRole()));
    }
    pipeline
        .apply(
            SpannerIO.readChangeStream()
                .withSpannerConfig(spannerConfig)
                .withMetadataInstance(metadataInstanceId)
                .withMetadataDatabase(metadataDatabaseId)
                .withChangeStreamName(changeStreamName)
                .withInclusiveStartAt(startTimestamp)
                .withInclusiveEndAt(endTimestamp)
                .withRpcPriority(rpcPriority)
                .withMetadataTable(metadataTableName))
        .apply(
            "Convert each record to a PubsubMessage",
            FileFormatFactorySpannerChangeStreamsToPubSub.newBuilder()
                .setOutputDataFormat(options.getOutputDataFormat())
                .setProjectId(pubsubProjectId)
                .setPubsubAPI(pubsubAPI)
                .setPubsubTopicName(pubsubTopicName)
                .setIncludeSpannerSource(includeSpannerSource)
                .setSpannerDatabaseId(databaseId)
                .setSpannerInstanceId(instanceId)
                .setOutputMessageMetadata(outputMessageMetadata)
                .build());
    return pipeline.run();
  }
}

Étape suivante