Datastream to Spanner テンプレート

Datastream to Spanner テンプレートは、Cloud Storage バケットから Datastream イベントを読み取り、Spanner データベースに書き込むストリーミング パイプラインです。これは、Datastream ソースから Spanner へのデータ移行を目的としています。

テンプレートの実行前に、移行に必要なすべてのテーブルが移行先の Spanner データベースに存在している必要があります。したがって、データ移行に先立ち、ソース データベースから移行先 Spanner へのスキーマの移行が完了している必要があります。移行前にテーブルにデータが存在する可能性があります。このテンプレートでは、Datastream スキーマの変更が Spanner データベースに伝播されません。

データの整合性が保証されるのは、すべてのデータが Spanner に書き込まれ、移行が終了したときだけです。Spanner に書き込まれる各レコードの順序指定の情報を保存するために、このテンプレートは Spanner データベース内の各テーブルに対して追加のテーブル(シャドウ テーブルと呼ばれる)を作成します。これは、移行終了時の整合性を確保するために使用されます。シャドウ テーブルは移行後に削除されないため、移行終了時の検証に使用できます。

オペレーション中に発生したエラー(スキーマの不一致、不正な形式の JSON ファイル、変換の実行によるエラーなど)は、エラーキューに記録されます。エラーキューは、エラーが発生したすべての Datastream イベントと、エラーの理由をテキスト形式で保存する Cloud Storage フォルダです。エラーは一時的な場合も永続的な場合もあり、エラーキューの適切な Cloud Storage フォルダに保存されます。一時的なエラーは自動的に再試行されますが、永続的なエラーは再試行されません。永続的なエラーが発生した場合は、テンプレートの実行中に変更イベントを修正し、再試行可能なバケットに移動することもできます。

パイプラインの要件

  • ステータスが [実行中] または [開始されていません] の Datastream ストリーム。
  • Datastream イベントが複製される Cloud Storage バケット。
  • 既存のテーブルを含む Spanner データベース。テーブルは空でも、データを含んでいてもかまいません。

テンプレートのパラメータ

必須パラメータ

  • instanceId: 変更が複製される Spanner インスタンス。
  • databaseId: 変更が複製される Spanner データベース。

オプション パラメータ

  • inputFilePattern: 複製する Datastream ファイルが含まれる Cloud Storage ファイルの場所。通常、これはストリームのルートパスです。この機能のサポートは無効になりました。
  • inputFileFormat: Datastream によって生成された出力ファイルの形式。例: avro,json。デフォルトは avro です。
  • sessionFilePath: HarbourBridge のマッピング情報が含まれる Cloud Storage 内のセッション ファイルのパス。
  • projectId: Spanner プロジェクト ID。
  • spannerHost: テンプレート内で呼び出す Cloud Spanner のエンドポイント(例: https://batch-spanner.googleapis.com)。デフォルトは https://batch-spanner.googleapis.com です。
  • gcsPubSubSubscription: Cloud Storage 通知ポリシーで使用されている Pub/Sub サブスクリプション。名前は projects/ の形式にします。
  • streamName: スキーマ情報とソースタイプについてポーリングするストリームの名前またはテンプレート。
  • shadowTablePrefix: シャドウ テーブルの名前に使用される接頭辞。デフォルトは shadow_ です。
  • shouldCreateShadowTables: Cloud Spanner データベースでシャドウ テーブルを作成する必要があるかどうかを示すフラグ。デフォルトは true です。
  • rfcStartDateTime: Cloud Storage(https://tools.ietf.org/html/rfc3339)からのフェッチに使用される開始日時。デフォルトは 1970-01-01T00:00:00.00Z です。
  • fileReadConcurrency: 同時に読み取る DataStream ファイルの数。デフォルトは 30 です。
  • deadLetterQueueDirectory: エラーキューの出力の保存に使用されるファイルパス。デフォルトのファイルパスは、Dataflow ジョブの一時保存場所の下のディレクトリです。
  • dlqRetryMinutes: デッドレター キューの再試行間隔(分)。デフォルトは 10 です。
  • dlqMaxRetryCount: デッドレター キューで一時的なエラーが発生した場合に再試行できる最大回数。デフォルトは 500 です。
  • dataStreamRootUrl: Datastream API のルート URL。デフォルトは https://datastream.googleapis.com/ です。
  • datastreamSourceType: Datastream が接続するソース データベースのタイプ。例: mysql/oracle。実行中の Datastream なしでテストする場合に設定する必要があります。
  • roundJsonDecimals: JSON 列内の小数値を、精度の低下なしで格納できる数値に丸めるかどうかを指定するフラグ。デフォルトは false です。
  • runMode: 実行モードのタイプ。値は regular(通常)または retryDLQ(デッドレター キューを使用して再試行)です。デフォルトは regular です。
  • transformationContextFilePath: 移行中に実行される変換でデータの入力に使用される、Cloud Storage 内の変換コンテキスト ファイル パス。例: 行の移行元のデータベースを識別するデータベース名のシャード ID。
  • directoryWatchDurationInMinutes: パイプラインが GCS 内のディレクトリをポーリングし続ける期間。Datastream の出力ファイルは、イベントのタイムスタンプを分単位でグループ化したディレクトリ構造で配置されます。このパラメータは、イベントがソース データベースで発生してから Datastream によって GCS に書き込まれるまでの最大遅延時間(99.9 パーセンタイル = 10 分)とほぼ等しい必要があります。デフォルトは 10 です。
  • spannerPriority: Cloud Spanner 呼び出しのリクエストの優先度。値は HIGH、MEDIUM、LOW のいずれかである必要があります。デフォルトは HIGH です。
  • dlqGcsPubSubSubscription: 通常モードでの実行時にデッドレター キュー再試行ディレクトリ用の Cloud Storage 通知ポリシーで使用されている Pub/Sub サブスクリプション。名前は projects/ の形式にします。
  • transformationJarPath: 前方移行でレコードを処理するカスタム変換ロジックが含まれる、Cloud Storage 内のカスタム jar の場所。デフォルトは空です。
  • transformationClassName: カスタム変換ロジックが含まれる完全修飾クラス名。transformationJarPath が指定されている場合は必須フィールドです。デフォルトは空です。
  • transformationCustomParameters: カスタム変換クラスに渡すカスタム パラメータが含まれる文字列。デフォルトは空です。
  • filteredEventsDirectory: カスタム変換によってフィルタされたイベントを保存するファイルパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。ほとんどの場合は、デフォルト値のまま使用できます。
  • shardingContextFilePath: Cloud Storage 内のシャーディング コンテキスト ファイルパスは、ソースシャードごとに Spanner データベースにシャード ID を入力するために使用されます。形式は Map<stream_name, Map<db_name, shard_id>> です。
  • tableOverrides: ソースから Spanner へのテーブル名のオーバーライド。次の形式で記述されます。[{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}] この例では、Singers テーブルを Vocalists に、Albums テーブルを Records にマッピングしています(例: [{Singers, Vocalists}, {Albums, Records}])。デフォルトは空です。
  • columnOverrides: ソースから Spanner への列名のオーバーライドです。次のような形式で記述されます。[{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}] SourceTableName は、ソースペアと Spanner ペアの両方で同じである必要があります。テーブル名をオーバーライドするには、tableOverrides を使用します。この例では Singers テーブルと Albums テーブルそれぞれで、SingerName を TalentName に、AlbumName を RecordName にマッピングしています(例: [{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}])。デフォルトは空です。
  • schemaOverridesFilePath: ソースから Spanner へのテーブルと列名のオーバーライドを指定するファイル。デフォルトは空です。

テンプレートを実行する

コンソールgcloudAPI
  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Cloud Datastream to Spanner template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • GCS_FILE_PATH: Datastream イベントの保存に使用される Cloud Storage パス。例: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: Spanner インスタンス。
  • CLOUDSPANNER_DATABASE: Spanner データベース。
  • DLQ: エラーキュー ディレクトリの Cloud Storage パス。

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • GCS_FILE_PATH: Datastream イベントの保存に使用される Cloud Storage パス。例: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: Spanner インスタンス。
  • CLOUDSPANNER_DATABASE: Spanner データベース。
  • DLQ: エラーキュー ディレクトリの Cloud Storage パス。
Java
/*
 * Copyright (C) 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.api.gax.retrying.RetrySettings;
import com.google.api.services.datastream.v1.model.SourceConfig;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.PubSubNotifiedDlqIO;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.datastream.sources.DataStreamIO;
import com.google.cloud.teleport.v2.datastream.utils.DataStreamClient;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.schema.ISchemaOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.NoopSchemaOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaFileOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaStringOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.shard.ShardingContext;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.TransformationContext;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardingContextReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.TransformationContextReader;
import com.google.cloud.teleport.v2.templates.DataStreamToSpanner.Options;
import com.google.cloud.teleport.v2.templates.constants.DatastreamToSpannerConstants;
import com.google.cloud.teleport.v2.templates.datastream.DatastreamConstants;
import com.google.cloud.teleport.v2.templates.spanner.ProcessInformationSchema;
import com.google.cloud.teleport.v2.templates.transform.ChangeEventTransformerDoFn;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This pipeline ingests DataStream data from GCS as events. The events are written to Cloud
 * Spanner.
 *
 * <p>NOTE: Future versions will support: Pub/Sub, GCS, or Kafka as per DataStream
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/datastream-to-spanner/README_Cloud_Datastream_to_Spanner.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Datastream_to_Spanner",
    category = TemplateCategory.STREAMING,
    displayName = "Datastream to Cloud Spanner",
    description = {
      "The Datastream to Cloud Spanner template is a streaming pipeline that reads <a"
          + " href=\"https://cloud.google.com/datastream/docs\">Datastream</a> events from a Cloud"
          + " Storage bucket and writes them to a Cloud Spanner database. It is intended for data"
          + " migration from Datastream sources to Cloud Spanner.\n",
      "All tables required for migration must exist in the destination Cloud Spanner database prior"
          + " to template execution. Hence schema migration from a source database to destination"
          + " Cloud Spanner must be completed prior to data migration. Data can exist in the tables"
          + " prior to migration. This template does not propagate Datastream schema changes to the"
          + " Cloud Spanner database.\n",
      "Data consistency is guaranteed only at the end of migration when all data has been written"
          + " to Cloud Spanner. To store ordering information for each record written to Cloud"
          + " Spanner, this template creates an additional table (called a shadow table) for each"
          + " table in the Cloud Spanner database. This is used to ensure consistency at the end of"
          + " migration. The shadow tables are not deleted after migration and can be used for"
          + " validation purposes at the end of migration.\n",
      "Any errors that occur during operation, such as schema mismatches, malformed JSON files, or"
          + " errors resulting from executing transforms, are recorded in an error queue. The error"
          + " queue is a Cloud Storage folder which stores all the Datastream events that had"
          + " encountered errors along with the error reason in text format. The errors can be"
          + " transient or permanent and are stored in appropriate Cloud Storage folders in the"
          + " error queue. The transient errors are retried automatically while the permanent"
          + " errors are not. In case of permanent errors, you have the option of making"
          + " corrections to the change events and moving them to the retriable bucket while the"
          + " template is running."
    },
    optionsClass = Options.class,
    flexContainerName = "datastream-to-spanner",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/datastream-to-cloud-spanner",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "A Datastream stream in Running or Not started state.",
      "A Cloud Storage bucket where Datastream events are replicated.",
      "A Cloud Spanner database with existing tables. These tables can be empty or contain data.",
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class DataStreamToSpanner {
  private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSpanner.class);
  private static final String AVRO_SUFFIX = "avro";
  private static final String JSON_SUFFIX = "json";

  /**
   * Options supported by the pipeline.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options
      extends PipelineOptions, StreamingOptions, DataflowPipelineWorkerPoolOptions {
    @TemplateParameter.GcsReadFile(
        order = 1,
        groupName = "Source",
        optional = true,
        description =
            "File location for Datastream file output in Cloud Storage. Support for this feature has been disabled.",
        helpText =
            "The Cloud Storage file location that contains the Datastream files to replicate. Typically, "
                + "this is the root path for a stream. Support for this feature has been disabled.")
    String getInputFilePattern();

    void setInputFilePattern(String value);

    @TemplateParameter.Enum(
        order = 2,
        enumOptions = {@TemplateEnumOption("avro"), @TemplateEnumOption("json")},
        optional = true,
        description = "Datastream output file format (avro/json).",
        helpText =
            "The format of the output file produced by Datastream. For example `avro,json`. Default, `avro`.")
    @Default.String("avro")
    String getInputFileFormat();

    void setInputFileFormat(String value);

    @TemplateParameter.GcsReadFile(
        order = 3,
        optional = true,
        description = "Session File Path in Cloud Storage",
        helpText =
            "Session file path in Cloud Storage that contains mapping information from"
                + " HarbourBridge")
    String getSessionFilePath();

    void setSessionFilePath(String value);

    @TemplateParameter.Text(
        order = 4,
        groupName = "Target",
        description = "Cloud Spanner Instance Id.",
        helpText = "The Spanner instance where the changes are replicated.")
    String getInstanceId();

    void setInstanceId(String value);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        description = "Cloud Spanner Database Id.",
        helpText = "The Spanner database where the changes are replicated.")
    String getDatabaseId();

    void setDatabaseId(String value);

    @TemplateParameter.ProjectId(
        order = 6,
        groupName = "Target",
        optional = true,
        description = "Cloud Spanner Project Id.",
        helpText = "The Spanner project ID.")
    String getProjectId();

    void setProjectId(String projectId);

    @TemplateParameter.Text(
        order = 7,
        groupName = "Target",
        optional = true,
        description = "The Cloud Spanner Endpoint to call",
        helpText = "The Cloud Spanner endpoint to call in the template.",
        example = "https://batch-spanner.googleapis.com")
    @Default.String("https://batch-spanner.googleapis.com")
    String getSpannerHost();

    void setSpannerHost(String value);

    @TemplateParameter.PubsubSubscription(
        order = 8,
        optional = true,
        description = "The Pub/Sub subscription being used in a Cloud Storage notification policy.",
        helpText =
            "The Pub/Sub subscription being used in a Cloud Storage notification policy. The name"
                + " should be in the format of"
                + " projects/<project-id>/subscriptions/<subscription-name>.")
    String getGcsPubSubSubscription();

    void setGcsPubSubSubscription(String value);

    @TemplateParameter.Text(
        order = 9,
        groupName = "Source",
        optional = true,
        description = "Datastream stream name.",
        helpText =
            "The name or template for the stream to poll for schema information and source type.")
    String getStreamName();

    void setStreamName(String value);

    @TemplateParameter.Text(
        order = 10,
        optional = true,
        description = "Cloud Spanner shadow table prefix.",
        helpText = "The prefix used to name shadow tables. Default: `shadow_`.")
    @Default.String("shadow_")
    String getShadowTablePrefix();

    void setShadowTablePrefix(String value);

    @TemplateParameter.Boolean(
        order = 11,
        optional = true,
        description = "If true, create shadow tables in Cloud Spanner.",
        helpText =
            "This flag indicates whether shadow tables must be created in Cloud Spanner database.")
    @Default.Boolean(true)
    Boolean getShouldCreateShadowTables();

    void setShouldCreateShadowTables(Boolean value);

    @TemplateParameter.DateTime(
        order = 12,
        optional = true,
        description =
            "The starting DateTime used to fetch from Cloud Storage "
                + "(https://tools.ietf.org/html/rfc3339).",
        helpText =
            "The starting DateTime used to fetch from Cloud Storage "
                + "(https://tools.ietf.org/html/rfc3339).")
    @Default.String("1970-01-01T00:00:00.00Z")
    String getRfcStartDateTime();

    void setRfcStartDateTime(String value);

    @TemplateParameter.Integer(
        order = 13,
        optional = true,
        description = "File read concurrency",
        helpText = "The number of concurrent DataStream files to read.")
    @Default.Integer(30)
    Integer getFileReadConcurrency();

    void setFileReadConcurrency(Integer value);

    @TemplateParameter.Text(
        order = 14,
        optional = true,
        description = "Dead letter queue directory.",
        helpText =
            "The file path used when storing the error queue output. "
                + "The default file path is a directory under the Dataflow job's temp location.")
    @Default.String("")
    String getDeadLetterQueueDirectory();

    void setDeadLetterQueueDirectory(String value);

    @TemplateParameter.Integer(
        order = 15,
        optional = true,
        description = "Dead letter queue retry minutes",
        helpText = "The number of minutes between dead letter queue retries. Defaults to 10.")
    @Default.Integer(10)
    Integer getDlqRetryMinutes();

    void setDlqRetryMinutes(Integer value);

    @TemplateParameter.Integer(
        order = 16,
        optional = true,
        description = "Dead letter queue maximum retry count",
        helpText =
            "The max number of times temporary errors can be retried through DLQ. Defaults to 500.")
    @Default.Integer(500)
    Integer getDlqMaxRetryCount();

    void setDlqMaxRetryCount(Integer value);

    // DataStream API Root Url (only used for testing)
    @TemplateParameter.Text(
        order = 17,
        optional = true,
        description = "Datastream API Root URL (only required for testing)",
        helpText = "Datastream API Root URL.")
    @Default.String("https://datastream.googleapis.com/")
    String getDataStreamRootUrl();

    void setDataStreamRootUrl(String value);

    @TemplateParameter.Text(
        order = 18,
        optional = true,
        description = "Datastream source type (only required for testing)",
        helpText =
            "This is the type of source database that Datastream connects to. Example -"
                + " mysql/oracle. Need to be set when testing without an actual running"
                + " Datastream.")
    String getDatastreamSourceType();

    void setDatastreamSourceType(String value);

    @TemplateParameter.Boolean(
        order = 19,
        optional = true,
        description =
            "If true, rounds the decimal values in json columns to a number that can be stored"
                + " without loss of precision.",
        helpText =
            "This flag if set, rounds the decimal values in json columns to a number that can be"
                + " stored without loss of precision.")
    @Default.Boolean(false)
    Boolean getRoundJsonDecimals();

    void setRoundJsonDecimals(Boolean value);

    @TemplateParameter.Enum(
        order = 20,
        optional = true,
        description = "Run mode - currently supported are : regular or retryDLQ",
        enumOptions = {@TemplateEnumOption("regular"), @TemplateEnumOption("retryDLQ")},
        helpText = "This is the run mode type, whether regular or with retryDLQ.")
    @Default.String("regular")
    String getRunMode();

    void setRunMode(String value);

    @TemplateParameter.GcsReadFile(
        order = 21,
        optional = true,
        helpText =
            "Transformation context file path in cloud storage used to populate data used in"
                + " transformations performed during migrations   Eg: The shard id to db name to"
                + " identify the db from which a row was migrated",
        description = "Transformation context file path in cloud storage")
    String getTransformationContextFilePath();

    void setTransformationContextFilePath(String value);

    @TemplateParameter.Integer(
        order = 22,
        optional = true,
        description = "Directory watch duration in minutes. Default: 10 minutes",
        helpText =
            "The Duration for which the pipeline should keep polling a directory in GCS. Datastream"
                + "output files are arranged in a directory structure which depicts the timestamp "
                + "of the event grouped by minutes. This parameter should be approximately equal to"
                + "maximum delay which could occur between event occurring in source database and "
                + "the same event being written to GCS by Datastream. 99.9 percentile = 10 minutes")
    @Default.Integer(10)
    Integer getDirectoryWatchDurationInMinutes();

    void setDirectoryWatchDurationInMinutes(Integer value);

    @TemplateParameter.Enum(
        order = 23,
        enumOptions = {
          @TemplateEnumOption("LOW"),
          @TemplateEnumOption("MEDIUM"),
          @TemplateEnumOption("HIGH")
        },
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText =
            "The request priority for Cloud Spanner calls. The value must be one of:"
                + " [HIGH,MEDIUM,LOW]. Defaults to HIGH")
    @Default.Enum("HIGH")
    RpcPriority getSpannerPriority();

    void setSpannerPriority(RpcPriority value);

    @TemplateParameter.PubsubSubscription(
        order = 24,
        optional = true,
        description =
            "The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
                + " retry directory when running in regular mode.",
        helpText =
            "The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
                + " retry directory when running in regular mode. The name should be in the format"
                + " of projects/<project-id>/subscriptions/<subscription-name>. When set, the"
                + " deadLetterQueueDirectory and dlqRetryMinutes are ignored.")
    String getDlqGcsPubSubSubscription();

    void setDlqGcsPubSubSubscription(String value);

    @TemplateParameter.GcsReadFile(
        order = 25,
        optional = true,
        description = "Custom jar location in Cloud Storage",
        helpText =
            "Custom jar location in Cloud Storage that contains the custom transformation logic for processing records"
                + " in forward migration.")
    @Default.String("")
    String getTransformationJarPath();

    void setTransformationJarPath(String value);

    @TemplateParameter.Text(
        order = 26,
        optional = true,
        description = "Custom class name",
        helpText =
            "Fully qualified class name having the custom transformation logic.  It is a"
                + " mandatory field in case transformationJarPath is specified")
    @Default.String("")
    String getTransformationClassName();

    void setTransformationClassName(String value);

    @TemplateParameter.Text(
        order = 27,
        optional = true,
        description = "Custom parameters for transformation",
        helpText =
            "String containing any custom parameters to be passed to the custom transformation class.")
    @Default.String("")
    String getTransformationCustomParameters();

    void setTransformationCustomParameters(String value);

    @TemplateParameter.Text(
        order = 28,
        optional = true,
        description = "Filtered events directory",
        helpText =
            "This is the file path to store the events filtered via custom transformation. Default is a directory"
                + " under the Dataflow job's temp location. The default value is enough under most"
                + " conditions.")
    @Default.String("")
    String getFilteredEventsDirectory();

    void setFilteredEventsDirectory(String value);

    @TemplateParameter.GcsReadFile(
        order = 29,
        optional = true,
        helpText =
            "Sharding context file path in cloud storage is used to populate the shard id in spanner database for each source shard."
                + "It is of the format Map<stream_name, Map<db_name, shard_id>>",
        description = "Sharding context file path in cloud storage")
    String getShardingContextFilePath();

    void setShardingContextFilePath(String value);

    @TemplateParameter.Text(
        order = 30,
        optional = true,
        description = "Table name overrides from source to spanner",
        regexes =
            "^\\[([[:space:]]*\\{[[:space:]]*[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
        example = "[{Singers, Vocalists}, {Albums, Records}]",
        helpText =
            "These are the table name overrides from source to spanner. They are written in the"
                + "following format: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]"
                + "This example shows mapping Singers table to Vocalists and Albums table to Records.")
    @Default.String("")
    String getTableOverrides();

    void setTableOverrides(String value);

    @TemplateParameter.Text(
        order = 31,
        optional = true,
        regexes =
            "^\\[([[:space:]]*\\{[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
        description = "Column name overrides from source to spanner",
        example =
            "[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]",
        helpText =
            "These are the column name overrides from source to spanner. They are written in the"
                + "following format: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]"
                + "Note that the SourceTableName should remain the same in both the source and spanner pair. To override table names, use tableOverrides."
                + "The example shows mapping SingerName to TalentName and AlbumName to RecordName in Singers and Albums table respectively.")
    @Default.String("")
    String getColumnOverrides();

    void setColumnOverrides(String value);

    @TemplateParameter.Text(
        order = 32,
        optional = true,
        description = "File based overrides from source to spanner",
        helpText =
            "A file which specifies the table and the column name overrides from source to spanner.")
    @Default.String("")
    String getSchemaOverridesFilePath();

    void setSchemaOverridesFilePath(String value);
  }

  private static void validateSourceType(Options options) {
    boolean isRetryMode = "retryDLQ".equals(options.getRunMode());
    if (isRetryMode) {
      // retry mode does not read from Datastream
      return;
    }
    String sourceType = getSourceType(options);
    if (!DatastreamConstants.SUPPORTED_DATASTREAM_SOURCES.contains(sourceType)) {
      throw new IllegalArgumentException(
          "Unsupported source type found: "
              + sourceType
              + ". Specify one of the following source types: "
              + DatastreamConstants.SUPPORTED_DATASTREAM_SOURCES);
    }
    options.setDatastreamSourceType(sourceType);
  }

  static String getSourceType(Options options) {
    if (options.getDatastreamSourceType() != null) {
      return options.getDatastreamSourceType();
    }
    if (options.getStreamName() == null) {
      throw new IllegalArgumentException("Stream name cannot be empty.");
    }
    GcpOptions gcpOptions = options.as(GcpOptions.class);
    DataStreamClient datastreamClient;
    SourceConfig sourceConfig;
    try {
      datastreamClient = new DataStreamClient(gcpOptions.getGcpCredential());
      sourceConfig = datastreamClient.getSourceConnectionProfile(options.getStreamName());
    } catch (IOException e) {
      LOG.error("IOException Occurred: DataStreamClient failed initialization.");
      throw new IllegalArgumentException("Unable to initialize DatastreamClient: " + e);
    }
    // TODO: use getPostgresSourceConfig() instead of an else once SourceConfig.java is updated.
    if (sourceConfig.getMysqlSourceConfig() != null) {
      return DatastreamConstants.MYSQL_SOURCE_TYPE;
    } else if (sourceConfig.getOracleSourceConfig() != null) {
      return DatastreamConstants.ORACLE_SOURCE_TYPE;
    } else {
      return DatastreamConstants.POSTGRES_SOURCE_TYPE;
    }
    // LOG.error("Source Connection Profile Type Not Supported");
    // throw new IllegalArgumentException("Unsupported source connection profile type in
    // Datastream");
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();
    LOG.info("Starting DataStream to Cloud Spanner");
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);
    validateSourceType(options);
    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    /*
     * Stages:
     *   1) Ingest and Normalize Data to FailsafeElement with JSON Strings
     *   2) Write JSON Strings to Cloud Spanner
     *   3) Write Failures to GCS Dead Letter Queue
     */
    Pipeline pipeline = Pipeline.create(options);
    DeadLetterQueueManager dlqManager = buildDlqManager(options);
    // Ingest session file into schema object.
    Schema schema = SessionFileReader.read(options.getSessionFilePath());
    /*
     * Stage 1: Ingest/Normalize Data to FailsafeElement with JSON Strings and
     * read Cloud Spanner information schema.
     *   a) Prepare spanner config and process information schema
     *   b) Read DataStream data from GCS into JSON String FailsafeElements
     *   c) Reconsume Dead Letter Queue data from GCS into JSON String FailsafeElements
     *   d) Flatten DataStream and DLQ Streams
     */

    // Prepare Spanner config
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId()))
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
            .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()))
            .withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()))
            .withCommitRetrySettings(
                RetrySettings.newBuilder()
                    .setTotalTimeout(org.threeten.bp.Duration.ofMinutes(4))
                    .setInitialRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
                    .setRetryDelayMultiplier(1)
                    .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
                    .setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
                    .setRpcTimeoutMultiplier(1)
                    .setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
                    .setMaxAttempts(1)
                    .build());
    /* Process information schema
     * 1) Read information schema from destination Cloud Spanner database
     * 2) Check if shadow tables are present and create if necessary
     * 3) Return new information schema
     */
    PCollection<Ddl> ddl =
        pipeline.apply(
            "Process Information Schema",
            new ProcessInformationSchema(
                spannerConfig,
                options.getShouldCreateShadowTables(),
                options.getShadowTablePrefix(),
                options.getDatastreamSourceType()));
    PCollectionView<Ddl> ddlView = ddl.apply("Cloud Spanner DDL as view", View.asSingleton());
    PCollection<FailsafeElement<String, String>> jsonRecords = null;
    // Elements sent to the Dead Letter Queue are to be reconsumed.
    // A DLQManager is to be created using PipelineOptions, and it is in charge
    // of building pieces of the DLQ.
    PCollectionTuple reconsumedElements = null;
    boolean isRegularMode = "regular".equals(options.getRunMode());
    if (isRegularMode && (!Strings.isNullOrEmpty(options.getDlqGcsPubSubSubscription()))) {
      reconsumedElements =
          dlqManager.getReconsumerDataTransformForFiles(
              pipeline.apply(
                  "Read retry from PubSub",
                  new PubSubNotifiedDlqIO(
                      options.getDlqGcsPubSubSubscription(),
                      // file paths to ignore when re-consuming for retry
                      new ArrayList<String>(
                          Arrays.asList("/severe/", "/tmp_retry", "/tmp_severe/", ".temp")))));
    } else {
      reconsumedElements =
          dlqManager.getReconsumerDataTransform(
              pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
    }
    PCollection<FailsafeElement<String, String>> dlqJsonRecords =
        reconsumedElements
            .get(DeadLetterQueueManager.RETRYABLE_ERRORS)
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
    if (isRegularMode) {
      LOG.info("Regular Datastream flow");
      PCollection<FailsafeElement<String, String>> datastreamJsonRecords =
          pipeline.apply(
              new DataStreamIO(
                      options.getStreamName(),
                      options.getInputFilePattern(),
                      options.getInputFileFormat(),
                      options.getGcsPubSubSubscription(),
                      options.getRfcStartDateTime())
                  .withFileReadConcurrency(options.getFileReadConcurrency())
                  .withoutDatastreamRecordsReshuffle()
                  .withDirectoryWatchDuration(
                      Duration.standardMinutes(options.getDirectoryWatchDurationInMinutes())));
      int maxNumWorkers = options.getMaxNumWorkers() != 0 ? options.getMaxNumWorkers() : 1;
      jsonRecords =
          PCollectionList.of(datastreamJsonRecords)
              .and(dlqJsonRecords)
              .apply(Flatten.pCollections())
              .apply(
                  "Reshuffle",
                  Reshuffle.<FailsafeElement<String, String>>viaRandomKey()
                      .withNumBuckets(
                          maxNumWorkers * DatastreamToSpannerConstants.MAX_DOFN_PER_WORKER));
    } else {
      LOG.info("DLQ retry flow");
      jsonRecords =
          PCollectionList.of(dlqJsonRecords)
              .apply(Flatten.pCollections())
              .apply("Reshuffle", Reshuffle.viaRandomKey());
    }
    /*
     * Stage 2: Transform records
     */

    // Ingest transformation context file into memory.
    TransformationContext transformationContext =
        TransformationContextReader.getTransformationContext(
            options.getTransformationContextFilePath());

    // Ingest sharding context file into memory.
    ShardingContext shardingContext =
        ShardingContextReader.getShardingContext(options.getShardingContextFilePath());

    CustomTransformation customTransformation =
        CustomTransformation.builder(
                options.getTransformationJarPath(), options.getTransformationClassName())
            .setCustomParameters(options.getTransformationCustomParameters())
            .build();

    // Create the overrides mapping.
    ISchemaOverridesParser schemaOverridesParser = configureSchemaOverrides(options);

    ChangeEventTransformerDoFn changeEventTransformerDoFn =
        ChangeEventTransformerDoFn.create(
            schema,
            schemaOverridesParser,
            transformationContext,
            shardingContext,
            options.getDatastreamSourceType(),
            customTransformation,
            options.getRoundJsonDecimals(),
            ddlView,
            spannerConfig);

    PCollectionTuple transformedRecords =
        jsonRecords.apply(
            "Apply Transformation to events",
            ParDo.of(changeEventTransformerDoFn)
                .withSideInputs(ddlView)
                .withOutputTags(
                    DatastreamToSpannerConstants.TRANSFORMED_EVENT_TAG,
                    TupleTagList.of(
                        Arrays.asList(
                            DatastreamToSpannerConstants.FILTERED_EVENT_TAG,
                            DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))));

    /*
     * Stage 3: Write filtered records to GCS
     */
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String filterEventsDirectory =
        options.getFilteredEventsDirectory().isEmpty()
            ? tempLocation + "filteredEvents/"
            : options.getFilteredEventsDirectory();
    LOG.info("Filtered events directory: {}", filterEventsDirectory);
    transformedRecords
        .get(DatastreamToSpannerConstants.FILTERED_EVENT_TAG)
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(
            "Write Filtered Events To GCS",
            TextIO.write().to(filterEventsDirectory).withSuffix(".json").withWindowedWrites());

    /*
     * Stage 4: Write transformed records to Cloud Spanner
     */

    SpannerTransactionWriter.Result spannerWriteResults =
        transformedRecords
            .get(DatastreamToSpannerConstants.TRANSFORMED_EVENT_TAG)
            .apply(
                "Write events to Cloud Spanner",
                new SpannerTransactionWriter(
                    spannerConfig,
                    ddlView,
                    options.getShadowTablePrefix(),
                    options.getDatastreamSourceType(),
                    isRegularMode));
    /*
     * Stage 5: Write failures to GCS Dead Letter Queue
     * a) Retryable errors are written to retry GCS Dead letter queue
     * b) Severe errors are written to severe GCS Dead letter queue
     */
    // We will write only the original payload from the failsafe event to the DLQ.  We are doing
    // that in
    // StringDeadLetterQueueSanitizer.
    spannerWriteResults
        .retryableErrors()
        .apply(
            "DLQ: Write retryable Failures to GCS",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            "Write To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getRetryDlqDirectoryWithDateTime())
                .withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_retry/")
                .setIncludePaneInfo(true)
                .build());
    PCollection<FailsafeElement<String, String>> dlqErrorRecords =
        reconsumedElements
            .get(DeadLetterQueueManager.PERMANENT_ERRORS)
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
    // TODO: Write errors from transformer and spanner writer into separate folders
    PCollection<FailsafeElement<String, String>> permanentErrors =
        PCollectionList.of(dlqErrorRecords)
            .and(spannerWriteResults.permanentErrors())
            .and(transformedRecords.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))
            .apply(Flatten.pCollections());
    // increment the metrics
    permanentErrors
        .apply("Update metrics", ParDo.of(new MetricUpdaterDoFn(isRegularMode)))
        .apply(
            "DLQ: Write Severe errors to GCS",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            "Write To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory((options).getDeadLetterQueueDirectory() + "/tmp_severe/")
                .setIncludePaneInfo(true)
                .build());
    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  private static DeadLetterQueueManager buildDlqManager(Options options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDeadLetterQueueDirectory().isEmpty()
            ? tempLocation + "dlq/"
            : options.getDeadLetterQueueDirectory();
    LOG.info("Dead-letter queue directory: {}", dlqDirectory);
    options.setDeadLetterQueueDirectory(dlqDirectory);
    if ("regular".equals(options.getRunMode())) {
      return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetryCount());
    } else {
      String retryDlqUri =
          FileSystems.matchNewResource(dlqDirectory, true)
              .resolve("severe", StandardResolveOptions.RESOLVE_DIRECTORY)
              .toString();
      LOG.info("Dead-letter retry directory: {}", retryDlqUri);
      return DeadLetterQueueManager.create(dlqDirectory, retryDlqUri, 0);
    }
  }

  static ISchemaOverridesParser configureSchemaOverrides(Options options) {
    // incorrect configuration
    if (!options.getSchemaOverridesFilePath().isEmpty()
        && (!options.getTableOverrides().isEmpty() || !options.getColumnOverrides().isEmpty())) {
      throw new IllegalArgumentException(
          "Only one of file based or string based overrides must be configured! Please correct the configuration and re-run the job");
    }
    // string based overrides
    if (!options.getTableOverrides().isEmpty() || !options.getColumnOverrides().isEmpty()) {
      Map<String, String> userOptionsOverrides = new HashMap<>();
      if (!options.getTableOverrides().isEmpty()) {
        userOptionsOverrides.put("tableOverrides", options.getTableOverrides());
      }
      if (!options.getColumnOverrides().isEmpty()) {
        userOptionsOverrides.put("columnOverrides", options.getColumnOverrides());
      }
      return new SchemaStringOverridesParser(userOptionsOverrides);
    }
    // file based overrides
    if (!options.getSchemaOverridesFilePath().isEmpty()) {
      return new SchemaFileOverridesParser(options.getSchemaOverridesFilePath());
    }
    // no overrides
    return new NoopSchemaOverridesParser();
  }
}

次のステップ