Spanner to Cloud Storage Text 템플릿

Spanner to Cloud Storage Text 템플릿은 Spanner 테이블에서 데이터를 읽고 Cloud Storage에 CSV 텍스트 파일로 쓰는 일괄 파이프라인입니다.

파이프라인 요구사항

  • 파이프라인을 실행하기 전에 입력 Spanner 테이블이 있어야 합니다.

템플릿 매개변수

필수 매개변수

  • spannerTable: 데이터를 읽을 Spanner 테이블입니다.
  • spannerProjectId: 데이터를 읽을 Spanner 데이터베이스가 포함된 Google Cloud 프로젝트의 ID입니다.
  • spannerInstanceId: 요청된 테이블의 인스턴스 ID입니다.
  • spannerDatabaseId: 요청된 테이블의 데이터베이스 ID입니다.
  • textWritePrefix: 데이터가 작성되는 위치를 지정하는 Cloud Storage 경로 접두사입니다. 예를 들면 gs://mybucket/somefolder/입니다.

선택적 매개변수

  • csvTempDirectory: 임시 CSV 파일이 작성되는 Cloud Storage 경로입니다. 예를 들면 gs://your-bucket/your-path입니다.
  • spannerPriority: Spanner 호출의 요청 우선순위 (https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions)입니다. 가능한 값은 HIGH, MEDIUM, LOW입니다. 기본값은 MEDIUM입니다.
  • spannerHost: 템플릿에서 호출할 Cloud Spanner 엔드포인트입니다. 테스트에만 사용됩니다. 예를 들면 https://batch-spanner.googleapis.com입니다. 기본값은 https://batch-spanner.googleapis.com입니다.
  • spannerSnapshotTime: 읽으려는 Spanner 데이터베이스의 버전에 해당하는 타임스탬프입니다. 타임스탬프는 RFC 3339 (https://tools.ietf.org/html/rfc3339) UTC Zulu 시간 형식으로 지정되어야 합니다. 타임스탬프는 과거여야 하며 최대 타임스탬프 비활성 (https://cloud.google.com/spanner/docs/timestamp-bounds#maximum_timestamp_staleness)이 적용됩니다. 예를 들면 1990-12-31T23:59:60Z입니다. 기본값은 빈 값입니다.
  • dataBoostEnabled: Spanner Data Boost의 컴퓨팅 리소스를 사용하여 Spanner OLTP 워크플로에 거의 영향을 주지 않고 작업을 실행하도록 true로 설정합니다. true인 경우 spanner.databases.useDataBoost Identity and Access Management (IAM) 권한이 필요합니다. 자세한 내용은 Data Boost 개요 (https://cloud.google.com/spanner/docs/databoost/databoost-overview)를 참고하세요. 기본값은 false입니다.

템플릿 실행

콘솔gcloudAPI
  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Spanner to Text Files on Cloud Storage template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Spanner_to_GCS_Text \
    --region REGION_NAME \
    --parameters \
spannerProjectId=SPANNER_PROJECT_ID,\
spannerDatabaseId=DATABASE_ID,\
spannerInstanceId=INSTANCE_ID,\
spannerTable=TABLE_ID,\
textWritePrefix=gs://BUCKET_NAME/output/

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • SPANNER_PROJECT_ID: 데이터를 읽을 Spanner 데이터베이스의 Google Cloud 프로젝트 ID입니다.
  • DATABASE_ID: Spanner 데이터베이스 ID
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • INSTANCE_ID: Spanner 인스턴스 ID
  • TABLE_ID: Spanner 테이블 ID

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Spanner_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "spannerDatabaseId": "DATABASE_ID",
       "spannerInstanceId": "INSTANCE_ID",
       "spannerTable": "TABLE_ID",
       "textWritePrefix": "gs://BUCKET_NAME/output/"
   },
   "environment": { "zone": "us-central1-f" }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • SPANNER_PROJECT_ID: 데이터를 읽을 Spanner 데이터베이스의 Google Cloud 프로젝트 ID입니다.
  • DATABASE_ID: Spanner 데이터베이스 ID
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • INSTANCE_ID: Spanner 인스턴스 ID
  • TABLE_ID: Spanner 테이블 ID
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import static com.google.cloud.teleport.util.ValueProviderUtils.eitherOrValueProvider;

import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.templates.SpannerToText.SpannerToTextOptions;
import com.google.cloud.teleport.templates.common.SpannerConverters;
import com.google.cloud.teleport.templates.common.SpannerConverters.CreateTransactionFnWithTimestamp;
import com.google.cloud.teleport.templates.common.SpannerConverters.SpannerReadOptions;
import com.google.cloud.teleport.templates.common.TextConverters.FilesystemWriteOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataflow template which copies a Spanner table to a Text sink. It exports a Spanner table using
 * <a href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">Batch API</a>, which
 * creates multiple workers in parallel for better performance. The result is written to a CSV file
 * in Google Cloud Storage. The table schema file is saved in json format along with the exported
 * table.
 *
 * <p>Schema file sample: { "id":"INT64", "name":"STRING(MAX)" }
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Spanner_to_GCS_Text.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Spanner_to_GCS_Text",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Spanner to Text Files on Cloud Storage",
    description =
        "The Cloud Spanner to Cloud Storage Text template is a batch pipeline that reads in data from a Cloud Spanner "
            + "table, and writes it to Cloud Storage as CSV text files.",
    optionsClass = SpannerToTextOptions.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-to-cloud-storage",
    contactInformation = "https://cloud.google.com/support",
    requirements = {"The input Spanner table must exist before running the pipeline."})
public class SpannerToText {

  private static final Logger LOG = LoggerFactory.getLogger(SpannerToText.class);

  /** Custom PipelineOptions. */
  public interface SpannerToTextOptions
      extends PipelineOptions, SpannerReadOptions, FilesystemWriteOptions {

    @TemplateParameter.GcsWriteFolder(
        order = 1,
        groupName = "Target",
        optional = true,
        description = "Cloud Storage temp directory for storing CSV files",
        helpText = "The Cloud Storage path where temporary CSV files are written.",
        example = "gs://your-bucket/your-path")
    ValueProvider<String> getCsvTempDirectory();

    @SuppressWarnings("unused")
    void setCsvTempDirectory(ValueProvider<String> value);

    @TemplateParameter.Enum(
        order = 2,
        groupName = "Source",
        enumOptions = {
          @TemplateEnumOption("LOW"),
          @TemplateEnumOption("MEDIUM"),
          @TemplateEnumOption("HIGH")
        },
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText =
            "The request priority (https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions)"
                + " for Spanner calls. Possible values are `HIGH`, `MEDIUM`, `LOW`. The default value is `MEDIUM`.")
    ValueProvider<RpcPriority> getSpannerPriority();

    void setSpannerPriority(ValueProvider<RpcPriority> value);
  }

  /**
   * Runs a pipeline which reads in Records from Spanner, and writes the CSV to TextIO sink.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    LOG.info("Starting pipeline setup");
    PipelineOptionsFactory.register(SpannerToTextOptions.class);
    SpannerToTextOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SpannerToTextOptions.class);

    FileSystems.setDefaultPipelineOptions(options);
    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(options.getSpannerHost())
            .withProjectId(options.getSpannerProjectId())
            .withInstanceId(options.getSpannerInstanceId())
            .withDatabaseId(options.getSpannerDatabaseId())
            .withRpcPriority(options.getSpannerPriority())
            .withDataBoostEnabled(options.getDataBoostEnabled());

    PTransform<PBegin, PCollection<ReadOperation>> spannerExport =
        SpannerConverters.ExportTransformFactory.create(
            options.getSpannerTable(),
            spannerConfig,
            options.getTextWritePrefix(),
            options.getSpannerSnapshotTime());

    /* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
     * only take a timestamp object for exact staleness which works when
     * parameters are provided during template compile time. They do not work with
     * a Timestamp valueProvider which can take parameters at runtime. Hence a new
     * ParDo class CreateTransactionFnWithTimestamp had to be created for this
     * purpose.
     */
    PCollectionView<Transaction> tx =
        pipeline
            .apply("Setup for Transaction", Create.of(1))
            .apply(
                "Create transaction",
                ParDo.of(
                    new CreateTransactionFnWithTimestamp(
                        spannerConfig, options.getSpannerSnapshotTime())))
            .apply("As PCollectionView", View.asSingleton());

    PCollection<String> csv =
        pipeline
            .apply("Create export", spannerExport)
            // We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
            // because ValueProvider parameters such as table name required for
            // LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
            // type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
            // these parameters at the pipeline execution time.
            .apply(
                "Read all records",
                LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
            .apply(
                "Struct To Csv",
                MapElements.into(TypeDescriptors.strings())
                    .via(struct -> (new SpannerConverters.StructCsvPrinter()).print(struct)));

    ValueProvider<ResourceId> tempDirectoryResource =
        ValueProvider.NestedValueProvider.of(
            eitherOrValueProvider(options.getCsvTempDirectory(), options.getTextWritePrefix()),
            (SerializableFunction<String, ResourceId>) s -> FileSystems.matchNewResource(s, true));

    csv.apply(
        "Write to storage",
        TextIO.write()
            .to(options.getTextWritePrefix())
            .withSuffix(".csv")
            .withTempDirectory(tempDirectoryResource));

    pipeline.run();
    LOG.info("Completed pipeline setup");
  }
}

다음 단계