Modelo do Spanner para o Cloud Storage Text

O modelo do Spanner para Cloud Storage Text é um pipeline em lote que lê dados de uma tabela do Spanner e os grava no Cloud Storage como arquivos de texto CSV.

Requisitos de pipeline

  • A tabela de entrada do Spanner precisa existir antes de o pipeline ser executado.

Parâmetros do modelo

Parâmetros obrigatórios

  • spannerTable: a tabela do Spanner para ler os dados.
  • spannerProjectId: o ID do projeto do Google Cloud que contém o banco de dados do Spanner para ler os dados.
  • spannerInstanceId: o ID da instância da tabela solicitada.
  • spannerDatabaseId: o ID do banco de dados da tabela solicitada.
  • textWritePrefix: o prefixo de caminho do Cloud Storage que especifica onde os dados são gravados. Por exemplo, gs://mybucket/somefolder/.

Parâmetros opcionais

  • csvTempDirectory: o caminho do Cloud Storage em que os arquivos CSV temporários são gravados. Por exemplo, gs://your-bucket/your-path.
  • spannerPriority: a prioridade da solicitação (https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions) para chamadas do Spanner. Valores possíveis: HIGH, MEDIUM, LOW. O valor padrão é MEDIUM.
  • spannerHost: o endpoint do Cloud Spanner a ser chamado no modelo. Usado apenas para testes. Por exemplo, https://batch-spanner.googleapis.com. O padrão é: https://batch-spanner.googleapis.com.
  • spannerSnapshotTime: o carimbo de data/hora que corresponde à versão do banco de dados do Spanner do qual você quer ler. O carimbo de data/hora precisa ser especificado no formato UTC Zulu do RFC 3339 (https://tools.ietf.org/html/rfc3339). O carimbo de data/hora precisa estar no passado, e a inatividade máxima (https://cloud.google.com/spanner/docs/timestamp-bounds#maximum_timestamp_staleness) é aplicável. Por exemplo, 1990-12-31T23:59:60Z. O padrão é vazio.
  • dataBoostEnabled: defina como true para usar os recursos de computação do Spanner Data Boost para executar o job com impacto quase zero nos fluxos de trabalho OLTP do Spanner. Quando verdadeiro, exige a permissão do Identity and Access Management (IAM) spanner.databases.useDataBoost. Para mais informações, consulte a visão geral do Data Boost (https://cloud.google.com/spanner/docs/databoost/databoost-overview). O padrão é: falso.

Executar o modelo

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Spanner to Text Files on Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Spanner_to_GCS_Text \
    --region REGION_NAME \
    --parameters \
spannerProjectId=SPANNER_PROJECT_ID,\
spannerDatabaseId=DATABASE_ID,\
spannerInstanceId=INSTANCE_ID,\
spannerTable=TABLE_ID,\
textWritePrefix=gs://BUCKET_NAME/output/

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_PROJECT_ID: o ID do projeto do Google Cloud do banco de dados do Spanner em que você quer ler os dados.
  • DATABASE_ID: o ID do banco de dados do Spanner
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • INSTANCE_ID: o ID da instância do Spanner
  • TABLE_ID: o ID da tabela do Spanner

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Spanner_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "spannerDatabaseId": "DATABASE_ID",
       "spannerInstanceId": "INSTANCE_ID",
       "spannerTable": "TABLE_ID",
       "textWritePrefix": "gs://BUCKET_NAME/output/"
   },
   "environment": { "zone": "us-central1-f" }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_PROJECT_ID: o ID do projeto do Google Cloud do banco de dados do Spanner em que você quer ler os dados.
  • DATABASE_ID: o ID do banco de dados do Spanner
  • BUCKET_NAME: o nome do bucket do Cloud Storage
  • INSTANCE_ID: o ID da instância do Spanner
  • TABLE_ID: o ID da tabela do Spanner
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import static com.google.cloud.teleport.util.ValueProviderUtils.eitherOrValueProvider;

import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.templates.SpannerToText.SpannerToTextOptions;
import com.google.cloud.teleport.templates.common.SpannerConverters;
import com.google.cloud.teleport.templates.common.SpannerConverters.CreateTransactionFnWithTimestamp;
import com.google.cloud.teleport.templates.common.SpannerConverters.SpannerReadOptions;
import com.google.cloud.teleport.templates.common.TextConverters.FilesystemWriteOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataflow template which copies a Spanner table to a Text sink. It exports a Spanner table using
 * <a href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">Batch API</a>, which
 * creates multiple workers in parallel for better performance. The result is written to a CSV file
 * in Google Cloud Storage. The table schema file is saved in json format along with the exported
 * table.
 *
 * <p>Schema file sample: { "id":"INT64", "name":"STRING(MAX)" }
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Spanner_to_GCS_Text.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Spanner_to_GCS_Text",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Spanner to Text Files on Cloud Storage",
    description =
        "The Cloud Spanner to Cloud Storage Text template is a batch pipeline that reads in data from a Cloud Spanner "
            + "table, and writes it to Cloud Storage as CSV text files.",
    optionsClass = SpannerToTextOptions.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-to-cloud-storage",
    contactInformation = "https://cloud.google.com/support",
    requirements = {"The input Spanner table must exist before running the pipeline."})
public class SpannerToText {

  private static final Logger LOG = LoggerFactory.getLogger(SpannerToText.class);

  /** Custom PipelineOptions. */
  public interface SpannerToTextOptions
      extends PipelineOptions, SpannerReadOptions, FilesystemWriteOptions {

    @TemplateParameter.GcsWriteFolder(
        order = 1,
        groupName = "Target",
        optional = true,
        description = "Cloud Storage temp directory for storing CSV files",
        helpText = "The Cloud Storage path where temporary CSV files are written.",
        example = "gs://your-bucket/your-path")
    ValueProvider<String> getCsvTempDirectory();

    @SuppressWarnings("unused")
    void setCsvTempDirectory(ValueProvider<String> value);

    @TemplateParameter.Enum(
        order = 2,
        groupName = "Source",
        enumOptions = {
          @TemplateEnumOption("LOW"),
          @TemplateEnumOption("MEDIUM"),
          @TemplateEnumOption("HIGH")
        },
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText =
            "The request priority (https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions)"
                + " for Spanner calls. Possible values are `HIGH`, `MEDIUM`, `LOW`. The default value is `MEDIUM`.")
    ValueProvider<RpcPriority> getSpannerPriority();

    void setSpannerPriority(ValueProvider<RpcPriority> value);
  }

  /**
   * Runs a pipeline which reads in Records from Spanner, and writes the CSV to TextIO sink.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    LOG.info("Starting pipeline setup");
    PipelineOptionsFactory.register(SpannerToTextOptions.class);
    SpannerToTextOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SpannerToTextOptions.class);

    FileSystems.setDefaultPipelineOptions(options);
    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(options.getSpannerHost())
            .withProjectId(options.getSpannerProjectId())
            .withInstanceId(options.getSpannerInstanceId())
            .withDatabaseId(options.getSpannerDatabaseId())
            .withRpcPriority(options.getSpannerPriority())
            .withDataBoostEnabled(options.getDataBoostEnabled());

    PTransform<PBegin, PCollection<ReadOperation>> spannerExport =
        SpannerConverters.ExportTransformFactory.create(
            options.getSpannerTable(),
            spannerConfig,
            options.getTextWritePrefix(),
            options.getSpannerSnapshotTime());

    /* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
     * only take a timestamp object for exact staleness which works when
     * parameters are provided during template compile time. They do not work with
     * a Timestamp valueProvider which can take parameters at runtime. Hence a new
     * ParDo class CreateTransactionFnWithTimestamp had to be created for this
     * purpose.
     */
    PCollectionView<Transaction> tx =
        pipeline
            .apply("Setup for Transaction", Create.of(1))
            .apply(
                "Create transaction",
                ParDo.of(
                    new CreateTransactionFnWithTimestamp(
                        spannerConfig, options.getSpannerSnapshotTime())))
            .apply("As PCollectionView", View.asSingleton());

    PCollection<String> csv =
        pipeline
            .apply("Create export", spannerExport)
            // We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
            // because ValueProvider parameters such as table name required for
            // LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
            // type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
            // these parameters at the pipeline execution time.
            .apply(
                "Read all records",
                LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
            .apply(
                "Struct To Csv",
                MapElements.into(TypeDescriptors.strings())
                    .via(struct -> (new SpannerConverters.StructCsvPrinter()).print(struct)));

    ValueProvider<ResourceId> tempDirectoryResource =
        ValueProvider.NestedValueProvider.of(
            eitherOrValueProvider(options.getCsvTempDirectory(), options.getTextWritePrefix()),
            (SerializableFunction<String, ResourceId>) s -> FileSystems.matchNewResource(s, true));

    csv.apply(
        "Write to storage",
        TextIO.write()
            .to(options.getTextWritePrefix())
            .withSuffix(".csv")
            .withTempDirectory(tempDirectoryResource));

    pipeline.run();
    LOG.info("Completed pipeline setup");
  }
}

A seguir