The Spanner to Cloud Storage Text template is a batch pipeline that reads data from a Spanner table and writes it to Cloud Storage as CSV text files.
Pipeline requirements
The input Spanner table must exist before you run the pipeline.
Template parameters
Parameter
Description
spannerProjectId
The ID of the Google Cloud project that contains the Spanner database that you want to read data from.
spannerDatabaseId
The database ID of the requested table.
spannerInstanceId
The instance ID of the requested table.
spannerTable
The table to read data from.
textWritePrefix
The Cloud Storage path where the output text files are written. Add a trailing /. For example, gs://mybucket/somefolder/.
spannerSnapshotTime
(Optional) The timestamp that corresponds to the version of the Spanner database that you want to read. The timestamp must be specified in RFC 3339 UTC "Zulu" format. For example, 1990-12-31T23:59:60Z. The timestamp must be in the past, and the maximum timestamp staleness applies.
dataBoostEnabled
(Optional) Set to true to use the compute resources of Spanner Data Boost to run the job with near-zero impact on Spanner OLTP workflows. This requires the spanner.databases.useDataBoost Identity and Access Management (IAM) permission. For more information, see Data Boost overview.
csvTempDirectory
(Optional) The Cloud Storage path where temporary CSV files are written.
spannerPriority
(Optional) The request priority for Spanner calls. Possible values are HIGH, MEDIUM, and LOW. The default value is MEDIUM.
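The constraints in the parameter list can be checked before a job is launched. The following is a minimal sketch, not part of the template: the class SpannerToTextParamCheck and its validate method are hypothetical names, and the check only covers what the descriptions state (required parameters, the trailing / on textWritePrefix, a past RFC 3339 "Zulu" timestamp, and the three priority values). It does not check the maximum timestamp staleness, which depends on the database.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical pre-flight check; not part of the template or any Google library. */
final class SpannerToTextParamCheck {
  private static final List<String> REQUIRED =
      List.of(
          "spannerProjectId",
          "spannerDatabaseId",
          "spannerInstanceId",
          "spannerTable",
          "textWritePrefix");
  private static final Set<String> PRIORITIES = Set.of("HIGH", "MEDIUM", "LOW");

  /** Returns human-readable problems; an empty list means the parameters look usable. */
  static List<String> validate(Map<String, String> params, Instant now) {
    List<String> problems = new ArrayList<>();
    for (String name : REQUIRED) {
      String value = params.get(name);
      if (value == null || value.isEmpty()) {
        problems.add("missing required parameter: " + name);
      }
    }
    String prefix = params.get("textWritePrefix");
    if (prefix != null && !(prefix.startsWith("gs://") && prefix.endsWith("/"))) {
      problems.add("textWritePrefix should be a gs:// path that ends with /");
    }
    String snapshot = params.get("spannerSnapshotTime");
    if (snapshot != null) {
      try {
        // Newer JDKs may accept UTC offsets in Instant.parse, so require the Z suffix explicitly.
        if (!snapshot.endsWith("Z")) {
          problems.add("spannerSnapshotTime must use the Zulu (Z) suffix");
        } else if (!Instant.parse(snapshot).isBefore(now)) {
          problems.add("spannerSnapshotTime must be in the past");
        }
      } catch (DateTimeParseException e) {
        problems.add("spannerSnapshotTime is not a valid RFC 3339 timestamp");
      }
    }
    String priority = params.get("spannerPriority");
    if (priority != null && !PRIORITIES.contains(priority)) {
      problems.add("spannerPriority must be HIGH, MEDIUM, or LOW");
    }
    return problems;
  }

  public static void main(String[] args) {
    // Placeholder values for illustration only.
    Map<String, String> params =
        Map.of(
            "spannerProjectId", "my-project",
            "spannerDatabaseId", "my-database",
            "spannerInstanceId", "my-instance",
            "spannerTable", "my-table",
            "textWritePrefix", "gs://mybucket/somefolder/",
            "spannerPriority", "LOW");
    System.out.println(validate(params, Instant.now()));
  }
}
```

Passing the current time in as an argument keeps the check deterministic, which makes it easier to exercise with fixed timestamps.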
the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
SPANNER_PROJECT_ID: the Google Cloud project ID of the Spanner database that you want to read data from
DATABASE_ID: the Spanner database ID
BUCKET_NAME: the name of your Cloud Storage bucket
INSTANCE_ID: the Spanner instance ID
TABLE_ID: the Spanner table ID
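To show how these placeholders fit together, here is a rough sketch that assembles a gcloud invocation as an argument list. The command itself is not reproduced on this page, so its shape is an assumption: it uses the gcloud dataflow jobs run command with the --gcs-location, --region, and --parameters flags, and it assumes the template is stored as VERSION/Spanner_to_GCS_Text inside the regional bucket. The class name, method names, and the job name are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that only builds an argument list; it does not run gcloud. */
final class GcloudCommandSketch {
  /** Assumed layout: regional bucket, then the version folder, then the template name. */
  static String templatePath(String regionName, String version) {
    return "gs://dataflow-templates-" + regionName + "/" + version + "/Spanner_to_GCS_Text";
  }

  /** Joins parameters as key=value pairs separated by commas (values must not contain commas). */
  static List<String> command(
      String jobName, String regionName, String version, Map<String, String> parameters) {
    String joined =
        parameters.entrySet().stream()
            .map(e -> e.getKey() + "=" + e.getValue())
            .collect(Collectors.joining(","));
    return List.of(
        "gcloud", "dataflow", "jobs", "run", jobName,
        "--gcs-location", templatePath(regionName, version),
        "--region", regionName,
        "--parameters", joined);
  }

  public static void main(String[] args) {
    // Placeholder values for illustration only.
    Map<String, String> parameters =
        Map.of("spannerTable", "my-table", "textWritePrefix", "gs://mybucket/somefolder/");
    System.out.println(
        String.join(" ", command("my-job", "us-central1", "2023-09-12-00_RC00", parameters)));
  }
}
```

Building a list instead of a single string avoids shell quoting problems if the list is later handed to ProcessBuilder.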
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
SPANNER_PROJECT_ID: the Google Cloud project ID of the Spanner database that you want to read data from
DATABASE_ID: the Spanner database ID
BUCKET_NAME: the name of your Cloud Storage bucket
INSTANCE_ID: the Spanner instance ID
TABLE_ID: the Spanner table ID
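The HTTP POST request can be sketched with the JDK's java.net.http classes. The request is not reproduced on this page, so several details here are assumptions: the regional templates:launch endpoint under dataflow.googleapis.com/v1b3, the gcsPath query parameter, and a JSON body with jobName and parameters fields. The class name, the project ID, the job name, and the access token argument are hypothetical, and the sketch builds the request without sending it.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical builder for a template launch request; it does not send anything. */
final class LaunchRequestSketch {
  /** Assumed endpoint shape: projects/PROJECT/locations/LOCATION/templates:launch?gcsPath=... */
  static URI launchUri(String projectId, String location, String version) {
    String gcsPath =
        "gs://dataflow-templates-" + location + "/" + version + "/Spanner_to_GCS_Text";
    return URI.create(
        "https://dataflow.googleapis.com/v1b3/projects/" + projectId
            + "/locations/" + location
            + "/templates:launch?gcsPath="
            + URLEncoder.encode(gcsPath, StandardCharsets.UTF_8));
  }

  /** Minimal JSON string quoting: escapes backslashes and double quotes only. */
  static String quote(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  /** Builds the assumed request body: a job name plus a flat map of template parameters. */
  static String jsonBody(String jobName, Map<String, String> parameters) {
    String params =
        parameters.entrySet().stream()
            .map(e -> quote(e.getKey()) + ":" + quote(e.getValue()))
            .collect(Collectors.joining(","));
    return "{\"jobName\":" + quote(jobName) + ",\"parameters\":{" + params + "}}";
  }

  static HttpRequest build(
      String projectId,
      String location,
      String version,
      String jobName,
      Map<String, String> parameters,
      String accessToken) {
    return HttpRequest.newBuilder(launchUri(projectId, location, version))
        .header("Authorization", "Bearer " + accessToken)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(jsonBody(jobName, parameters)))
        .build();
  }

  public static void main(String[] args) {
    // Placeholder values for illustration only; a real token must come from your auth setup.
    HttpRequest request =
        build(
            "my-project",
            "us-central1",
            "2023-09-12-00_RC00",
            "my-job",
            Map.of("spannerTable", "my-table"),
            "ACCESS_TOKEN");
    System.out.println(request.method() + " " + request.uri());
  }
}
```

For anything beyond a quick experiment, a JSON library and the official client libraries are a safer choice than hand-built strings.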
Template source code
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;
import static com.google.cloud.teleport.util.ValueProviderUtils.eitherOrValueProvider;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.templates.SpannerToText.SpannerToTextOptions;
import com.google.cloud.teleport.templates.common.SpannerConverters;
import com.google.cloud.teleport.templates.common.SpannerConverters.CreateTransactionFnWithTimestamp;
import com.google.cloud.teleport.templates.common.SpannerConverters.SpannerReadOptions;
import com.google.cloud.teleport.templates.common.TextConverters.FilesystemWriteOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Dataflow template which copies a Spanner table to a Text sink. It exports a Spanner table using
 * <a href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">Batch API</a>, which
 * creates multiple workers in parallel for better performance. The result is written to a CSV file
 * in Google Cloud Storage. The table schema file is saved in json format along with the exported
 * table.
 *
 * <p>Schema file sample: { "id":"INT64", "name":"STRING(MAX)" }
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Spanner_to_GCS_Text.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Spanner_to_GCS_Text",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Spanner to Text Files on Cloud Storage",
    description =
        "The Cloud Spanner to Cloud Storage Text template is a batch pipeline that reads in data from a Cloud Spanner "
            + "table, and writes it to Cloud Storage as CSV text files.",
    optionsClass = SpannerToTextOptions.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-to-cloud-storage",
    contactInformation = "https://cloud.google.com/support",
    requirements = {"The input Spanner table must exist before running the pipeline."})
public class SpannerToText {

  private static final Logger LOG = LoggerFactory.getLogger(SpannerToText.class);

  /** Custom PipelineOptions. */
  public interface SpannerToTextOptions
      extends PipelineOptions, SpannerReadOptions, FilesystemWriteOptions {

    @TemplateParameter.GcsWriteFolder(
        order = 1,
        optional = true,
        description = "Cloud Storage temp directory for storing CSV files",
        helpText = "The Cloud Storage path where the temporary CSV files can be stored.",
        example = "gs://your-bucket/your-path")
    ValueProvider<String> getCsvTempDirectory();

    @SuppressWarnings("unused")
    void setCsvTempDirectory(ValueProvider<String> value);

    @TemplateParameter.Enum(
        order = 2,
        enumOptions = {
          @TemplateEnumOption("LOW"),
          @TemplateEnumOption("MEDIUM"),
          @TemplateEnumOption("HIGH")
        },
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText =
            "The request priority for Cloud Spanner calls. The value must be one of:"
                + " [HIGH,MEDIUM,LOW].")
    ValueProvider<RpcPriority> getSpannerPriority();

    void setSpannerPriority(ValueProvider<RpcPriority> value);
  }

  /**
   * Runs a pipeline which reads in Records from Spanner, and writes the CSV to TextIO sink.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    LOG.info("Starting pipeline setup");
    PipelineOptionsFactory.register(SpannerToTextOptions.class);
    SpannerToTextOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SpannerToTextOptions.class);

    FileSystems.setDefaultPipelineOptions(options);
    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(options.getSpannerHost())
            .withProjectId(options.getSpannerProjectId())
            .withInstanceId(options.getSpannerInstanceId())
            .withDatabaseId(options.getSpannerDatabaseId())
            .withRpcPriority(options.getSpannerPriority())
            .withDataBoostEnabled(options.getDataBoostEnabled());

    PTransform<PBegin, PCollection<ReadOperation>> spannerExport =
        SpannerConverters.ExportTransformFactory.create(
            options.getSpannerTable(),
            spannerConfig,
            options.getTextWritePrefix(),
            options.getSpannerSnapshotTime());

    /* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
     * only take a timestamp object for exact staleness which works when
     * parameters are provided during template compile time. They do not work with
     * a Timestamp valueProvider which can take parameters at runtime. Hence a new
     * ParDo class CreateTransactionFnWithTimestamp had to be created for this
     * purpose.
     */
    PCollectionView<Transaction> tx =
        pipeline
            .apply("Setup for Transaction", Create.of(1))
            .apply(
                "Create transaction",
                ParDo.of(
                    new CreateTransactionFnWithTimestamp(
                        spannerConfig, options.getSpannerSnapshotTime())))
            .apply("As PCollectionView", View.asSingleton());

    PCollection<String> csv =
        pipeline
            .apply("Create export", spannerExport)
            // We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
            // because ValueProvider parameters such as table name required for
            // LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
            // type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
            // these parameters at the pipeline execution time.
            .apply(
                "Read all records",
                LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
            .apply(
                "Struct To Csv",
                MapElements.into(TypeDescriptors.strings())
                    .via(struct -> (new SpannerConverters.StructCsvPrinter()).print(struct)));

    ValueProvider<ResourceId> tempDirectoryResource =
        ValueProvider.NestedValueProvider.of(
            eitherOrValueProvider(options.getCsvTempDirectory(), options.getTextWritePrefix()),
            (SerializableFunction<String, ResourceId>) s -> FileSystems.matchNewResource(s, true));

    csv.apply(
        "Write to storage",
        TextIO.write()
            .to(options.getTextWritePrefix())
            .withSuffix(".csv")
            .withTempDirectory(tempDirectoryResource));

    pipeline.run();
    LOG.info("Completed pipeline setup");
  }
}
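The temporary directory passed to TextIO comes from eitherOrValueProvider(options.getCsvTempDirectory(), options.getTextWritePrefix()). The helper's implementation is not shown in this listing, so the following plain-Java sketch rests on an assumption drawn from its name and arguments: the optional csvTempDirectory is used when it is set, and textWritePrefix is used otherwise. The class and method names are hypothetical, and ordinary strings stand in for ValueProvider objects.

```java
/** Hypothetical stand-in for the temp-directory fallback; plain strings replace ValueProviders. */
final class TempDirectoryFallbackSketch {
  /** Returns csvTempDirectory when it was provided, otherwise falls back to textWritePrefix. */
  static String resolveTempDirectory(String csvTempDirectory, String textWritePrefix) {
    return csvTempDirectory != null ? csvTempDirectory : textWritePrefix;
  }

  public static void main(String[] args) {
    // Placeholder paths for illustration only.
    System.out.println(resolveTempDirectory(null, "gs://mybucket/somefolder/"));
    System.out.println(resolveTempDirectory("gs://mybucket/tmp/", "gs://mybucket/somefolder/"));
  }
}
```

Under this assumption, leaving csvTempDirectory unset means temporary files are written under the output prefix itself.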