Modello di flussi di modifiche di Spanner in Cloud Storage

Il modello di flussi di modifiche di Spanner in Cloud Storage è una pipeline di streaming che esegue il flusso dei record delle modifiche dei dati di Spanner e li scrive in un bucket Cloud Storage utilizzando Dataflow Runner 2.

La pipeline raggruppa i record dello stream di variazioni di Spanner in finestre in base al timestamp, con ogni finestra che rappresenta una durata temporale la cui lunghezza puoi configurare con questo modello. Tutti i record con timestamp appartenenti alla finestra sono garantiti per essere all'interno della finestra; non possono esserci arrivi in ritardo. Puoi anche definire un numero di shard di output; la pipeline crea un file di output Cloud Storage per finestra e per shard. In un file di output, i record non sono ordinati. I file di output possono essere scritti in formato JSON o AVRO, a seconda della configurazione dell'utente.

Tieni presente che puoi ridurre al minimo la latenza di rete e i costi di trasporto di rete eseguendo il job Dataflow dalla stessa regione dell'istanza Spanner o del bucket Cloud Storage. Se utilizzi origini, destinazioni, posizioni dei file di staging o posizioni dei file temporanei situate al di fuori della regione del tuo job, i dati potrebbero essere inviati tra regioni. Scopri di più sulle regioni di Dataflow.

Scopri di più sui flussi di modifiche, su come creare pipeline Dataflow per i flussi di modifiche e sulle best practice.

Requisiti della pipeline

  • L'istanza Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il database Spanner deve esistere prima dell'esecuzione della pipeline.
  • L'istanza di metadati Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il database dei metadati Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il flusso di modifiche Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il bucket di output Cloud Storage deve esistere prima dell'esecuzione della pipeline.

Parametri del modello

Parametri obbligatori

  • spannerInstanceId: l'ID istanza Spanner da cui leggere i dati dei flussi di modifica.
  • spannerDatabase: il database Spanner da cui leggere i dati degli stream di modifiche.
  • spannerMetadataInstanceId: l'ID istanza Spanner da utilizzare per la tabella dei metadati del connettore delle modifiche in tempo reale.
  • spannerMetadataDatabase: il database Spanner da utilizzare per la tabella dei metadati del connettore degli stream di modifiche.
  • spannerChangeStreamName: il nome del flusso di modifiche Spanner da cui leggere.
  • gcsOutputDirectory: il percorso e il prefisso del nome file per la scrittura dei file di output. Deve terminare con una barra. La formattazione DateTime viene utilizzata per analizzare il percorso della directory per i formattatori di date e ore. Ad esempio, gs://your-bucket/your-path.

Parametri facoltativi

  • spannerProjectId: l'ID del progetto Google Cloud contenente il database Spanner da cui leggere gli stream di modifiche. In questo progetto viene creata anche la tabella dei metadati del connettore degli stream di modifiche. Il valore predefinito per questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
  • spannerDatabaseRole: il ruolo del database Spanner da utilizzare durante l'esecuzione del modello. Questo parametro è obbligatorio solo se l'entità IAM che esegue il modello è un utente di controllo dell'accesso granulare. Il ruolo del database deve disporre del privilegio SELECT sul flusso di modifiche e del privilegio EXECUTE sulla funzione di lettura del flusso di modifiche. Per ulteriori informazioni, consulta Controllo dell'accesso granulare per gli stream di modifiche (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: il nome della tabella dei metadati del connettore di stream di modifiche di Spanner da utilizzare. Se non viene fornita, una tabella dei metadati degli stream di variazioni Spanner viene creata automaticamente durante l'esecuzione della pipeline. Devi fornire un valore per questo parametro quando aggiorni una pipeline esistente. In caso contrario, non utilizzare questo parametro.
  • startTimestamp: la data e l'ora di inizio, incluse, da utilizzare per la lettura degli stream di variazioni, nel formato Ex-2021-10-12T07:20:50.52Z. Il valore predefinito è il timestamp all'avvio della pipeline, ovvero l'ora corrente.
  • endTimestamp: la data e l'ora di fine, incluse, da utilizzare per la lettura degli stream di variazioni. Ad esempio, Ex-2021-10-12T07:20:50.52Z. Il valore predefinito è un'ora infinita nel futuro.
  • spannerHost: l'endpoint Cloud Spanner da chiamare nel modello. Utilizzato solo per i test. Ad esempio, https://spanner.googleapis.com. Valore predefinito: https://spanner.googleapis.com.
  • outputFileFormat: il formato del file Cloud Storage di output. I formati consentiti sono TEXT e AVRO. Il valore predefinito è AVRO.
  • windowDuration: la durata della finestra è l'intervallo in cui i dati vengono scritti nella directory di output. Configura la durata in base al throughput della pipeline. Ad esempio, un throughput più elevato potrebbe richiedere dimensioni delle finestre più piccole in modo che i dati possano essere memorizzati nella memoria. Il valore predefinito è 5 minuti, con un minimo di 1 secondo. I formati consentiti sono: [int]s (per secondi, ad esempio 5s), [int]m (per minuti, ad esempio 12m), [int]h (per ore, ad esempio 2h). Ad esempio, 5m.
  • rpcPriority: la priorità della richiesta per le chiamate Spanner. Il valore deve essere HIGH, MEDIUM o LOW. Il valore predefinito è HIGH.
  • outputFilenamePrefix: il prefisso da inserire in ogni file analizzato. Ad esempio, output-. Il valore predefinito è output.
  • numShards: il numero massimo di shard di output prodotti durante la scrittura. Un numero maggiore di shard comporta una maggiore velocità effettiva per la scrittura in Cloud Storage, ma un costo potenzialmente più elevato per l'aggregazione dei dati tra gli shard durante l'elaborazione dei file di output di Cloud Storage. Il valore predefinito è 20.

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Cloud Spanner change streams to Google Cloud Storage template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Sostituisci quanto segue:

  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Cloud Spanner
  • SPANNER_DATABASE: database Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza dei metadati Cloud Spanner
  • SPANNER_METADATA_DATABASE: database dei metadati Cloud Spanner
  • SPANNER_CHANGE_STREAM: stream di modifiche Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: posizione del file per l'output dei flussi di modifiche

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Cloud Spanner
  • SPANNER_DATABASE: database Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza dei metadati Cloud Spanner
  • SPANNER_METADATA_DATABASE: database dei metadati Cloud Spanner
  • SPANNER_CHANGE_STREAM: stream di modifiche Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: posizione del file per l'output dei flussi di modifiche
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToGcsOptions;
import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreams;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link SpannerChangeStreamsToGcs} pipeline streams change stream record(s) and stores to
 * Google Cloud Storage bucket in user specified format. The sink data can be stored in a Text or
 * Avro file format.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Spanner_Change_Streams_to_Google_Cloud_Storage.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Spanner_Change_Streams_to_Google_Cloud_Storage",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Spanner change streams to Cloud Storage",
    description = {
      "The Cloud Spanner change streams to Cloud Storage template is a streaming pipeline that streams Spanner data change records and writes them into a Cloud Storage bucket using Dataflow Runner V2.\n",
      "The pipeline groups Spanner change stream records into windows based on their timestamp, with each window representing a time duration whose length you can configure with this template. "
          + "All records with timestamps belonging to the window are guaranteed to be in the window; there can be no late arrivals. "
          + "You can also define a number of output shards; the pipeline creates one Cloud Storage output file per window per shard. "
          + "Within an output file, records are unordered. Output files can be written in either JSON or AVRO format, depending on the user configuration.\n",
      "Note that you can minimize network latency and network transport costs by running the Dataflow job from the same region as your Cloud Spanner instance or Cloud Storage bucket. "
          + "If you use sources, sinks, staging file locations, or temporary file locations that are located outside of your job's region, your data might be sent across regions. "
          + "See more about <a href=\"https://cloud.google.com/dataflow/docs/concepts/regional-endpoints\">Dataflow regional endpoints</a>.\n",
      "Learn more about <a href=\"https://cloud.google.com/spanner/docs/change-streams\">change streams</a>, <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow\">how to build change streams Dataflow pipelines</a>, and <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow#best_practices\">best practices</a>."
    },
    optionsClass = SpannerChangeStreamsToGcsOptions.class,
    flexContainerName = "spanner-changestreams-to-gcs",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-cloud-storage",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner instance must exist prior to running the pipeline.",
      "The Cloud Spanner database must exist prior to running the pipeline.",
      "The Cloud Spanner metadata instance must exist prior to running the pipeline.",
      "The Cloud Spanner metadata database must exist prior to running the pipeline.",
      "The Cloud Spanner change stream must exist prior to running the pipeline.",
      "The Cloud Storage output bucket must exist prior to running the pipeline."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class SpannerChangeStreamsToGcs {
  private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamsToGcs.class);
  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Input Files to GCS");

    SpannerChangeStreamsToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SpannerChangeStreamsToGcsOptions.class);

    run(options);
  }

  private static String getProjectId(SpannerChangeStreamsToGcsOptions options) {
    return options.getSpannerProjectId().isEmpty()
        ? options.getProject()
        : options.getSpannerProjectId();
  }

  public static PipelineResult run(SpannerChangeStreamsToGcsOptions options) {
    LOG.info("Requested File Format is " + options.getOutputFileFormat());
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    final Pipeline pipeline = Pipeline.create(options);

    // Get the Spanner project, instance, database, and change stream parameters.
    String projectId = getProjectId(options);
    String instanceId = options.getSpannerInstanceId();
    String databaseId = options.getSpannerDatabase();
    String metadataInstanceId = options.getSpannerMetadataInstanceId();
    String metadataDatabaseId = options.getSpannerMetadataDatabase();
    String changeStreamName = options.getSpannerChangeStreamName();

    // Retrieve and parse the start / end timestamps.
    Timestamp startTimestamp =
        options.getStartTimestamp().isEmpty()
            ? Timestamp.now()
            : Timestamp.parseTimestamp(options.getStartTimestamp());
    Timestamp endTimestamp =
        options.getEndTimestamp().isEmpty()
            ? Timestamp.MAX_VALUE
            : Timestamp.parseTimestamp(options.getEndTimestamp());

    // Add use_runner_v2 to the experiments option, since Change Streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    if (!experiments.contains(USE_RUNNER_V2_EXPERIMENT)) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);

    String metadataTableName =
        options.getSpannerMetadataTableName() == null
            ? null
            : options.getSpannerMetadataTableName();

    final RpcPriority rpcPriority = options.getRpcPriority();
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withProjectId(projectId)
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId);
    // Propagate database role for fine-grained access control on change stream.
    if (options.getSpannerDatabaseRole() != null) {
      LOG.info("Setting database role on SpannerConfig: " + options.getSpannerDatabaseRole());
      spannerConfig =
          spannerConfig.withDatabaseRole(
              ValueProvider.StaticValueProvider.of(options.getSpannerDatabaseRole()));
    }
    LOG.info("Created SpannerConfig: " + spannerConfig);
    pipeline
        .apply(
            SpannerIO.readChangeStream()
                .withSpannerConfig(spannerConfig)
                .withMetadataInstance(metadataInstanceId)
                .withMetadataDatabase(metadataDatabaseId)
                .withChangeStreamName(changeStreamName)
                .withInclusiveStartAt(startTimestamp)
                .withInclusiveEndAt(endTimestamp)
                .withRpcPriority(rpcPriority)
                .withMetadataTable(metadataTableName))
        .apply(
            "Creating " + options.getWindowDuration() + " Window",
            Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
        .apply(
            "Write To GCS",
            FileFormatFactorySpannerChangeStreams.newBuilder().setOptions(options).build());

    return pipeline.run();
  }
}

Passaggi successivi