Sourcedb to Spanner テンプレート

SourceDB to Spanner テンプレートは、リレーショナル データベースから既存の Spanner データベースにデータをコピーするバッチ パイプラインです。このパイプラインは、JDBC を使用してリレーショナル データベースに接続します。このテンプレートを使用すると、使用可能な JDBC ドライバがある任意のリレーショナル データベースから Spanner にデータをコピーできます。これは、MySQL の限定されたタイプのみをサポートします。

保護をさらに強化するために、Cloud KMS 鍵で暗号化された Base64 でエンコードされたユーザー名、パスワード、接続文字列パラメータを渡すこともできます。詳しくは Cloud KMS API 暗号化エンドポイントで、ユーザー名、パスワード、接続文字列パラメータの暗号化の詳細をご覧ください。

パイプラインの要件

  • リレーショナル データベース用の JDBC ドライバが使用可能である必要があります。
  • パイプラインの実行前に Spanner テーブルが存在している必要があります。
  • Spanner テーブルに互換性のあるスキーマが必要です。
  • リレーショナル データベースは、Dataflow が実行されているサブネットからアクセス可能である必要があります。

テンプレートのパラメータ

パラメータ 説明
sourceConfigURL JDBC 接続 URL 文字列。たとえば、jdbc:mysql://127.4.5.30:3306/my-db?autoReconnect=true&maxReconnects=10&unicode=true&characterEncoding=UTF-8 やシャード構成などです。
instanceId 宛先の Cloud Spanner インスタンス。
databaseId 宛先の Cloud Spanner データベース。
projectId これは Cloud Spanner プロジェクトの名前です。
outputDirectory このディレクトリは、移行で失敗、スキップ、またはフィルタされたレコードをダンプするために使用されます。
jdbcDriverJars 省略可: ドライバ JAR ファイルのカンマ区切りのリスト例: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar。デフォルトは空です。
jdbcDriverClassName 省略可: JDBC ドライバのクラス名。例: com.mysql.jdbc.Driver。デフォルト: com.mysql.jdbc.Driver。
username 省略可: JDBC 接続に使用するユーザー名。デフォルトは空です。
password 省略可: JDBC 接続に使用するパスワード。デフォルトは空です。
tables 省略可: ソースから移行するテーブル。デフォルトは空です。
numPartitions 省略可: パーティションの数。このパラメータは、上限と下限により、パーティション列を均等に分割するために使用される、生成済みの WHERE 句式のパーティション スライドを形成します。入力が 1 より小さい場合、数値は 1 に設定されます。デフォルト: 0
spannerHost 省略可: テンプレートで呼び出す Cloud Spanner エンドポイント。例: https://batch-spanner.googleapis.com。デフォルト: https://batch-spanner.googleapis.com。
maxConnections 省略可: 各ワーカーの JDBC 接続プールを最大接続数で構成します。上限なしの場合は負の数を使用します。例: -1。デフォルト: 0
sessionFilePath 省略可: Spanner Migration Tool のマッピング情報が含まれる Cloud Storage 内のセッションパス。デフォルトは空です。
transformationJarPath 省略可: レコードを処理するカスタム変換ロジックを含むカスタム jar が保存されている Cloud Storage 内の場所。デフォルトは空です。
transformationClassName 省略可: カスタム変換ロジックが含まれる完全修飾クラス名。transformationJarPath が指定されている場合は必須フィールドです。デフォルトは空です。
transformationCustomParameters 省略可: カスタム変換クラスに渡すカスタム パラメータが含まれる文字列。デフォルトは空です。
disabledAlgorithms 省略可: 無効にするためのカンマ区切りのアルゴリズム。この値が none に設定されている場合、アルゴリズムは無効になりません。デフォルトで無効になっているアルゴリズムには脆弱性やパフォーマンスの問題が存在する可能性があるため、このパラメータは慎重に使用してください。例: SSLv3, RC4
extraFilesToStage 省略可: ワーカーにステージングするファイルのカンマ区切りの Cloud Storage パスまたは Secret Manager シークレット。これらのファイルは、各ワーカーの /extra_files ディレクトリに保存されます。例: gs://<BUCKET>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>

テンプレートを実行する

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Sourcedb to Spanner template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Sourcedb_to_Spanner_Flex \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       sourceConfigURL=SOURCE_CONFIG_URL,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       projectId=PROJECT_ID,\
       outputDirectory=OUTPUT_DIRECTORY,\

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SOURCE_CONFIG_URL: ソース データベース ホストに接続する URL。1 にすることもできます。JDBC 接続 URL - ホスト、ポート、ソース データベース名を含める必要があります。必要に応じて、autoReconnect、maxReconnects などのプロパティを含めることができます。形式: `jdbc:mysql://{host}:{port}/{dbName}?{parameters}`2。シャーディング構成パス
  • INSTANCE_ID: Cloud Spanner インスタンス ID。
  • DATABASE_ID: Cloud Spanner データベース ID。
  • PROJECT_ID: Cloud Spanner プロジェクト ID。
  • OUTPUT_DIRECTORY: 失敗、スキップ、またはフィルタされたイベントの出力ディレクトリ

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "sourceConfigURL": "SOURCE_CONFIG_URL",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "projectId": "PROJECT_ID",
       "outputDirectory": "OUTPUT_DIRECTORY",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Sourcedb_to_Spanner_Flex",
     "environment": { "maxWorkers": "10" }
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SOURCE_CONFIG_URL: ソース データベース ホストに接続する URL。1 にすることもできます。JDBC 接続 URL - ホスト、ポート、ソース データベース名を含める必要があります。必要に応じて、autoReconnect、maxReconnects などのプロパティを含めることができます。形式: `jdbc:mysql://{host}:{port}/{dbName}?{parameters}`2。シャーディング構成パス
  • INSTANCE_ID: Cloud Spanner インスタンス ID。
  • DATABASE_ID: Cloud Spanner データベース ID。
  • PROJECT_ID: Cloud Spanner プロジェクト ID。
  • OUTPUT_DIRECTORY: 失敗、スキップ、またはフィルタされたイベントの出力ディレクトリ
Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SourceDbToSpannerOptions;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A template that copies data from a relational database using JDBC to an existing Spanner
 * database.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/sourcedb-to-spanner/README_Sourcedb_to_Spanner_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Sourcedb_to_Spanner_Flex",
    category = TemplateCategory.BATCH,
    displayName = "Sourcedb to Spanner",
    description = {
      "The SourceDB to Spanner template is a batch pipeline that copies data from a relational"
          + " database into an existing Spanner database. This pipeline uses JDBC to connect to"
          + " the relational database. You can use this template to copy data from any relational"
          + " database with available JDBC drivers into Spanner. This currently only supports a limited set of types of MySQL",
      "For an extra layer of protection, you can also pass in a Cloud KMS key along with a"
          + " Base64-encoded username, password, and connection string parameters encrypted with"
          + " the Cloud KMS key. See the <a"
          + " href=\"https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt\">Cloud"
          + " KMS API encryption endpoint</a> for additional details on encrypting your username,"
          + " password, and connection string parameters."
    },
    optionsClass = SourceDbToSpannerOptions.class,
    flexContainerName = "source-db-to-spanner",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/sourcedb-to-spanner",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC drivers for the relational database must be available.",
      "The Spanner tables must exist before pipeline execution.",
      "The Spanner tables must have a compatible schema.",
      "The relational database must be accessible from the subnet where Dataflow runs."
    })
public class SourceDbToSpanner {

  private static final Logger LOG = LoggerFactory.getLogger(SourceDbToSpanner.class);

  /**
   * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
   * blocking execution is required, use the {@link SourceDbToSpanner#run} method to start the
   * pipeline and invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line
    SourceDbToSpannerOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SourceDbToSpannerOptions.class);
    run(options);
  }

  /**
   * Create the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  @VisibleForTesting
  static PipelineResult run(SourceDbToSpannerOptions options) {
    // TODO - Validate if options are as expected
    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig = createSpannerConfig(options);

    // Decide type and source of migration
    // TODO(vardhanvthigle): Move this within pipelineController.
    switch (options.getSourceDbDialect()) {
      case SourceDbToSpannerOptions.CASSANDRA_SOURCE_DIALECT:
        return PipelineController.executeCassandraMigration(options, pipeline, spannerConfig);
      default:
        /* Implementation detail, not having a default leads to failure in compile time checks enforced here */
        /* Making jdbc as default case which includes MYSQL and PG. */
        return executeJdbcMigration(options, pipeline, spannerConfig);
    }
  }

  // TODO(vardhanvthigle): Move this within pipelineController.
  private static PipelineResult executeJdbcMigration(
      SourceDbToSpannerOptions options, Pipeline pipeline, SpannerConfig spannerConfig) {
    if (options.getSourceConfigURL().startsWith("gs://")) {
      List<Shard> shards =
          new ShardFileReader(new SecretManagerAccessorImpl())
              .readForwardMigrationShardingConfig(options.getSourceConfigURL());
      return PipelineController.executeJdbcShardedMigration(
          options, pipeline, shards, spannerConfig);
    } else {
      return PipelineController.executeJdbcSingleInstanceMigration(
          options, pipeline, spannerConfig);
    }
  }

  @VisibleForTesting
  static SpannerConfig createSpannerConfig(SourceDbToSpannerOptions options) {
    return SpannerConfig.create()
        .withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId()))
        .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
        .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
        .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()));
  }
}