Vorlage „Spanner-Änderungsstreams für Cloud Storage“

Die Vorlage „Spanner-Änderungsstreams für Cloud Storage“ ist eine Streamingpipeline, die Spanner-Datenänderungsdatensätze streamt und sie mit Dataflow Runner V2 in einen Cloud Storage-Bucket schreibt.

Die Pipeline gruppiert Spanner-Streamdatensätze anhand ihres Zeitstempels, wobei jedes Fenster eine Zeitdauer darstellt, deren Länge Sie mit dieser Vorlage konfigurieren können. Alle Datensätze mit Zeitstempeln, die zum Fenster gehören, befinden sich auch wirklich im Fenster. Es können keine verspäteten Ansagen vorhanden sein. Sie können auch mehrere Ausgabe-Shards definieren. Die Pipeline erstellt eine Cloud Storage-Ausgabedatei pro Fenster und Shard. Innerhalb einer Ausgabedatei sind die Datensätze ungeordnet. Ausgabedateien können je nach Nutzerkonfiguration im JSON- oder AVRO-Format geschrieben werden.

Beachten Sie, dass Sie die Netzwerklatenz und die Netzwerktransportkosten minimieren können. Führen Sie dazu den Dataflow-Job in derselben Region aus, in der sich auch Ihre Spanner-Instanz oder Ihr Cloud Storage-Bucket befindet. Wenn Sie Quellen und Senken sowie Speicherorte für Staging-Dateien und temporäre Dateien verwenden, die sich außerhalb der Region Ihres Jobs befinden, werden Ihre Daten möglicherweise regionenübergreifend gesendet. Weitere Informationen finden Sie unter Dataflow-Regionen.

Weitere Informationen zu Änderungsstreams, zum Erstellen von Dataflow-Pipelines für Änderungsstreams und Best Practices

Pipelineanforderungen

  • Die Spanner-Instanz muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Spanner-Datenbank muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Spanner-Metadateninstanz muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Spanner-Metadatendatenbank muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Der Spanner-Änderungsstream muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Der Cloud Storage-Ausgabe-Bucket muss vorhanden sein, bevor Sie die Pipeline ausführen.

Vorlagenparameter

Erforderliche Parameter

  • spannerInstanceId: Die Spanner-Instanz-ID, aus der Änderungsstreamdaten gelesen werden.
  • spannerDatabase: Die Spanner-Datenbank, aus der Änderungsstreamdaten gelesen werden sollen.
  • spannerMetadataInstanceId: Die Spanner-Instanz-ID, die für die Metadatentabelle des Connectors für Änderungsstreams verwendet werden soll.
  • spannerMetadataDatabase: Die Spanner-Datenbank, die für die Metadatentabelle des Connectors für Änderungsstreams verwendet werden soll.
  • spannerChangeStreamName: Der Name des Spanner-Änderungsstreams, aus dem gelesen werden soll.
  • gcsOutputDirectory: Das Pfad- und Dateinamenpräfix zum Schreiben von Ausgabedateien. Muss mit einem Schrägstrich enden. Die DateTime-Formatierung wird verwendet, um den Verzeichnispfad für Datums- und Uhrzeitformatierer zu parsen. Beispiel: gs://your-bucket/your-path.

Optionale Parameter

  • spannerProjectId: Die ID des Google Cloud-Projekts, das die Spanner-Datenbank enthält, aus der Änderungsstreams gelesen werden sollen. In diesem Projekt wird auch die Metadatentabelle des Änderungsstream-Connectors erstellt. Der Standardwert für diesen Parameter ist das Projekt, in dem die Dataflow-Pipeline ausgeführt wird.
  • spannerDatabaseRole: Die Spanner-Datenbankrolle, die beim Ausführen der Vorlage verwendet werden soll. Dieser Parameter ist nur erforderlich, wenn das IAM-Hauptkonto, das die Vorlage ausführt, ein Nutzer für die Zugriffssteuerung ist. Die Datenbankrolle muss die Berechtigung SELECT für den Änderungsstream und die Berechtigung EXECUTE für die Lesefunktion des Änderungsstreams haben. Weitere Informationen finden Sie unter „Detaillierte Zugriffssteuerung für Änderungsstreams“ (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: Der Name der zu verwendenden Connector-Metadatentabelle für Spanner-Änderungsstreams. Wenn nicht angegeben, wird während der Pipelineausführung automatisch eine Metadatentabelle für Spanner-Änderungsstreams erstellt. Sie müssen einen Wert für diesen Parameter angeben, wenn Sie eine vorhandene Pipeline aktualisieren. Verwenden Sie andernfalls diesen Parameter nicht.
  • startTimestamp: Die Start-DateTime (einschließlich), die zum Lesen von Änderungsstreams im Format Ex-2021-10-12T07:20:50.52Z verwendet wird. Die Standardeinstellung ist der Zeitstempel für den Start der Pipeline, d. h. die aktuelle Zeit.
  • endTimestamp: Die End-DateTime (einschließlich), die zum Lesen von Änderungsstreams verwendet wird. Beispiel: Ex-2021-10-12T07:20:50.52Z. Die Standardeinstellung ist eine unendliche Zeit in der Zukunft.
  • spannerHost: Der Cloud Spanner-Endpunkt, der in der Vorlage aufgerufen werden soll. Wird nur zum Testen verwendet. Beispiel: https://spanner.googleapis.com. Die Standardeinstellung ist https://spanner.googleapis.com.
  • outputFileFormat: Das Format der Cloud Storage-Ausgabedatei. Zulässige Formate sind TEXT und AVRO. Die Standardeinstellung ist AVRO.
  • windowDuration: Die Fensterdauer ist das Intervall, in dem Daten in das Ausgabeverzeichnis geschrieben werden. Konfigurieren Sie die Dauer basierend auf dem Durchsatz der Pipeline. Bei einem höheren Durchsatz können beispielsweise kleinere Fenstergrößen erforderlich sein, damit die Daten in den Speicher passen. Die Standardeinstellung ist „5m“ (fünf Minuten), mindestens 1 Sekunde (eine Sekunde). Zulässige Formate sind: [int]s (für Sekunden, Beispiel: 5s), [int]m (für Minuten, Beispiel: 12m), [int]h (für Stunden, Beispiel: 2h). Beispiel: 5m.
  • rpcPriority: Die Anfragepriorität für Spanner-Aufrufe. Der Wert muss HIGH, MEDIUM oder LOW sein. Die Standardeinstellung ist HIGH.
  • outputFilenamePrefix: Das Präfix für die Namen der einzelnen Dateien im Fenstermodus. Beispiel: output-. Die Standardeinstellung ist "output".
  • numShards: Die maximale Anzahl von Ausgabe-Shards, die beim Schreiben erzeugt werden. Eine höhere Anzahl von Shards erhöht den Durchsatz für das Schreiben in Cloud Storage, aber möglicherweise auch höhere Kosten für die Datenaggregation über Shards bei der Verarbeitung von Cloud Storage-Ausgabedateien. Die Standardeinstellung ist 20.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Spanner change streams to Google Cloud Storage templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Ersetzen Sie dabei Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_INSTANCE_ID: ID der Cloud Spanner-Instanz
  • SPANNER_DATABASE: Cloud Spanner-Datenbank
  • SPANNER_METADATA_INSTANCE_ID: ID der Cloud Spanner-Metadateninstanz
  • SPANNER_METADATA_DATABASE: Cloud Spanner-Metadatendatenbank
  • SPANNER_CHANGE_STREAM: Cloud Spanner-Änderungsstream
  • GCS_OUTPUT_DIRECTORY: Dateispeicherort für die Ausgabe der Änderungsstreams

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_INSTANCE_ID: ID der Cloud Spanner-Instanz
  • SPANNER_DATABASE: Cloud Spanner-Datenbank
  • SPANNER_METADATA_INSTANCE_ID: ID der Cloud Spanner-Metadateninstanz
  • SPANNER_METADATA_DATABASE: Cloud Spanner-Metadatendatenbank
  • SPANNER_CHANGE_STREAM: Cloud Spanner-Änderungsstream
  • GCS_OUTPUT_DIRECTORY: Dateispeicherort für die Ausgabe der Änderungsstreams
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToGcsOptions;
import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreams;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link SpannerChangeStreamsToGcs} pipeline streams change stream record(s) and stores to
 * Google Cloud Storage bucket in user specified format. The sink data can be stored in a Text or
 * Avro file format.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Spanner_Change_Streams_to_Google_Cloud_Storage.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Spanner_Change_Streams_to_Google_Cloud_Storage",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Spanner change streams to Cloud Storage",
    description = {
      "The Cloud Spanner change streams to Cloud Storage template is a streaming pipeline that streams Spanner data change records and writes them into a Cloud Storage bucket using Dataflow Runner V2.\n",
      "The pipeline groups Spanner change stream records into windows based on their timestamp, with each window representing a time duration whose length you can configure with this template. "
          + "All records with timestamps belonging to the window are guaranteed to be in the window; there can be no late arrivals. "
          + "You can also define a number of output shards; the pipeline creates one Cloud Storage output file per window per shard. "
          + "Within an output file, records are unordered. Output files can be written in either JSON or AVRO format, depending on the user configuration.\n",
      "Note that you can minimize network latency and network transport costs by running the Dataflow job from the same region as your Cloud Spanner instance or Cloud Storage bucket. "
          + "If you use sources, sinks, staging file locations, or temporary file locations that are located outside of your job's region, your data might be sent across regions. "
          + "See more about <a href=\"https://cloud.google.com/dataflow/docs/concepts/regional-endpoints\">Dataflow regional endpoints</a>.\n",
      "Learn more about <a href=\"https://cloud.google.com/spanner/docs/change-streams\">change streams</a>, <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow\">how to build change streams Dataflow pipelines</a>, and <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow#best_practices\">best practices</a>."
    },
    optionsClass = SpannerChangeStreamsToGcsOptions.class,
    flexContainerName = "spanner-changestreams-to-gcs",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-cloud-storage",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner instance must exist prior to running the pipeline.",
      "The Cloud Spanner database must exist prior to running the pipeline.",
      "The Cloud Spanner metadata instance must exist prior to running the pipeline.",
      "The Cloud Spanner metadata database must exist prior to running the pipeline.",
      "The Cloud Spanner change stream must exist prior to running the pipeline.",
      "The Cloud Storage output bucket must exist prior to running the pipeline."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class SpannerChangeStreamsToGcs {
  private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamsToGcs.class);
  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Input Files to GCS");

    SpannerChangeStreamsToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SpannerChangeStreamsToGcsOptions.class);

    run(options);
  }

  private static String getProjectId(SpannerChangeStreamsToGcsOptions options) {
    return options.getSpannerProjectId().isEmpty()
        ? options.getProject()
        : options.getSpannerProjectId();
  }

  public static PipelineResult run(SpannerChangeStreamsToGcsOptions options) {
    LOG.info("Requested File Format is " + options.getOutputFileFormat());
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    final Pipeline pipeline = Pipeline.create(options);

    // Get the Spanner project, instance, database, and change stream parameters.
    String projectId = getProjectId(options);
    String instanceId = options.getSpannerInstanceId();
    String databaseId = options.getSpannerDatabase();
    String metadataInstanceId = options.getSpannerMetadataInstanceId();
    String metadataDatabaseId = options.getSpannerMetadataDatabase();
    String changeStreamName = options.getSpannerChangeStreamName();

    // Retrieve and parse the start / end timestamps.
    Timestamp startTimestamp =
        options.getStartTimestamp().isEmpty()
            ? Timestamp.now()
            : Timestamp.parseTimestamp(options.getStartTimestamp());
    Timestamp endTimestamp =
        options.getEndTimestamp().isEmpty()
            ? Timestamp.MAX_VALUE
            : Timestamp.parseTimestamp(options.getEndTimestamp());

    // Add use_runner_v2 to the experiments option, since Change Streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    if (!experiments.contains(USE_RUNNER_V2_EXPERIMENT)) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);

    String metadataTableName =
        options.getSpannerMetadataTableName() == null
            ? null
            : options.getSpannerMetadataTableName();

    final RpcPriority rpcPriority = options.getRpcPriority();
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withProjectId(projectId)
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId);
    // Propagate database role for fine-grained access control on change stream.
    if (options.getSpannerDatabaseRole() != null) {
      LOG.info("Setting database role on SpannerConfig: " + options.getSpannerDatabaseRole());
      spannerConfig =
          spannerConfig.withDatabaseRole(
              ValueProvider.StaticValueProvider.of(options.getSpannerDatabaseRole()));
    }
    LOG.info("Created SpannerConfig: " + spannerConfig);
    pipeline
        .apply(
            SpannerIO.readChangeStream()
                .withSpannerConfig(spannerConfig)
                .withMetadataInstance(metadataInstanceId)
                .withMetadataDatabase(metadataDatabaseId)
                .withChangeStreamName(changeStreamName)
                .withInclusiveStartAt(startTimestamp)
                .withInclusiveEndAt(endTimestamp)
                .withRpcPriority(rpcPriority)
                .withMetadataTable(metadataTableName))
        .apply(
            "Creating " + options.getWindowDuration() + " Window",
            Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
        .apply(
            "Write To GCS",
            FileFormatFactorySpannerChangeStreams.newBuilder().setOptions(options).build());

    return pipeline.run();
  }
}

Nächste Schritte