Sourcedb to Spanner 템플릿

SourceDB to Spanner 템플릿은 관계형 데이터베이스의 데이터를 기존 Spanner 데이터베이스에 복사하는 일괄 파이프라인입니다. 이 파이프라인은 JDBC를 사용하여 관계형 데이터베이스에 연결합니다. 이 템플릿을 통해 사용 가능한 JDBC 드라이버가 있는 관계형 데이터베이스의 데이터를 Spanner로 복사할 수 있습니다. 제한된 유형의 MySQL만 지원합니다.

보안 강화를 위해 Cloud KMS 키로 암호화된 Base64 인코딩 사용자 이름, 비밀번호 및 연결 문자열 매개변수와 함께 Cloud KMS 키를 전달할 수도 있습니다. 사용자 이름, 비밀번호, 연결 문자열 매개변수를 암호화하는 방법에 대한 자세한 내용은 Cloud KMS API 암호화 엔드포인트를 참조하세요.

파이프라인 요구사항

  • 관계형 데이터베이스용 JDBC 드라이버를 사용할 수 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 테이블이 있어야 합니다.
  • Spanner 테이블에 호환 가능한 스키마가 있어야 합니다.
  • 관계형 데이터베이스는 Dataflow가 실행되는 서브넷에서 액세스할 수 있어야 합니다.

템플릿 매개변수

매개변수 설명
sourceConfigURL JDBC 연결 URL 문자열입니다. 예를 들면 jdbc:mysql://127.4.5.30:3306/my-db?autoReconnect=true&maxReconnects=10&unicode=true&characterEncoding=UTF-8 또는 샤드 구성입니다.
instanceId 대상 Cloud Spanner 인스턴스입니다.
databaseId 대상 Cloud Spanner 데이터베이스입니다.
projectId Cloud Spanner 프로젝트 이름입니다.
outputDirectory 이 디렉터리는 마이그레이션에서 실패/건너뛰기/필터링된 레코드를 덤프하는 데 사용됩니다.
jdbcDriverJars 선택사항: 쉼표로 구분된 드라이버 JAR 파일 목록입니다. 예를 들면 gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar입니다. 기본값은 빈 값입니다.
jdbcDriverClassName 선택사항: JDBC 드라이버 클래스 이름입니다. 예를 들면 com.mysql.jdbc.Driver입니다. 기본값은 com.mysql.jdbc.Driver입니다.
username (선택사항) JDBC 연결에 사용할 사용자 이름입니다. 기본값은 빈 값입니다.
password (선택사항) JDBC 연결에 사용할 비밀번호입니다. 기본값은 빈 값입니다.
tables 선택사항: 소스에서 마이그레이션할 테이블입니다. 기본값은 빈 값입니다.
numPartitions 선택사항: 파티션 수입니다. 이 값은 하한 및 상한과 함께 파티션 열을 균등하게 분할하는 데 사용하기 위해 생성된 WHERE 절 표현식에 대한 파티션 스트라이드를 형성합니다. 입력이 1보다 작으면 숫자가 1로 설정됩니다. 기본값은 0입니다.
spannerHost 선택사항: 템플릿에서 호출할 Cloud Spanner 엔드포인트입니다. 예를 들면 https://batch-spanner.googleapis.com입니다. 기본값은 https://batch-spanner.googleapis.com입니다.
maxConnections 선택사항: 최대 연결 수로 각 작업자의 JDBC 연결 풀을 구성합니다. 제한이 없으면 음수를 사용합니다. 예를 들면 -1입니다. 기본값은 0입니다.
sessionFilePath 선택사항: Spanner 마이그레이션 도구의 매핑 정보가 포함된 Cloud Storage의 세션 경로입니다. 기본값은 빈 값입니다.
transformationJarPath 선택사항: 레코드를 처리할 수 있는 커스텀 변환 로직이 포함된 Cloud Storage의 커스텀 jar 위치입니다. 기본값은 빈 값입니다.
transformationClassName 선택사항: 커스텀 변환 로직이 있는 정규화된 클래스 이름입니다. transformationJarPath가 지정된 경우 필수 필드입니다. 기본값은 빈 값입니다.
transformationCustomParameters 선택사항: 커스텀 변환 클래스에 전달할 커스텀 매개변수가 포함된 문자열입니다. 기본값은 빈 값입니다.
disabledAlgorithms 선택사항: 중지할 쉼표로 구분된 알고리즘입니다. 이 값을 없음으로 설정하면 알고리즘이 사용 중지되지 않습니다. 기본적으로 중지된 알고리즘에서 취약점 또는 성능 문제가 발생할 수 있으므로 이 매개변수를 사용할 때는 주의해야 합니다. 예를 들면 SSLv3, RC4입니다.
extraFilesToStage 선택사항: 작업자에 스테이징할 파일의 쉼표로 구분된 Cloud Storage 경로 또는 Secret Manager 보안 비밀입니다. 이러한 파일은 각 작업자의 /extra_files 디렉터리에 저장됩니다. 예를 들면 gs://<BUCKET>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>입니다.

템플릿 실행

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Sourcedb to Spanner template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Sourcedb_to_Spanner_Flex \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       sourceConfigURL=SOURCE_CONFIG_URL,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       projectId=PROJECT_ID,\
       outputDirectory=OUTPUT_DIRECTORY,\

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • SOURCE_CONFIG_URL: 소스 데이터베이스 호스트에 연결할 URL입니다. 1일 수 있습니다. JDBC 연결 URL - 호스트, 포트, 소스 db 이름을 포함해야 하며 선택적으로 autoReconnect, maxReconnects 등과 같은 속성을 포함할 수 있습니다. 형식: `jdbc:mysql://{host}:{port}/{dbName}?{parameters }`2. 샤드 구성 경로
  • INSTANCE_ID: Cloud Spanner 인스턴스 ID
  • DATABASE_ID: Cloud Spanner 데이터베이스 ID
  • PROJECT_ID: Cloud Spanner 프로젝트 ID
  • OUTPUT_DIRECTORY: 실패/건너뜀/필터링된 이벤트의 출력 디렉터리

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "sourceConfigURL": "SOURCE_CONFIG_URL",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "projectId": "PROJECT_ID",
       "outputDirectory": "OUTPUT_DIRECTORY",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Sourcedb_to_Spanner_Flex",
     "environment": { "maxWorkers": "10" }
  }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • SOURCE_CONFIG_URL: 소스 데이터베이스 호스트에 연결할 URL입니다. 1일 수 있습니다. JDBC 연결 URL - 호스트, 포트, 소스 db 이름을 포함해야 하며 선택적으로 autoReconnect, maxReconnects 등과 같은 속성을 포함할 수 있습니다. 형식: `jdbc:mysql://{host}:{port}/{dbName}?{parameters }`2. 샤드 구성 경로
  • INSTANCE_ID: Cloud Spanner 인스턴스 ID
  • DATABASE_ID: Cloud Spanner 데이터베이스 ID
  • PROJECT_ID: Cloud Spanner 프로젝트 ID
  • OUTPUT_DIRECTORY: 실패/건너뜀/필터링된 이벤트의 출력 디렉터리
Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SourceDbToSpannerOptions;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A template that copies data from a relational database using JDBC to an existing Spanner
 * database.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/sourcedb-to-spanner/README_Sourcedb_to_Spanner_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Sourcedb_to_Spanner_Flex",
    category = TemplateCategory.BATCH,
    displayName = "Sourcedb to Spanner",
    description = {
      "The SourceDB to Spanner template is a batch pipeline that copies data from a relational"
          + " database into an existing Spanner database. This pipeline uses JDBC to connect to"
          + " the relational database. You can use this template to copy data from any relational"
          + " database with available JDBC drivers into Spanner. This currently only supports a limited set of types of MySQL",
      "For an extra layer of protection, you can also pass in a Cloud KMS key along with a"
          + " Base64-encoded username, password, and connection string parameters encrypted with"
          + " the Cloud KMS key. See the <a"
          + " href=\"https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt\">Cloud"
          + " KMS API encryption endpoint</a> for additional details on encrypting your username,"
          + " password, and connection string parameters."
    },
    optionsClass = SourceDbToSpannerOptions.class,
    flexContainerName = "source-db-to-spanner",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/sourcedb-to-spanner",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC drivers for the relational database must be available.",
      "The Spanner tables must exist before pipeline execution.",
      "The Spanner tables must have a compatible schema.",
      "The relational database must be accessible from the subnet where Dataflow runs."
    })
public class SourceDbToSpanner {

  private static final Logger LOG = LoggerFactory.getLogger(SourceDbToSpanner.class);

  /**
   * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
   * blocking execution is required, use the {@link SourceDbToSpanner#run} method to start the
   * pipeline and invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line
    SourceDbToSpannerOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SourceDbToSpannerOptions.class);
    run(options);
  }

  /**
   * Create the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  @VisibleForTesting
  static PipelineResult run(SourceDbToSpannerOptions options) {
    // TODO - Validate if options are as expected
    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig = createSpannerConfig(options);

    // Decide type and source of migration
    // TODO(vardhanvthigle): Move this within pipelineController.
    switch (options.getSourceDbDialect()) {
      case SourceDbToSpannerOptions.CASSANDRA_SOURCE_DIALECT:
        return PipelineController.executeCassandraMigration(options, pipeline, spannerConfig);
      default:
        /* Implementation detail, not having a default leads to failure in compile time checks enforced here */
        /* Making jdbc as default case which includes MYSQL and PG. */
        return executeJdbcMigration(options, pipeline, spannerConfig);
    }
  }

  // TODO(vardhanvthigle): Move this within pipelineController.
  private static PipelineResult executeJdbcMigration(
      SourceDbToSpannerOptions options, Pipeline pipeline, SpannerConfig spannerConfig) {
    if (options.getSourceConfigURL().startsWith("gs://")) {
      List<Shard> shards =
          new ShardFileReader(new SecretManagerAccessorImpl())
              .readForwardMigrationShardingConfig(options.getSourceConfigURL());
      return PipelineController.executeJdbcShardedMigration(
          options, pipeline, shards, spannerConfig);
    } else {
      return PipelineController.executeJdbcSingleInstanceMigration(
          options, pipeline, spannerConfig);
    }
  }

  @VisibleForTesting
  static SpannerConfig createSpannerConfig(SourceDbToSpannerOptions options) {
    return SpannerConfig.create()
        .withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId()))
        .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
        .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
        .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()));
  }
}