Modèle Spanner vers texte Cloud Storage

Le modèle Spanner vers texte Cloud Storage est un pipeline par lots qui lit les données d'une table Spanner et les écrit dans Cloud Storage sous forme de fichiers texte CSV.

Conditions requises pour ce pipeline

  • La table d'entrée Spanner doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètres obligatoires

  • spannerTable: table Spanner à partir de laquelle lire les données.
  • spannerProjectId: ID du projet Google Cloud qui contient la base de données Spanner à partir de laquelle lire les données.
  • spannerInstanceId: ID d'instance de la table demandée.
  • spannerDatabaseId: ID de base de données de la table demandée.
  • textWritePrefix: préfixe du chemin d'accès à Cloud Storage qui spécifie l'emplacement d'écriture des données Exemple :gs://mybucket/somefolder/

Paramètres facultatifs

  • csvTempDirectory: chemin d'accès Cloud Storage où les fichiers CSV temporaires sont écrits. Exemple :gs://your-bucket/your-path
  • spannerPriority: priorité des requêtes (https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions) pour les appels Spanner. Les valeurs possibles sont HIGH, MEDIUM, LOW. La valeur par défaut est MEDIUM.
  • spannerHost: point de terminaison Cloud Spanner à appeler dans le modèle. Utilisé uniquement pour les tests. Exemple :https://batch-spanner.googleapis.com La valeur par défaut est https://spanner.googleapis.com.
  • spannerSnapshotTime: code temporel correspondant à la version de la base de données Spanner à partir de laquelle vous souhaitez lire. L'horodatage doit être spécifié au format UTC Zulu RFC 3339 (https://tools.ietf.org/html/rfc3339). L'horodatage doit être antérieur à la date et l'heure actuelles et l'Obsolescence maximale de l'horodatage (https://cloud.google.com/spanner/docs/timestamp-bounds#maximum_timestamp_staleness) s'applique. Par exemple, 1990-12-31T23:59:60Z. La valeur par défaut est vide.
  • dataBoostEnabled: définissez la valeur sur true pour utiliser les ressources de calcul de Spanner Data Boost afin d'exécuter la tâche avec un impact quasi nul sur les workflows OLTP de Spanner. Si la valeur est "true", l'autorisation IAM (Identity and Access Management) spanner.databases.useDataBoost est requise. Pour en savoir plus, consultez la présentation de Data Boost (https://cloud.google.com/spanner/docs/databoost/databoost-overview). La valeur par défaut est "false".

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Spanner to Text Files on Cloud Storage template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Spanner_to_GCS_Text \
    --region REGION_NAME \
    --parameters \
spannerProjectId=SPANNER_PROJECT_ID,\
spannerDatabaseId=DATABASE_ID,\
spannerInstanceId=INSTANCE_ID,\
spannerTable=TABLE_ID,\
textWritePrefix=gs://BUCKET_NAME/output/

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • SPANNER_PROJECT_ID : ID du projet Google Cloud de la base de données Spanner à partir de laquelle vous souhaitez lire les données
  • DATABASE_ID : ID de la base de données Spanner
  • BUCKET_NAME : nom du bucket Cloud Storage
  • INSTANCE_ID : ID d'instance Spanner
  • TABLE_ID : ID de la table Spanner

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Spanner_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "spannerDatabaseId": "DATABASE_ID",
       "spannerInstanceId": "INSTANCE_ID",
       "spannerTable": "TABLE_ID",
       "textWritePrefix": "gs://BUCKET_NAME/output/"
   },
   "environment": { "zone": "us-central1-f" }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • SPANNER_PROJECT_ID : ID du projet Google Cloud de la base de données Spanner à partir de laquelle vous souhaitez lire les données
  • DATABASE_ID : ID de la base de données Spanner
  • BUCKET_NAME : nom du bucket Cloud Storage
  • INSTANCE_ID : ID d'instance Spanner
  • TABLE_ID : ID de la table Spanner
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import static com.google.cloud.teleport.util.ValueProviderUtils.eitherOrValueProvider;

import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.templates.SpannerToText.SpannerToTextOptions;
import com.google.cloud.teleport.templates.common.SpannerConverters;
import com.google.cloud.teleport.templates.common.SpannerConverters.CreateTransactionFnWithTimestamp;
import com.google.cloud.teleport.templates.common.SpannerConverters.SpannerReadOptions;
import com.google.cloud.teleport.templates.common.TextConverters.FilesystemWriteOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataflow template which copies a Spanner table to a Text sink. It exports a Spanner table using
 * <a href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">Batch API</a>, which
 * creates multiple workers in parallel for better performance. The result is written to a CSV file
 * in Google Cloud Storage. The table schema file is saved in json format along with the exported
 * table.
 *
 * <p>Schema file sample: { "id":"INT64", "name":"STRING(MAX)" }
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Spanner_to_GCS_Text.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Spanner_to_GCS_Text",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Spanner to Text Files on Cloud Storage",
    description =
        "The Cloud Spanner to Cloud Storage Text template is a batch pipeline that reads in data from a Cloud Spanner "
            + "table, and writes it to Cloud Storage as CSV text files.",
    optionsClass = SpannerToTextOptions.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-to-cloud-storage",
    contactInformation = "https://cloud.google.com/support",
    requirements = {"The input Spanner table must exist before running the pipeline."})
public class SpannerToText {

  private static final Logger LOG = LoggerFactory.getLogger(SpannerToText.class);

  /** Custom PipelineOptions. */
  public interface SpannerToTextOptions
      extends PipelineOptions, SpannerReadOptions, FilesystemWriteOptions {

    @TemplateParameter.GcsWriteFolder(
        order = 1,
        groupName = "Target",
        optional = true,
        description = "Cloud Storage temp directory for storing CSV files",
        helpText = "The Cloud Storage path where temporary CSV files are written.",
        example = "gs://your-bucket/your-path")
    ValueProvider<String> getCsvTempDirectory();

    @SuppressWarnings("unused")
    void setCsvTempDirectory(ValueProvider<String> value);

    @TemplateParameter.Enum(
        order = 2,
        groupName = "Source",
        enumOptions = {
          @TemplateEnumOption("LOW"),
          @TemplateEnumOption("MEDIUM"),
          @TemplateEnumOption("HIGH")
        },
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText =
            "The request priority (https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions)"
                + " for Spanner calls. Possible values are `HIGH`, `MEDIUM`, `LOW`. The default value is `MEDIUM`.")
    ValueProvider<RpcPriority> getSpannerPriority();

    void setSpannerPriority(ValueProvider<RpcPriority> value);
  }

  /**
   * Runs a pipeline which reads in Records from Spanner, and writes the CSV to TextIO sink.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    LOG.info("Starting pipeline setup");
    PipelineOptionsFactory.register(SpannerToTextOptions.class);
    SpannerToTextOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SpannerToTextOptions.class);

    FileSystems.setDefaultPipelineOptions(options);
    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(options.getSpannerHost())
            .withProjectId(options.getSpannerProjectId())
            .withInstanceId(options.getSpannerInstanceId())
            .withDatabaseId(options.getSpannerDatabaseId())
            .withRpcPriority(options.getSpannerPriority())
            .withDataBoostEnabled(options.getDataBoostEnabled());

    PTransform<PBegin, PCollection<ReadOperation>> spannerExport =
        SpannerConverters.ExportTransformFactory.create(
            options.getSpannerTable(),
            spannerConfig,
            options.getTextWritePrefix(),
            options.getSpannerSnapshotTime());

    /* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
     * only take a timestamp object for exact staleness which works when
     * parameters are provided during template compile time. They do not work with
     * a Timestamp valueProvider which can take parameters at runtime. Hence a new
     * ParDo class CreateTransactionFnWithTimestamp had to be created for this
     * purpose.
     */
    PCollectionView<Transaction> tx =
        pipeline
            .apply("Setup for Transaction", Create.of(1))
            .apply(
                "Create transaction",
                ParDo.of(
                    new CreateTransactionFnWithTimestamp(
                        spannerConfig, options.getSpannerSnapshotTime())))
            .apply("As PCollectionView", View.asSingleton());

    PCollection<String> csv =
        pipeline
            .apply("Create export", spannerExport)
            // We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
            // because ValueProvider parameters such as table name required for
            // LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
            // type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
            // these parameters at the pipeline execution time.
            .apply(
                "Read all records",
                LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
            .apply(
                "Struct To Csv",
                MapElements.into(TypeDescriptors.strings())
                    .via(struct -> (new SpannerConverters.StructCsvPrinter()).print(struct)));

    ValueProvider<ResourceId> tempDirectoryResource =
        ValueProvider.NestedValueProvider.of(
            eitherOrValueProvider(options.getCsvTempDirectory(), options.getTextWritePrefix()),
            (SerializableFunction<String, ResourceId>) s -> FileSystems.matchNewResource(s, true));

    csv.apply(
        "Write to storage",
        TextIO.write()
            .to(options.getTextWritePrefix())
            .withSuffix(".csv")
            .withTempDirectory(tempDirectoryResource));

    pipeline.run();
    LOG.info("Completed pipeline setup");
  }
}

Étape suivante