Vorlage „Spanner-Änderungsstreams zu Pub/Sub“

Die Spanner-Änderungsstreams zur Pub/Sub-Vorlage sind eine Streaming-Pipeline, die Spanner-Datenänderungseinträge streamt und mit Dataflow Runner V2 in Pub/Sub-Themen schreibt.

Um Ihre Daten in ein neues Pub/Sub-Thema auszugeben, müssen Sie zuerst das Thema erstellen. Nach der Erstellung generiert Pub/Sub automatisch ein Abo und hängt es an das neue Thema an. Wenn Sie versuchen, Daten an ein nicht vorhandenes Pub/Sub-Thema auszugeben, löst die Dataflow-Pipeline eine Ausnahme aus und die Pipeline bleibt hängen, da sie kontinuierlich versucht, eine Verbindung herzustellen.

Wenn das erforderliche Pub/Sub-Thema bereits vorhanden ist, können Sie Daten zu diesem Thema ausgeben.

Weitere Informationen finden Sie unter Informationen zu Änderungsstreams, Verbindungen von Änderungsstreams mit Dataflow erstellen und Best Practices für Änderungsstreams.

Pipelineanforderungen

  • Die Spanner-Instanz muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Spanner-Datenbank muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Spanner-Metadateninstanz muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Spanner-Metadatendatenbank muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Der Spanner-Änderungsstream muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Das Pub/Sub-Thema muss vorhanden sein, bevor die Pipeline ausgeführt wird.

Vorlagenparameter

Erforderliche Parameter

  • spannerInstanceId: Die Spanner-Instanz, aus der Änderungsstreams gelesen werden sollen.
  • spannerDatabase: Die Spanner-Datenbank, aus der Änderungsstreams gelesen werden sollen.
  • spannerMetadataInstanceId: Die Spanner-Instanz, die für die Metadatentabelle des Connectors für Änderungsstreams verwendet werden soll.
  • spannerMetadataDatabase: Die Spanner-Datenbank, die für die Metadatentabelle des Connectors für Änderungsstreams verwendet werden soll.
  • spannerChangeStreamName: Der Name des Spanner-Änderungsstreams, aus dem gelesen werden soll.
  • pubsubTopic: Pub/Sub-Thema für die Ausgabe der Änderungsstreams.

Optionale Parameter

  • spannerProjectId: Das Projekt, aus dem Änderungsstreams gelesen werden. In diesem Projekt wird auch die Metadatentabelle des Änderungsstream-Connectors erstellt. Der Standardwert für diesen Parameter ist das Projekt, in dem die Dataflow-Pipeline ausgeführt wird.
  • spannerDatabaseRole: Die Spanner-Datenbankrolle, die beim Ausführen der Vorlage verwendet werden soll. Dieser Parameter ist nur erforderlich, wenn das IAM-Hauptkonto, das die Vorlage ausführt, ein Nutzer für die Zugriffssteuerung ist. Die Datenbankrolle muss die Berechtigung SELECT für den Änderungsstream und die Berechtigung EXECUTE für die Lesefunktion des Änderungsstreams haben. Weitere Informationen finden Sie unter „Detaillierte Zugriffssteuerung für Änderungsstreams“ (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: Der Name der zu verwendenden Connector-Metadatentabelle für Spanner-Änderungsstreams. Wenn nicht angegeben, erstellt Spanner automatisch die Metadatentabelle des Stream-Connectors während der Änderung des Pipelineablaufs. Sie müssen diesen Parameter beim Aktualisieren einer vorhandenen Pipeline angeben. Verwenden Sie diesen Parameter nicht für andere Fälle.
  • startTimestamp: Die Start-DateTime (https://tools.ietf.org/html/rfc3339) (einschließlich), die zum Lesen von Änderungsstreams verwendet wird. Beispiel: ex- 2021-10-12T07:20:50.52Z. Die Standardeinstellung ist der Zeitstempel für den Start der Pipeline, d. h. die aktuelle Zeit.
  • endTimestamp: Die End-DateTime (https://tools.ietf.org/html/rfc3339) (einschließlich), die zum Lesen von Änderungsstreams verwendet wird. Beispiel: ex- 2021-10-12T07:20:50.52Z. Die Standardeinstellung ist eine unendliche Zeit in der Zukunft.
  • spannerHost: Der Cloud Spanner-Endpunkt, der in der Vorlage aufgerufen werden soll. Wird nur zum Testen verwendet. Beispiel: https://spanner.googleapis.com. Die Standardeinstellung ist https://spanner.googleapis.com.
  • outputDataFormat: Das Format der Ausgabe. Die Ausgabe wird in viele PubsubMessages zusammengefasst und an ein Pub/Sub-Thema gesendet. Zulässige Formate sind JSON und AVRO. Der Standardwert ist JSON.
  • pubsubAPI: Zur Implementierung der Pipeline verwendete Pub/Sub API. Zulässige APIs sind pubsubio und native_client. Bei einer geringen Anzahl von Abfragen pro Sekunde (QPS) hat native_client eine geringere Latenz. Bei einer großen Anzahl von Abfragen pro Sekunde bietet pubsubio eine bessere und stabilere Leistung. Der Standardwert ist pubsubio.
  • pubsubProjectId: Projekt des Pub/Sub-Themas. Der Standardwert für diesen Parameter ist das Projekt, in dem die Dataflow-Pipeline ausgeführt wird.
  • rpcPriority: Die Anfragepriorität für Spanner-Aufrufe. Zulässige Werte sind HIGH, MEDIUM und LOW. Die Standardeinstellung ist HIGH.
  • includeSpannerSource: Gibt an, ob die Spanner-Datenbank-ID und die Instanz-ID, aus denen der Änderungsstream gelesen werden soll, in den Ausgabenachrichtendaten enthalten sein sollen. Die Standardeinstellung ist "false".
  • outputMessageMetadata: Der Stringwert für das benutzerdefinierte Feld „outputMessageMetadata“ in der Pub/Sub-Nachricht. Standardmäßig ist dieses Feld leer. Das Feld „outputMessageMetadata“ wird nur ausgefüllt, wenn dieser Wert nicht leer ist. Geben Sie Sonderzeichen mit Escape-Zeichen ein, z. B. doppelte Anführungszeichen.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Spanner change streams to Pub/Sub templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

Ersetzen Sie dabei Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_INSTANCE_ID: Spanner-Instanz-ID
  • SPANNER_DATABASE: Spanner-Datenbank
  • SPANNER_METADATA_INSTANCE_ID: Spanner-Metadateninstanz-ID
  • SPANNER_METADATA_DATABASE: Spanner-Metadatendatenbank
  • SPANNER_CHANGE_STREAM: Spanner-Änderungsstream
  • PUBSUB_TOPIC: Pub/Sub-Thema für die Ausgabe der Änderungsstreams

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_INSTANCE_ID: Spanner-Instanz-ID
  • SPANNER_DATABASE: Spanner-Datenbank
  • SPANNER_METADATA_INSTANCE_ID: Spanner-Metadateninstanz-ID
  • SPANNER_METADATA_DATABASE: Spanner-Metadatendatenbank
  • SPANNER_CHANGE_STREAM: Spanner-Änderungsstream
  • PUBSUB_TOPIC: Pub/Sub-Thema für die Ausgabe der Änderungsstreams
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToPubSubOptions;
import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreamsToPubSub;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link SpannerChangeStreamsToPubSub} pipeline streams change stream record(s) and stores to
 * pubsub topic in user specified format. The sink data can be stored in a JSON Text or Avro data
 * format.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Spanner_Change_Streams_to_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Spanner_Change_Streams_to_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Spanner change streams to Pub/Sub",
    description = {
      "The Cloud Spanner change streams to the Pub/Sub template is a streaming pipeline that streams Cloud Spanner data change records and writes them into Pub/Sub topics using Dataflow Runner V2.\n",
      "To output your data to a new Pub/Sub topic, you need to first create the topic. After creation, Pub/Sub automatically generates and attaches a subscription to the new topic. "
          + "If you try to output data to a Pub/Sub topic that doesn't exist, the dataflow pipeline throws an exception, and the pipeline gets stuck as it continuously tries to make a connection.\n",
      "If the necessary Pub/Sub topic already exists, you can output data to that topic.",
      "Learn more about <a href=\"https://cloud.google.com/spanner/docs/change-streams\">change streams</a>, <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow\">how to build change streams Dataflow pipelines</a>, and <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow#best_practices\">best practices</a>."
    },
    optionsClass = SpannerChangeStreamsToPubSubOptions.class,
    flexContainerName = "spanner-changestreams-to-pubsub",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner instance must exist before running the pipeline.",
      "The Cloud Spanner database must exist prior to running the pipeline.",
      "The Cloud Spanner metadata instance must exist prior to running the pipeline.",
      "The Cloud Spanner metadata database must exist prior to running the pipeline.",
      "The Cloud Spanner change stream must exist prior to running the pipeline.",
      "The Pub/Sub topic must exist prior to running the pipeline."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class SpannerChangeStreamsToPubSub {
  private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamsToPubSub.class);
  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Input Messages to Pub/Sub");

    SpannerChangeStreamsToPubSubOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SpannerChangeStreamsToPubSubOptions.class);

    run(options);
  }

  private static String getSpannerProjectId(SpannerChangeStreamsToPubSubOptions options) {
    return options.getSpannerProjectId().isEmpty()
        ? options.getProject()
        : options.getSpannerProjectId();
  }

  private static String getPubsubProjectId(SpannerChangeStreamsToPubSubOptions options) {
    return options.getPubsubProjectId().isEmpty()
        ? options.getProject()
        : options.getPubsubProjectId();
  }

  public static boolean isValidAsciiString(String outputMessageMetadata) {
    if (outputMessageMetadata != null
        && !StandardCharsets.US_ASCII.newEncoder().canEncode(outputMessageMetadata)) {
      return false;
    }
    return true;
  }

  public static PipelineResult run(SpannerChangeStreamsToPubSubOptions options) {
    LOG.info("Requested Message Format is " + options.getOutputDataFormat());
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    final Pipeline pipeline = Pipeline.create(options);
    // Get the Spanner project, instance, database, metadata instance, metadata database
    // change stream, pubsub topic, and pubsub api parameters.
    String spannerProjectId = getSpannerProjectId(options);
    String instanceId = options.getSpannerInstanceId();
    String databaseId = options.getSpannerDatabase();
    String metadataInstanceId = options.getSpannerMetadataInstanceId();
    String metadataDatabaseId = options.getSpannerMetadataDatabase();
    String changeStreamName = options.getSpannerChangeStreamName();
    String pubsubProjectId = getPubsubProjectId(options);
    String pubsubTopicName = options.getPubsubTopic();
    String pubsubAPI = options.getPubsubAPI();
    Boolean includeSpannerSource = options.getIncludeSpannerSource();
    String outputMessageMetadata = options.getOutputMessageMetadata();

    // Ensure outputMessageMetadata only contains valid ascii characters
    if (!isValidAsciiString(outputMessageMetadata)) {
      throw new RuntimeException("outputMessageMetadata contains non ascii characters.");
    }

    // Retrieve and parse the start / end timestamps.
    Timestamp startTimestamp =
        options.getStartTimestamp().isEmpty()
            ? Timestamp.now()
            : Timestamp.parseTimestamp(options.getStartTimestamp());
    Timestamp endTimestamp =
        options.getEndTimestamp().isEmpty()
            ? Timestamp.MAX_VALUE
            : Timestamp.parseTimestamp(options.getEndTimestamp());

    // Add use_runner_v2 to the experiments option, since Change Streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    if (!experiments.contains(USE_RUNNER_V2_EXPERIMENT)) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);

    String metadataTableName =
        options.getSpannerMetadataTableName() == null
            ? null
            : options.getSpannerMetadataTableName();

    final RpcPriority rpcPriority = options.getRpcPriority();
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withProjectId(spannerProjectId)
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId);
    // Propagate database role for fine-grained access control on change stream.
    if (options.getSpannerDatabaseRole() != null) {
      spannerConfig =
          spannerConfig.withDatabaseRole(
              ValueProvider.StaticValueProvider.of(options.getSpannerDatabaseRole()));
    }
    pipeline
        .apply(
            SpannerIO.readChangeStream()
                .withSpannerConfig(spannerConfig)
                .withMetadataInstance(metadataInstanceId)
                .withMetadataDatabase(metadataDatabaseId)
                .withChangeStreamName(changeStreamName)
                .withInclusiveStartAt(startTimestamp)
                .withInclusiveEndAt(endTimestamp)
                .withRpcPriority(rpcPriority)
                .withMetadataTable(metadataTableName))
        .apply(
            "Convert each record to a PubsubMessage",
            FileFormatFactorySpannerChangeStreamsToPubSub.newBuilder()
                .setOutputDataFormat(options.getOutputDataFormat())
                .setProjectId(pubsubProjectId)
                .setPubsubAPI(pubsubAPI)
                .setPubsubTopicName(pubsubTopicName)
                .setIncludeSpannerSource(includeSpannerSource)
                .setSpannerDatabaseId(databaseId)
                .setSpannerInstanceId(instanceId)
                .setOutputMessageMetadata(outputMessageMetadata)
                .build());
    return pipeline.run();
  }
}

Nächste Schritte