The Spanner to Cloud Storage Text template is a batch pipeline that reads data from a Spanner table and writes it to Cloud Storage as CSV text files.
Pipeline requirements
The input Spanner table must exist before running the pipeline.
Template parameters
Required parameters
spannerTable: the Spanner table to read the data from.
spannerProjectId: the ID of the Google Cloud project that contains the Spanner database to read data from.
spannerInstanceId: the instance ID of the requested table.
spannerDatabaseId: the database ID of the requested table.
textWritePrefix: the Cloud Storage path prefix that specifies where the data is written. For example, gs://mybucket/somefolder/.
Optional parameters
csvTempDirectory: the Cloud Storage path where temporary CSV files are written. For example, gs://your-bucket/your-path.
dataBoostEnabled: set to true to use the compute resources of Spanner Data Boost to run the job with near-zero impact on Spanner OLTP workflows. When set to true, you also need the spanner.databases.useDataBoost Identity and Access Management (IAM) permission. For more information, see the Data Boost overview (https://cloud.google.com/spanner/docs/databoost/databoost-overview). Defaults to: false.
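gcloud
To run the template with the Google Cloud CLI, use the gcloud dataflow jobs run command. The following invocation is a sketch built from the parameters documented above; JOB_NAME is a job name of your choice, and the textWritePrefix value shown (gs://BUCKET_NAME/output/) is only illustrative:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/Spanner_to_GCS_Text \
    --region=REGION_NAME \
    --parameters=spannerProjectId=SPANNER_PROJECT_ID,spannerInstanceId=INSTANCE_ID,spannerDatabaseId=DATABASE_ID,spannerTable=TABLE_ID,textWritePrefix=gs://BUCKET_NAME/output/

Replace the following: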
VERSION: the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
SPANNER_PROJECT_ID: the Google Cloud project ID of the Spanner database that you want to read data from
DATABASE_ID: the Spanner database ID
BUCKET_NAME: the name of the Cloud Storage bucket
INSTANCE_ID: the Spanner instance ID
TABLE_ID: the Spanner table ID
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
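The following request is a sketch built from the parameters documented above, using the projects.locations.templates.launch endpoint. PROJECT_ID (the Google Cloud project in which you run the Dataflow job) and JOB_NAME are placeholders of your choice and are not listed among the variables below; the textWritePrefix value is only illustrative:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Spanner_to_GCS_Text
{
  "jobName": "JOB_NAME",
  "parameters": {
    "spannerProjectId": "SPANNER_PROJECT_ID",
    "spannerInstanceId": "INSTANCE_ID",
    "spannerDatabaseId": "DATABASE_ID",
    "spannerTable": "TABLE_ID",
    "textWritePrefix": "gs://BUCKET_NAME/output/"
  }
}

Replace the following: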
VERSION: the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
SPANNER_PROJECT_ID: the Google Cloud project ID of the Spanner database that you want to read data from
DATABASE_ID: the Spanner database ID
BUCKET_NAME: the name of the Cloud Storage bucket
INSTANCE_ID: the Spanner instance ID
TABLE_ID: the Spanner table ID
Template source code
Java
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import static com.google.cloud.teleport.util.ValueProviderUtils.eitherOrValueProvider;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.templates.SpannerToText.SpannerToTextOptions;
import com.google.cloud.teleport.templates.common.SpannerConverters;
import com.google.cloud.teleport.templates.common.SpannerConverters.CreateTransactionFnWithTimestamp;
import com.google.cloud.teleport.templates.common.SpannerConverters.SpannerReadOptions;
import com.google.cloud.teleport.templates.common.TextConverters.FilesystemWriteOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Dataflow template which copies a Spanner table to a Text sink. It exports a Spanner table using
* <a href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">Batch API</a>, which
* creates multiple workers in parallel for better performance. The result is written to a CSV file
* in Google Cloud Storage. The table schema file is saved in json format along with the exported
* table.
*
* <p>Schema file sample: { "id":"INT64", "name":"STRING(MAX)" }
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Spanner_to_GCS_Text.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Spanner_to_GCS_Text",
category = TemplateCategory.BATCH,
displayName = "Cloud Spanner to Text Files on Cloud Storage",
description =
"The Cloud Spanner to Cloud Storage Text template is a batch pipeline that reads in data from a Cloud Spanner "
+ "table, and writes it to Cloud Storage as CSV text files.",
optionsClass = SpannerToTextOptions.class,
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-to-cloud-storage",
contactInformation = "https://cloud.google.com/support",
requirements = {"The input Spanner table must exist before running the pipeline."})
public class SpannerToText {
private static final Logger LOG = LoggerFactory.getLogger(SpannerToText.class);
/** Custom PipelineOptions. */
public interface SpannerToTextOptions
extends PipelineOptions, SpannerReadOptions, FilesystemWriteOptions {
@TemplateParameter.GcsWriteFolder(
order = 1,
optional = true,
description = "Cloud Storage temp directory for storing CSV files",
helpText = "The Cloud Storage path where temporary CSV files are written.",
example = "gs://your-bucket/your-path")
ValueProvider<String> getCsvTempDirectory();
@SuppressWarnings("unused")
void setCsvTempDirectory(ValueProvider<String> value);
@TemplateParameter.Enum(
order = 2,
enumOptions = {
@TemplateEnumOption("LOW"),
@TemplateEnumOption("MEDIUM"),
@TemplateEnumOption("HIGH")
},
optional = true,
description = "Priority for Spanner RPC invocations",
helpText =
"The request priority (https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions)"
+ " for Spanner calls. Possible values are `HIGH`, `MEDIUM`, `LOW`. The default value is `MEDIUM`.")
ValueProvider<RpcPriority> getSpannerPriority();
void setSpannerPriority(ValueProvider<RpcPriority> value);
}
/**
* Runs a pipeline which reads in Records from Spanner, and writes the CSV to TextIO sink.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
LOG.info("Starting pipeline setup");
PipelineOptionsFactory.register(SpannerToTextOptions.class);
SpannerToTextOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(SpannerToTextOptions.class);
FileSystems.setDefaultPipelineOptions(options);
Pipeline pipeline = Pipeline.create(options);
SpannerConfig spannerConfig =
SpannerConfig.create()
.withHost(options.getSpannerHost())
.withProjectId(options.getSpannerProjectId())
.withInstanceId(options.getSpannerInstanceId())
.withDatabaseId(options.getSpannerDatabaseId())
.withRpcPriority(options.getSpannerPriority())
.withDataBoostEnabled(options.getDataBoostEnabled());
PTransform<PBegin, PCollection<ReadOperation>> spannerExport =
SpannerConverters.ExportTransformFactory.create(
options.getSpannerTable(),
spannerConfig,
options.getTextWritePrefix(),
options.getSpannerSnapshotTime());
/* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
* only take a timestamp object for exact staleness which works when
* parameters are provided during template compile time. They do not work with
* a Timestamp valueProvider which can take parameters at runtime. Hence a new
* ParDo class CreateTransactionFnWithTimestamp had to be created for this
* purpose.
*/
PCollectionView<Transaction> tx =
pipeline
.apply("Setup for Transaction", Create.of(1))
.apply(
"Create transaction",
ParDo.of(
new CreateTransactionFnWithTimestamp(
spannerConfig, options.getSpannerSnapshotTime())))
.apply("As PCollectionView", View.asSingleton());
PCollection<String> csv =
pipeline
.apply("Create export", spannerExport)
// We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
// because ValueProvider parameters such as table name required for
// LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
// type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
// these parameters at the pipeline execution time.
.apply(
"Read all records",
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
.apply(
"Struct To Csv",
MapElements.into(TypeDescriptors.strings())
.via(struct -> (new SpannerConverters.StructCsvPrinter()).print(struct)));
ValueProvider<ResourceId> tempDirectoryResource =
ValueProvider.NestedValueProvider.of(
eitherOrValueProvider(options.getCsvTempDirectory(), options.getTextWritePrefix()),
(SerializableFunction<String, ResourceId>) s -> FileSystems.matchNewResource(s, true));
csv.apply(
"Write to storage",
TextIO.write()
.to(options.getTextWritePrefix())
.withSuffix(".csv")
.withTempDirectory(tempDirectoryResource));
pipeline.run();
LOG.info("Completed pipeline setup");
}
}
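To build and stage this template yourself from the DataflowTemplates repository, the usual approach for classic templates is to run the pipeline's main class with Maven and pass a --templateLocation pipeline option. The following is a minimal sketch under that assumption, run from the repository's v1 directory; PROJECT_ID and the gs://BUCKET_NAME/... paths are placeholders:

mvn compile exec:java \
    -Dexec.mainClass=com.google.cloud.teleport.templates.SpannerToText \
    -Dexec.args="--project=PROJECT_ID \
        --stagingLocation=gs://BUCKET_NAME/staging \
        --tempLocation=gs://BUCKET_NAME/temp \
        --templateLocation=gs://BUCKET_NAME/templates/Spanner_to_GCS_Text \
        --runner=DataflowRunner"

Once staged, the template can be launched with gcloud dataflow jobs run, pointing --gcs-location at the templateLocation path, as shown earlier.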