Cloud Storage Avro to Spanner 템플릿

Cloud Storage Avro to Spanner 템플릿은 Cloud Storage에 저장된 Spanner에서 내보낸 Avro 파일을 읽고 Spanner 데이터베이스로 가져오는 일괄 파이프라인입니다.

파이프라인 요구사항

  • 비어 있는 대상 Spanner 데이터베이스가 있어야 합니다.
  • Cloud Storage 버킷에 대한 읽기 권한과 대상 Spanner 데이터베이스에 대한 쓰기 권한이 있어야 합니다.
  • 입력 Cloud Storage 경로가 있어야 하며, 가져올 파일의 JSON 설명이 있는 spanner-export.json 파일이 포함되어야 합니다.
  • 소스 Avro 파일에 기본 키가 포함되어 있지 않으면 템플릿을 실행하기 전에 기본 키를 사용하여 빈 Spanner 테이블을 만들어야 합니다. Avro 파일이 기본 키를 정의하는 경우에는 이 단계가 필요하지 않습니다.

템플릿 매개변수

필수 매개변수

  • instanceId: Spanner 데이터베이스의 인스턴스 ID입니다.
  • databaseId: Spanner 데이터베이스의 데이터베이스 ID입니다.
  • inputDir: Avro 파일을 가져올 Cloud Storage 경로입니다.

선택적 매개변수

  • spannerHost: 템플릿에서 호출할 Cloud Spanner 엔드포인트입니다. 테스트에만 사용됩니다. 예를 들면 https://batch-spanner.googleapis.com입니다. 기본값은 https://batch-spanner.googleapis.com입니다.
  • waitForIndexes: true이면 파이프라인은 색인이 생성될 때까지 기다립니다. false인 경우 색인이 백그라운드에서 계속 생성되는 동안에 작업이 완료될 수 있습니다. 기본값은 false입니다.
  • waitForForeignKeys: true이면 파이프라인은 외래 키가 생성될 때까지 기다립니다. false인 경우 외래 키가 백그라운드에서 계속 생성되는 동안에 작업이 완료될 수 있습니다. 기본값은 false입니다.
  • waitForChangeStreams: true이면 파이프라인은 변경 내역이 생성될 때까지 기다립니다. false인 경우 변경 내역이 백그라운드에서 계속 생성되는 동안에 작업이 완료될 수 있습니다. 기본값은 true입니다.
  • waitForSequences: 기본적으로 가져오기 파이프라인은 시퀀스 생성 시에 차단됩니다. false인 경우, 가져오기 파이프라인이 백그라운드에서 계속 생성되는 시퀀스로 완료될 수 있습니다.
  • earlyIndexCreateFlag: 조기 색인 생성이 사용 설정되어 있는지를 지정합니다. 템플릿이 많은 DDL 문을 실행하는 경우 데이터를 로드하기 전에 색인을 만드는 것이 더 효율적입니다. 따라서 기본 동작은 DDL 문 수가 기준점을 초과할 때 먼저 색인을 만드는 것입니다. 이 기능을 사용 중지하려면 earlyIndexCreateFlagfalse로 설정합니다. 기본값은 true입니다.
  • spannerProjectId: Spanner 데이터베이스가 포함된 Google Cloud 프로젝트의 ID입니다. 설정하지 않으면 기본 Google Cloud 프로젝트가 사용됩니다.
  • ddlCreationTimeoutInMinutes: 템플릿에서 수행한 DDL 문의 제한 시간(분)입니다. 기본값은 30분입니다.
  • spannerPriority: Spanner 호출의 요청 우선순위입니다. 가능한 값은 HIGH, MEDIUM, LOW입니다. 기본값은 MEDIUM입니다.

템플릿 실행

콘솔gcloudAPI
  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.

    Google Cloud 콘솔의 Spanner 인스턴스 페이지에 작업이 표시되려면 작업 이름이 다음 형식과 일치해야 합니다.

    cloud-spanner-import-SPANNER_INSTANCE_ID-SPANNER_DATABASE_NAME

    다음을 바꿉니다.

    • SPANNER_INSTANCE_ID: Spanner 인스턴스의 ID입니다.
    • SPANNER_DATABASE_NAME: Spanner 데이터베이스의 이름입니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Avro Files on Cloud Storage to Cloud Spanner template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_Avro_to_Cloud_Spanner \
    --region REGION_NAME \
    --staging-location GCS_STAGING_LOCATION \
    --parameters \
instanceId=INSTANCE_ID,\
databaseId=DATABASE_ID,\
inputDir=GCS_DIRECTORY

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • INSTANCE_ID: 데이터베이스가 포함된 Spanner 인스턴스의 ID
  • DATABASE_ID: 가져올 Spanner 데이터베이스의 ID
  • GCS_DIRECTORY: Avro 파일을 가져올 Cloud Storage 경로(예시: gs://mybucket/somefolder)

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_Avro_to_Cloud_Spanner
{
   "jobName": "JOB_NAME",
   "parameters": {
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "inputDir": "gs://GCS_DIRECTORY"
   },
   "environment": {
       "machineType": "n1-standard-2"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • INSTANCE_ID: 데이터베이스가 포함된 Spanner 인스턴스의 ID
  • DATABASE_ID: 가져올 Spanner 데이터베이스의 ID
  • GCS_DIRECTORY: Avro 파일을 가져올 Cloud Storage 경로(예시: gs://mybucket/somefolder)
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.spanner;

import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.spanner.ImportPipeline.Options;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;

/**
 * Avro to Cloud Spanner Import pipeline.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Avro_to_Cloud_Spanner.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "GCS_Avro_to_Cloud_Spanner",
    category = TemplateCategory.BATCH,
    displayName = "Avro Files on Cloud Storage to Cloud Spanner",
    description =
        "The Cloud Storage Avro files to Cloud Spanner template is a batch pipeline that reads Avro files exported from "
            + "Cloud Spanner stored in Cloud Storage and imports them to a Cloud Spanner database.",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/avro-to-cloud-spanner",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The target Cloud Spanner database must exist and must be empty.",
      "You must have read permissions for the Cloud Storage bucket and write permissions for the target Cloud Spanner database.",
      "The input Cloud Storage path must exist, and it must include a <a href=\"https://cloud.google.com/spanner/docs/import-non-spanner#create-export-json\">spanner-export.json</a> file that contains a JSON description of files to import."
    })
public class ImportPipeline {

  /** Options for {@link ImportPipeline}. */
  public interface Options extends PipelineOptions {

    @TemplateParameter.Text(
        order = 1,
        groupName = "Target",
        regexes = {"^[a-z0-9\\-]+$"},
        description = "Cloud Spanner instance ID",
        helpText = "The instance ID of the Spanner database.")
    ValueProvider<String> getInstanceId();

    void setInstanceId(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Target",
        regexes = {"^[a-z_0-9\\-]+$"},
        description = "Cloud Spanner database ID",
        helpText = "The database ID of the Spanner database.")
    ValueProvider<String> getDatabaseId();

    void setDatabaseId(ValueProvider<String> value);

    @TemplateParameter.GcsReadFolder(
        order = 3,
        groupName = "Source",
        description = "Cloud storage input directory",
        helpText = "The Cloud Storage path where the Avro files are imported from.")
    ValueProvider<String> getInputDir();

    void setInputDir(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 4,
        groupName = "Target",
        optional = true,
        description = "Cloud Spanner Endpoint to call",
        helpText = "The Cloud Spanner endpoint to call in the template. Only used for testing.",
        example = "https://batch-spanner.googleapis.com")
    @Default.String("https://batch-spanner.googleapis.com")
    ValueProvider<String> getSpannerHost();

    void setSpannerHost(ValueProvider<String> value);

    @TemplateParameter.Boolean(
        order = 5,
        optional = true,
        description = "Wait for Indexes",
        helpText =
            "If `true`, the pipeline waits for indexes to be created. If `false`, the job might complete while indexes are still being created in the background. The default value is `false`.")
    @Default.Boolean(false)
    ValueProvider<Boolean> getWaitForIndexes();

    void setWaitForIndexes(ValueProvider<Boolean> value);

    @TemplateParameter.Boolean(
        order = 6,
        optional = true,
        description = "Wait for Foreign Keys",
        helpText =
            "If `true`, the pipeline waits for foreign keys to be created. If `false`, the job might complete while foreign keys are still being created in the background. The default value is `false`.")
    @Default.Boolean(false)
    ValueProvider<Boolean> getWaitForForeignKeys();

    void setWaitForForeignKeys(ValueProvider<Boolean> value);

    @TemplateParameter.Boolean(
        order = 7,
        optional = true,
        description = "Wait for Change Streams",
        helpText =
            "If `true`, the pipeline waits for change streams to be created. If `false`, the job might complete while change streams are still being created in the background. The default value is `true`.")
    @Default.Boolean(true)
    ValueProvider<Boolean> getWaitForChangeStreams();

    void setWaitForChangeStreams(ValueProvider<Boolean> value);

    @TemplateParameter.Boolean(
        order = 7,
        optional = true,
        description = "Wait for Sequences",
        helpText =
            "By default, the import pipeline is blocked on sequence creation. If `false`, the import pipeline might"
                + " complete with sequences still being created in the background.")
    @Default.Boolean(true)
    ValueProvider<Boolean> getWaitForSequences();

    void setWaitForSequences(ValueProvider<Boolean> value);

    @TemplateParameter.Boolean(
        order = 8,
        optional = true,
        description = "Create Indexes early",
        helpText =
            "Specifies whether early index creation is enabled. If the template runs a large number of DDL statements, it's more efficient to create indexes before loading data. Therefore, the default behavior is to create the indexes first when the number of DDL statements exceeds a threshold. To disable this feature, set `earlyIndexCreateFlag` to `false`. The default value is `true`.")
    @Default.Boolean(true)
    ValueProvider<Boolean> getEarlyIndexCreateFlag();

    void setEarlyIndexCreateFlag(ValueProvider<Boolean> value);

    @TemplateCreationParameter(value = "false")
    @Description("If true, wait for job finish")
    @Default.Boolean(true)
    boolean getWaitUntilFinish();

    @TemplateParameter.ProjectId(
        order = 9,
        groupName = "Target",
        optional = true,
        description = "Cloud Spanner Project Id",
        helpText =
            "The ID of the Google Cloud project that contains the Spanner database. If not set, the default Google Cloud project is used.")
    ValueProvider<String> getSpannerProjectId();

    void setSpannerProjectId(ValueProvider<String> value);

    void setWaitUntilFinish(boolean value);

    @TemplateParameter.Integer(
        order = 10,
        optional = true,
        description = "DDL Creation timeout in minutes",
        helpText =
            "The timeout in minutes for DDL statements performed by the template. The default value is 30 minutes.")
    @Default.Integer(30)
    ValueProvider<Integer> getDdlCreationTimeoutInMinutes();

    void setDdlCreationTimeoutInMinutes(ValueProvider<Integer> value);

    @TemplateParameter.Enum(
        order = 11,
        groupName = "Target",
        enumOptions = {
          @TemplateEnumOption("LOW"),
          @TemplateEnumOption("MEDIUM"),
          @TemplateEnumOption("HIGH")
        },
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText =
            "The request priority for Spanner calls. Possible values are `HIGH`, `MEDIUM`, and `LOW`. The default value is `MEDIUM`.")
    ValueProvider<RpcPriority> getSpannerPriority();

    void setSpannerPriority(ValueProvider<RpcPriority> value);
  }

  public static void main(String[] args) {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    Pipeline p = Pipeline.create(options);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            // Temporary fix explicitly setting SpannerConfig.projectId to the default project
            // if spannerProjectId is not provided as a parameter. Required as of Beam 2.38,
            // which no longer accepts null label values on metrics, and SpannerIO#setup() has
            // a bug resulting in the label value being set to the original parameter value,
            // with no fallback to the default project.
            // TODO: remove NestedValueProvider when this is fixed in Beam.
            .withProjectId(
                NestedValueProvider.of(
                    options.getSpannerProjectId(),
                    (SerializableFunction<String, String>)
                        input -> input != null ? input : SpannerOptions.getDefaultProjectId()))
            .withHost(options.getSpannerHost())
            .withInstanceId(options.getInstanceId())
            .withDatabaseId(options.getDatabaseId())
            .withRpcPriority(options.getSpannerPriority());

    p.apply(
        new ImportTransform(
            spannerConfig,
            options.getInputDir(),
            options.getWaitForIndexes(),
            options.getWaitForForeignKeys(),
            options.getWaitForChangeStreams(),
            options.getWaitForSequences(),
            options.getEarlyIndexCreateFlag(),
            options.getDdlCreationTimeoutInMinutes()));

    PipelineResult result = p.run();

    if (options.getWaitUntilFinish()
        &&
        /* Only if template location is null, there is a dataflow job to wait for. Else it's
         * template generation which doesn't start a dataflow job.
         */
        options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
      result.waitUntilFinish();
    }
  }
}

다음 단계