Modelo de fluxos de alterações do Spanner para Cloud Storage

Os fluxos de alterações para o modelo do Cloud Storage são um pipeline de streaming que transmite os registros de alteração de dados do Spanner e os grava em um bucket do Cloud Storage usando o Dataflow Runner v2.

O pipeline agrupa os registros de stream de alterações do Spanner em janelas com base no carimbo de data/hora, e cada janela representa uma duração de tempo cujo tamanho pode ser configurado com esse modelo. Todos os registros com carimbos de data/hora pertencentes à janela têm a garantia de estarem na janela. Não pode haver chegadas atrasadas. Também é possível definir vários fragmentos de saída. O pipeline cria um arquivo de saída do Cloud Storage por janela por fragmento. Em um arquivo de saída, os registros são desordenados. Os arquivos de saída podem ser gravados no formato JSON ou AVRO, dependendo da configuração do usuário.

Observe que é possível minimizar a latência da rede e os custos de transporte dela executando o job do Dataflow na mesma região da sua instância do Spanner ou do bucket do Cloud Storage. Se você usar fontes, coletores, locais de arquivos de preparo ou de arquivos temporários localizados fora da região do job, seus dados poderão ser enviados entre regiões. Saiba mais sobre as regiões do Dataflow.

Saiba mais sobre fluxos de alterações, como criar pipelines de mudança no pipeline do Dataflow e práticas recomendadas.

Requisitos de pipeline

  • A instância do Spanner precisa existir antes da execução do pipeline.
  • O banco de dados do Spanner precisa ser criado antes da execução do pipeline.
  • A instância de metadados do Spanner precisa existir antes da execução do pipeline.
  • O banco de dados de metadados do Spanner precisa existir antes da execução do pipeline.
  • O fluxo de alterações do Spanner precisa ser criado antes da execução do pipeline.
  • O bucket de saída do Cloud Storage precisa existir antes da execução do pipeline.

Parâmetros do modelo

Parâmetros obrigatórios

  • spannerInstanceId: o ID da instância do Spanner de onde os dados dos fluxos de alterações são lidos.
  • spannerDatabase: o banco de dados do Spanner de onde os dados dos fluxos de alterações serão lidos.
  • spannerMetadataInstanceId: o ID da instância do Spanner a ser usado para a tabela de metadados do conector dos fluxos de alterações.
  • spannerMetadataDatabase: o banco de dados do Spanner a ser usado para a tabela de metadados do conector dos fluxos de alterações.
  • spannerChangeStreamName: o nome do fluxo de alterações do Spanner a ser lido.
  • gcsOutputDirectory: o caminho e o prefixo do nome do arquivo para gravar arquivos de saída. Precisa terminar com uma barra. A formatação DateTime é usada para analisar o caminho do diretório em busca de formatadores de data e hora. Por exemplo, gs://your-bucket/your-path.

Parâmetros opcionais

  • spannerProjectId: o ID do projeto do Google Cloud que contém o banco de dados do Spanner para ler os fluxos de alterações. Esse projeto também é onde a tabela de metadados do conector de fluxos de alteração é criada. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
  • spannerDatabaseRole: o papel do banco de dados do Spanner a ser usado ao executar o modelo. Esse parâmetro é necessário somente quando o principal do IAM que executa o modelo é um usuário de controle de acesso minucioso. A função de banco de dados precisa ter o privilégio SELECT no fluxo de alterações e o privilégio EXECUTE na função de leitura do fluxo de alterações. Para mais informações, consulte "Controle de acesso granular para fluxos de alteração" (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: o nome da tabela de metadados do conector dos fluxos de alterações do Spanner a ser usado. Se não for fornecida, uma tabela de metadados de fluxos de alterações do Spanner será criada automaticamente durante a execução do pipeline. Você precisa fornecer um valor para esse parâmetro ao atualizar um pipeline atual. Caso contrário, não use esse parâmetro.
  • startTimestamp: o DateTime inicial, inclusive, a ser usado na leitura de fluxos de alterações, no formato Ex-2021-10-12T07:20:50.52Z. O padrão é o carimbo de data/hora em que o pipeline é iniciado, ou seja, o horário atual.
  • endTimestamp: o DateTime de término, inclusive, para usar em fluxos de alterações de leitura. Por exemplo, Ex-2021-10-12T07:20:50.52Z. O padrão é um tempo infinito no futuro.
  • spannerHost: o endpoint do Cloud Spanner a ser chamado no modelo. Usado apenas para testes. Por exemplo, https://spanner.googleapis.com. O padrão é: https://spanner.googleapis.com.
  • outputFileFormat: o formato do arquivo de saída do Cloud Storage. Os formatos permitidos são TEXT e AVRO. O padrão é AVRO.
  • windowDuration: a duração da janela é o intervalo em que os dados são gravados no diretório de saída. Configure a duração com base na capacidade de processamento do pipeline. Por exemplo, uma capacidade de processamento mais alta pode exigir tamanhos de janela menores para que os dados se encaixem na memória. O padrão é 5m (cinco minutos), com o mínimo de 1s (um segundo). Os formatos permitidos são: [int]s (para segundos; exemplo: 5 s), [int]m (para minutos; exemplo: 12 min), [int]h (para horas; exemplo: 2h). Por exemplo, 5m.
  • rpcPriority: a prioridade de solicitação das chamadas do Spanner. O valor precisa ser HIGH, MEDIUM ou LOW. O padrão é HIGH.
  • outputFilenamePrefix: o prefixo a ser colocado em cada arquivo em janela. Por exemplo, output-. O padrão é: saída.
  • numShards: o número máximo de fragmentos de saída produzidos durante a gravação. Um número maior de fragmentos significa maior capacidade de gravação no Cloud Storage, mas um custo de agregação de dados potencialmente maior entre os fragmentos ao processar os arquivos de saída do Cloud Storage. O padrão é: 20

Executar o modelo

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Spanner change streams to Google Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Cloud Spanner
  • SPANNER_DATABASE: banco de dados do Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Cloud Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Cloud Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: local do arquivo da saída dos fluxos de alterações

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Cloud Spanner
  • SPANNER_DATABASE: banco de dados do Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Cloud Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Cloud Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: local do arquivo da saída dos fluxos de alterações
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToGcsOptions;
import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreams;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link SpannerChangeStreamsToGcs} pipeline streams change stream record(s) and stores to
 * Google Cloud Storage bucket in user specified format. The sink data can be stored in a Text or
 * Avro file format.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Spanner_Change_Streams_to_Google_Cloud_Storage.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Spanner_Change_Streams_to_Google_Cloud_Storage",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Spanner change streams to Cloud Storage",
    description = {
      "The Cloud Spanner change streams to Cloud Storage template is a streaming pipeline that streams Spanner data change records and writes them into a Cloud Storage bucket using Dataflow Runner V2.\n",
      "The pipeline groups Spanner change stream records into windows based on their timestamp, with each window representing a time duration whose length you can configure with this template. "
          + "All records with timestamps belonging to the window are guaranteed to be in the window; there can be no late arrivals. "
          + "You can also define a number of output shards; the pipeline creates one Cloud Storage output file per window per shard. "
          + "Within an output file, records are unordered. Output files can be written in either JSON or AVRO format, depending on the user configuration.\n",
      "Note that you can minimize network latency and network transport costs by running the Dataflow job from the same region as your Cloud Spanner instance or Cloud Storage bucket. "
          + "If you use sources, sinks, staging file locations, or temporary file locations that are located outside of your job's region, your data might be sent across regions. "
          + "See more about <a href=\"https://cloud.google.com/dataflow/docs/concepts/regional-endpoints\">Dataflow regional endpoints</a>.\n",
      "Learn more about <a href=\"https://cloud.google.com/spanner/docs/change-streams\">change streams</a>, <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow\">how to build change streams Dataflow pipelines</a>, and <a href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow#best_practices\">best practices</a>."
    },
    optionsClass = SpannerChangeStreamsToGcsOptions.class,
    flexContainerName = "spanner-changestreams-to-gcs",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-cloud-storage",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner instance must exist prior to running the pipeline.",
      "The Cloud Spanner database must exist prior to running the pipeline.",
      "The Cloud Spanner metadata instance must exist prior to running the pipeline.",
      "The Cloud Spanner metadata database must exist prior to running the pipeline.",
      "The Cloud Spanner change stream must exist prior to running the pipeline.",
      "The Cloud Storage output bucket must exist prior to running the pipeline."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class SpannerChangeStreamsToGcs {
  private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamsToGcs.class);
  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Input Files to GCS");

    SpannerChangeStreamsToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SpannerChangeStreamsToGcsOptions.class);

    run(options);
  }

  private static String getProjectId(SpannerChangeStreamsToGcsOptions options) {
    return options.getSpannerProjectId().isEmpty()
        ? options.getProject()
        : options.getSpannerProjectId();
  }

  public static PipelineResult run(SpannerChangeStreamsToGcsOptions options) {
    LOG.info("Requested File Format is " + options.getOutputFileFormat());
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    final Pipeline pipeline = Pipeline.create(options);

    // Get the Spanner project, instance, database, and change stream parameters.
    String projectId = getProjectId(options);
    String instanceId = options.getSpannerInstanceId();
    String databaseId = options.getSpannerDatabase();
    String metadataInstanceId = options.getSpannerMetadataInstanceId();
    String metadataDatabaseId = options.getSpannerMetadataDatabase();
    String changeStreamName = options.getSpannerChangeStreamName();

    // Retrieve and parse the start / end timestamps.
    Timestamp startTimestamp =
        options.getStartTimestamp().isEmpty()
            ? Timestamp.now()
            : Timestamp.parseTimestamp(options.getStartTimestamp());
    Timestamp endTimestamp =
        options.getEndTimestamp().isEmpty()
            ? Timestamp.MAX_VALUE
            : Timestamp.parseTimestamp(options.getEndTimestamp());

    // Add use_runner_v2 to the experiments option, since Change Streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    if (!experiments.contains(USE_RUNNER_V2_EXPERIMENT)) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);

    String metadataTableName =
        options.getSpannerMetadataTableName() == null
            ? null
            : options.getSpannerMetadataTableName();

    final RpcPriority rpcPriority = options.getRpcPriority();
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withProjectId(projectId)
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId);
    // Propagate database role for fine-grained access control on change stream.
    if (options.getSpannerDatabaseRole() != null) {
      LOG.info("Setting database role on SpannerConfig: " + options.getSpannerDatabaseRole());
      spannerConfig =
          spannerConfig.withDatabaseRole(
              ValueProvider.StaticValueProvider.of(options.getSpannerDatabaseRole()));
    }
    LOG.info("Created SpannerConfig: " + spannerConfig);
    pipeline
        .apply(
            SpannerIO.readChangeStream()
                .withSpannerConfig(spannerConfig)
                .withMetadataInstance(metadataInstanceId)
                .withMetadataDatabase(metadataDatabaseId)
                .withChangeStreamName(changeStreamName)
                .withInclusiveStartAt(startTimestamp)
                .withInclusiveEndAt(endTimestamp)
                .withRpcPriority(rpcPriority)
                .withMetadataTable(metadataTableName))
        .apply(
            "Creating " + options.getWindowDuration() + " Window",
            Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
        .apply(
            "Write To GCS",
            FileFormatFactorySpannerChangeStreams.newBuilder().setOptions(options).build());

    return pipeline.run();
  }
}

A seguir