Datastream to Spanner template

The Datastream to Spanner template is a streaming pipeline that reads Datastream events from a Cloud Storage bucket and writes them to a Spanner database. It is intended for data migration from Datastream sources to Spanner.

All tables required for migration must exist in the destination Spanner database before you run the template. Therefore, schema migration from the source database to the destination Spanner database must be completed before data migration. Data can exist in the tables prior to migration. This template does not propagate Datastream schema changes to the Spanner database.

Data consistency is guaranteed only at the end of the migration, when all data has been written to Spanner. To store ordering information for each record written to Spanner, this template creates an additional table (called a shadow table) for each table in the Spanner database. These tables are used to ensure consistency at the end of the migration. The shadow tables are not deleted after the migration and can be used for validation purposes at the end of the migration.
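
As an illustration, a shadow table can be inspected after migration with the Spanner client library. The following minimal sketch assumes a hypothetical project, instance, and database, and a source table named Singers, whose shadow table is shadow_Singers under the default prefix:

import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;

public class InspectShadowTable {
  public static void main(String[] args) {
    // Hypothetical identifiers; replace with your own values.
    try (Spanner spanner =
        SpannerOptions.newBuilder().setProjectId("my-project").build().getService()) {
      DatabaseClient db =
          spanner.getDatabaseClient(DatabaseId.of("my-project", "my-instance", "my-database"));
      // Count the ordering records the template kept for the Singers table.
      try (ResultSet rs =
          db.singleUse().executeQuery(Statement.of("SELECT COUNT(*) FROM shadow_Singers"))) {
        while (rs.next()) {
          System.out.println("shadow_Singers rows: " + rs.getLong(0));
        }
      }
    }
  }
}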

Any errors that occur during operation, such as schema mismatches, malformed JSON files, or errors from executing transforms, are recorded in an error queue. The error queue is a Cloud Storage folder that stores, in text format, all the Datastream events that encountered errors along with the error reason. Errors can be transient or permanent and are stored in the appropriate Cloud Storage folders in the error queue. Transient errors are retried automatically, while permanent errors are not. In the case of permanent errors, you can correct the change events and move them to the retriable bucket while the template is running.
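
For example, once you have corrected a permanently failed event, you can move it from the severe folder into the retry folder with the Cloud Storage client library. The following minimal sketch assumes a hypothetical bucket and object layout; the actual paths depend on your deadLetterQueueDirectory value:

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class RetrySevereEvent {
  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    // Hypothetical DLQ paths; adjust to your deadLetterQueueDirectory layout.
    BlobId severe = BlobId.of("my-bucket", "dlq/severe/corrected-event.json");
    BlobId retry = BlobId.of("my-bucket", "dlq/retry/corrected-event.json");
    // Copy the corrected event into the retriable folder, then delete the original.
    storage.copy(Storage.CopyRequest.of(severe, retry)).getResult();
    storage.delete(severe);
  }
}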

Pipeline requirements

  • A Datastream stream in the Running or Not started state.
  • A Cloud Storage bucket where the Datastream events are replicated.
  • A Spanner database with existing tables. These tables can be empty or contain data.

Template parameters

Required parameters

  • instanceId: The Spanner instance where the changes are replicated.
  • databaseId: The Spanner database where the changes are replicated.

Optional parameters

  • inputFilePattern: The Cloud Storage file location that contains the Datastream files to replicate. Typically, this is the root path for a stream. Support for this feature has been disabled.
  • inputFileFormat: The format of the output files produced by Datastream. For example, avro or json. Defaults to avro.
  • sessionFilePath: Session file path in Cloud Storage that contains mapping information from HarbourBridge.
  • projectId: The Spanner project ID.
  • spannerHost: The Cloud Spanner endpoint to call in the template. For example, https://batch-spanner.googleapis.com. Defaults to https://batch-spanner.googleapis.com.
  • gcsPubSubSubscription: The Pub/Sub subscription being used in a Cloud Storage notification policy. For the name, use the format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. A sketch of creating such a notification programmatically follows this list.
  • streamName: The name or template for the stream to poll for schema information and source type.
  • shadowTablePrefix: The prefix used to name shadow tables. Default: shadow_.
  • shouldCreateShadowTables: This flag indicates whether shadow tables must be created in the Cloud Spanner database. Defaults to true.
  • rfcStartDateTime: The starting DateTime used to fetch from Cloud Storage (https://tools.ietf.org/html/rfc3339). Defaults to 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency: The number of concurrent DataStream files to read. Defaults to 30.
  • deadLetterQueueDirectory: The file path used when storing the error queue output. The default file path is a directory under the Dataflow job's temp location.
  • dlqRetryMinutes: The number of minutes between dead letter queue retries. Defaults to 10.
  • dlqMaxRetryCount: The maximum number of times temporary errors can be retried through the DLQ. Defaults to 500.
  • dataStreamRootUrl: The Datastream API root URL. Defaults to https://datastream.googleapis.com/.
  • datastreamSourceType: The type of source database that Datastream connects to. For example, mysql or oracle. Needs to be set when testing without an actual running Datastream.
  • roundJsonDecimals: If set, this flag rounds the decimal values in JSON columns to a number that can be stored without loss of precision. Defaults to false.
  • runMode: The run mode type, either regular or retryDLQ. Defaults to regular.
  • transformationContextFilePath: Transformation context file path in Cloud Storage, used to populate data used in transformations performed during migrations. For example, the shard ID to database name mapping used to identify the database from which a row was migrated.
  • directoryWatchDurationInMinutes: The duration for which the pipeline should keep polling a directory in GCS. Datastream output files are arranged in a directory structure that depicts the timestamp of the event grouped by minutes. This parameter should be approximately equal to the maximum delay that can occur between an event occurring in the source database and the same event being written to GCS by Datastream. 99.9 percentile = 10 minutes. Defaults to 10.
  • spannerPriority: The request priority for Cloud Spanner calls. The value must be one of HIGH, MEDIUM, or LOW. Defaults to HIGH.
  • dlqGcsPubSubSubscription: The Pub/Sub subscription being used in a Cloud Storage notification policy for the DLQ retry directory when running in regular mode. For the name, use the format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. When set, deadLetterQueueDirectory and dlqRetryMinutes are ignored.
  • transformationJarPath: Custom JAR file location in Cloud Storage for the file that contains the custom transformation logic for processing records in forward migration. Defaults to empty.
  • transformationClassName: Fully qualified class name having the custom transformation logic. It is a mandatory field when transformationJarPath is specified. Defaults to empty.
  • transformationCustomParameters: String containing any custom parameters to be passed to the custom transformation class. Defaults to empty.
  • filteredEventsDirectory: The file path to store the events filtered via custom transformation. The default is a directory under the Dataflow job's temp location. The default value is sufficient under most conditions.
  • shardingContextFilePath: Sharding context file path in Cloud Storage, used to populate the shard ID in the Spanner database for each source shard. It is of the format Map<stream_name, Map<db_name, shard_id>>.
  • tableOverrides: The table name overrides from source to Spanner, written in the following format: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]. For example, [{Singers, Vocalists}, {Albums, Records}] maps the Singers table to Vocalists and the Albums table to Records. Defaults to empty.
  • columnOverrides: The column name overrides from source to Spanner, written in the following format: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]. The SourceTableName must remain the same in both the source and Spanner pair. To override table names, use tableOverrides. For example, [{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}] maps SingerName to TalentName and AlbumName to RecordName in the Singers and Albums tables, respectively. Defaults to empty.
  • schemaOverridesFilePath: A file that specifies the table and column name overrides from source to Spanner. Defaults to empty.
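
The gcsPubSubSubscription parameter noted above requires the bucket to publish object notifications to the Pub/Sub topic that the subscription is attached to. The following minimal sketch creates such a notification with the Cloud Storage client library, assuming a hypothetical bucket and an existing topic; creating the topic and the subscription itself is not shown:

import com.google.cloud.storage.NotificationInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucketNotification {
  public static void main(String[] args) {
    // Hypothetical bucket and topic names.
    String bucket = "my-datastream-bucket";
    String topic = "projects/my-project/topics/datastream-files";
    Storage storage = StorageOptions.getDefaultInstance().getService();
    // Publish a Pub/Sub message whenever Datastream finalizes an object in the bucket.
    NotificationInfo info =
        NotificationInfo.newBuilder(topic)
            .setPayloadFormat(NotificationInfo.PayloadFormat.JSON_API_V1)
            .setEventTypes(NotificationInfo.EventType.OBJECT_FINALIZE)
            .build();
    storage.createNotification(bucket, info);
  }
}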

Run the template

Console
  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Cloud Datastream to Spanner template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
      • the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/

  • STREAM_NAME: the name of your Datastream stream
  • GCS_FILE_PATH: the Cloud Storage path that is used to store Datastream events. For example: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: your Spanner instance
  • CLOUDSPANNER_DATABASE: your Spanner database
  • DLQ: the Cloud Storage path for the error queue directory

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
      • the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/

  • STREAM_NAME: the name of your Datastream stream
  • GCS_FILE_PATH: the Cloud Storage path that is used to store Datastream events. For example: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: your Spanner instance
  • CLOUDSPANNER_DATABASE: your Spanner database
  • DLQ: the Cloud Storage path for the error queue directory
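
The launch can also be issued programmatically. The following minimal sketch uses the Dataflow Java client library with hypothetical project, region, and parameter values:

import com.google.cloud.dataflow.v1beta3.FlexTemplatesServiceClient;
import com.google.cloud.dataflow.v1beta3.LaunchFlexTemplateParameter;
import com.google.cloud.dataflow.v1beta3.LaunchFlexTemplateRequest;
import com.google.cloud.dataflow.v1beta3.LaunchFlexTemplateResponse;

public class LaunchDatastreamToSpanner {
  public static void main(String[] args) throws Exception {
    try (FlexTemplatesServiceClient client = FlexTemplatesServiceClient.create()) {
      // Hypothetical values; substitute your own project, region, and parameters.
      LaunchFlexTemplateParameter parameter =
          LaunchFlexTemplateParameter.newBuilder()
              .setJobName("datastream-to-spanner-job")
              .setContainerSpecGcsPath(
                  "gs://dataflow-templates-us-central1/latest/flex/Cloud_Datastream_to_Spanner")
              .putParameters("streamName", "my-stream")
              .putParameters("instanceId", "my-instance")
              .putParameters("databaseId", "my-database")
              .putParameters("deadLetterQueueDirectory", "gs://my-bucket/dlq")
              .build();
      LaunchFlexTemplateRequest request =
          LaunchFlexTemplateRequest.newBuilder()
              .setProjectId("my-project")
              .setLocation("us-central1")
              .setLaunchParameter(parameter)
              .build();
      LaunchFlexTemplateResponse response = client.launchFlexTemplate(request);
      System.out.println("Launched job: " + response.getJob().getName());
    }
  }
}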
Template source code

Java
/*
 * Copyright (C) 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.api.gax.retrying.RetrySettings;
import com.google.api.services.datastream.v1.model.SourceConfig;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.PubSubNotifiedDlqIO;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.datastream.sources.DataStreamIO;
import com.google.cloud.teleport.v2.datastream.utils.DataStreamClient;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.schema.ISchemaOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.NoopSchemaOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaFileOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaStringOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.shard.ShardingContext;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.TransformationContext;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardingContextReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.TransformationContextReader;
import com.google.cloud.teleport.v2.templates.DataStreamToSpanner.Options;
import com.google.cloud.teleport.v2.templates.constants.DatastreamToSpannerConstants;
import com.google.cloud.teleport.v2.templates.datastream.DatastreamConstants;
import com.google.cloud.teleport.v2.templates.spanner.ProcessInformationSchema;
import com.google.cloud.teleport.v2.templates.transform.ChangeEventTransformerDoFn;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This pipeline ingests DataStream data from GCS as events. The events are written to Cloud
 * Spanner.
 *
 * <p>NOTE: Future versions will support: Pub/Sub, GCS, or Kafka as per DataStream
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/datastream-to-spanner/README_Cloud_Datastream_to_Spanner.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Datastream_to_Spanner",
    category = TemplateCategory.STREAMING,
    displayName = "Datastream to Cloud Spanner",
    description = {
      "The Datastream to Cloud Spanner template is a streaming pipeline that reads <a"
          + " href=\"https://cloud.google.com/datastream/docs\">Datastream</a> events from a Cloud"
          + " Storage bucket and writes them to a Cloud Spanner database. It is intended for data"
          + " migration from Datastream sources to Cloud Spanner.\n",
      "All tables required for migration must exist in the destination Cloud Spanner database prior"
          + " to template execution. Hence schema migration from a source database to destination"
          + " Cloud Spanner must be completed prior to data migration. Data can exist in the tables"
          + " prior to migration. This template does not propagate Datastream schema changes to the"
          + " Cloud Spanner database.\n",
      "Data consistency is guaranteed only at the end of migration when all data has been written"
          + " to Cloud Spanner. To store ordering information for each record written to Cloud"
          + " Spanner, this template creates an additional table (called a shadow table) for each"
          + " table in the Cloud Spanner database. This is used to ensure consistency at the end of"
          + " migration. The shadow tables are not deleted after migration and can be used for"
          + " validation purposes at the end of migration.\n",
      "Any errors that occur during operation, such as schema mismatches, malformed JSON files, or"
          + " errors resulting from executing transforms, are recorded in an error queue. The error"
          + " queue is a Cloud Storage folder which stores all the Datastream events that had"
          + " encountered errors along with the error reason in text format. The errors can be"
          + " transient or permanent and are stored in appropriate Cloud Storage folders in the"
          + " error queue. The transient errors are retried automatically while the permanent"
          + " errors are not. In case of permanent errors, you have the option of making"
          + " corrections to the change events and moving them to the retriable bucket while the"
          + " template is running."
    },
    optionsClass = Options.class,
    flexContainerName = "datastream-to-spanner",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/datastream-to-cloud-spanner",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "A Datastream stream in Running or Not started state.",
      "A Cloud Storage bucket where Datastream events are replicated.",
      "A Cloud Spanner database with existing tables. These tables can be empty or contain data.",
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class DataStreamToSpanner {
  private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSpanner.class);
  private static final String AVRO_SUFFIX = "avro";
  private static final String JSON_SUFFIX = "json";

  /**
   * Options supported by the pipeline.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options
      extends PipelineOptions, StreamingOptions, DataflowPipelineWorkerPoolOptions {
    @TemplateParameter.GcsReadFile(
        order = 1,
        groupName = "Source",
        optional = true,
        description =
            "File location for Datastream file output in Cloud Storage. Support for this feature has been disabled.",
        helpText =
            "The Cloud Storage file location that contains the Datastream files to replicate. Typically, "
                + "this is the root path for a stream. Support for this feature has been disabled.")
    String getInputFilePattern();

    void setInputFilePattern(String value);

    @TemplateParameter.Enum(
        order = 2,
        enumOptions = {@TemplateEnumOption("avro"), @TemplateEnumOption("json")},
        optional = true,
        description = "Datastream output file format (avro/json).",
        helpText =
            "The format of the output file produced by Datastream. For example `avro,json`. Defaults to `avro`.")
    @Default.String("avro")
    String getInputFileFormat();

    void setInputFileFormat(String value);

    @TemplateParameter.GcsReadFile(
        order = 3,
        optional = true,
        description = "Session File Path in Cloud Storage",
        helpText =
            "Session file path in Cloud Storage that contains mapping information from"
                + " HarbourBridge")
    String getSessionFilePath();

    void setSessionFilePath(String value);

    @TemplateParameter.Text(
        order = 4,
        groupName = "Target",
        description = "Cloud Spanner Instance Id.",
        helpText = "The Spanner instance where the changes are replicated.")
    String getInstanceId();

    void setInstanceId(String value);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        description = "Cloud Spanner Database Id.",
        helpText = "The Spanner database where the changes are replicated.")
    String getDatabaseId();

    void setDatabaseId(String value);

    @TemplateParameter.ProjectId(
        order = 6,
        groupName = "Target",
        optional = true,
        description = "Cloud Spanner Project Id.",
        helpText = "The Spanner project ID.")
    String getProjectId();

    void setProjectId(String projectId);

    @TemplateParameter.Text(
        order = 7,
        groupName = "Target",
        optional = true,
        description = "The Cloud Spanner Endpoint to call",
        helpText = "The Cloud Spanner endpoint to call in the template.",
        example = "https://batch-spanner.googleapis.com")
    @Default.String("https://batch-spanner.googleapis.com")
    String getSpannerHost();

    void setSpannerHost(String value);

    @TemplateParameter.PubsubSubscription(
        order = 8,
        optional = true,
        description = "The Pub/Sub subscription being used in a Cloud Storage notification policy.",
        helpText =
            "The Pub/Sub subscription being used in a Cloud Storage notification policy. For the name,"
                + " use the format `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>`.")
    String getGcsPubSubSubscription();

    void setGcsPubSubSubscription(String value);

    @TemplateParameter.Text(
        order = 9,
        groupName = "Source",
        optional = true,
        description = "Datastream stream name.",
        helpText =
            "The name or template for the stream to poll for schema information and source type.")
    String getStreamName();

    void setStreamName(String value);

    @TemplateParameter.Text(
        order = 10,
        optional = true,
        description = "Cloud Spanner shadow table prefix.",
        helpText = "The prefix used to name shadow tables. Default: `shadow_`.")
    @Default.String("shadow_")
    String getShadowTablePrefix();

    void setShadowTablePrefix(String value);

    @TemplateParameter.Boolean(
        order = 11,
        optional = true,
        description = "If true, create shadow tables in Cloud Spanner.",
        helpText =
            "This flag indicates whether shadow tables must be created in Cloud Spanner database.")
    @Default.Boolean(true)
    Boolean getShouldCreateShadowTables();

    void setShouldCreateShadowTables(Boolean value);

    @TemplateParameter.DateTime(
        order = 12,
        optional = true,
        description =
            "The starting DateTime used to fetch from Cloud Storage "
                + "(https://tools.ietf.org/html/rfc3339).",
        helpText =
            "The starting DateTime used to fetch from Cloud Storage "
                + "(https://tools.ietf.org/html/rfc3339).")
    @Default.String("1970-01-01T00:00:00.00Z")
    String getRfcStartDateTime();

    void setRfcStartDateTime(String value);

    @TemplateParameter.Integer(
        order = 13,
        optional = true,
        description = "File read concurrency",
        helpText = "The number of concurrent DataStream files to read.")
    @Default.Integer(30)
    Integer getFileReadConcurrency();

    void setFileReadConcurrency(Integer value);

    @TemplateParameter.Text(
        order = 14,
        optional = true,
        description = "Dead letter queue directory.",
        helpText =
            "The file path used when storing the error queue output. "
                + "The default file path is a directory under the Dataflow job's temp location.")
    @Default.String("")
    String getDeadLetterQueueDirectory();

    void setDeadLetterQueueDirectory(String value);

    @TemplateParameter.Integer(
        order = 15,
        optional = true,
        description = "Dead letter queue retry minutes",
        helpText = "The number of minutes between dead letter queue retries. Defaults to `10`.")
    @Default.Integer(10)
    Integer getDlqRetryMinutes();

    void setDlqRetryMinutes(Integer value);

    @TemplateParameter.Integer(
        order = 16,
        optional = true,
        description = "Dead letter queue maximum retry count",
        helpText =
            "The max number of times temporary errors can be retried through DLQ. Defaults to `500`.")
    @Default.Integer(500)
    Integer getDlqMaxRetryCount();

    void setDlqMaxRetryCount(Integer value);

    // DataStream API Root Url (only used for testing)
    @TemplateParameter.Text(
        order = 17,
        optional = true,
        description = "Datastream API Root URL (only required for testing)",
        helpText = "Datastream API Root URL.")
    @Default.String("https://datastream.googleapis.com/")
    String getDataStreamRootUrl();

    void setDataStreamRootUrl(String value);

    @TemplateParameter.Text(
        order = 18,
        optional = true,
        description = "Datastream source type (only required for testing)",
        helpText =
            "This is the type of source database that Datastream connects to. Example -"
                + " mysql/oracle. Need to be set when testing without an actual running"
                + " Datastream.")
    String getDatastreamSourceType();

    void setDatastreamSourceType(String value);

    @TemplateParameter.Boolean(
        order = 19,
        optional = true,
        description =
            "If true, rounds the decimal values in json columns to a number that can be stored"
                + " without loss of precision.",
        helpText =
            "This flag if set, rounds the decimal values in json columns to a number that can be"
                + " stored without loss of precision.")
    @Default.Boolean(false)
    Boolean getRoundJsonDecimals();

    void setRoundJsonDecimals(Boolean value);

    @TemplateParameter.Enum(
        order = 20,
        optional = true,
        description = "Run mode - currently supported are : regular or retryDLQ",
        enumOptions = {@TemplateEnumOption("regular"), @TemplateEnumOption("retryDLQ")},
        helpText = "This is the run mode type, whether regular or with retryDLQ.")
    @Default.String("regular")
    String getRunMode();

    void setRunMode(String value);

    @TemplateParameter.GcsReadFile(
        order = 21,
        optional = true,
        helpText =
            "Transformation context file path in cloud storage used to populate data used in"
                + " transformations performed during migrations. For example: the shard id to db"
                + " name mapping used to identify the db from which a row was migrated",
        description = "Transformation context file path in cloud storage")
    String getTransformationContextFilePath();

    void setTransformationContextFilePath(String value);

    @TemplateParameter.Integer(
        order = 22,
        optional = true,
        description = "Directory watch duration in minutes. Default: 10 minutes",
        helpText =
            "The duration for which the pipeline should keep polling a directory in GCS. Datastream"
                + " output files are arranged in a directory structure which depicts the timestamp"
                + " of the event grouped by minutes. This parameter should be approximately equal"
                + " to the maximum delay which could occur between an event occurring in the source"
                + " database and the same event being written to GCS by Datastream. 99.9 percentile"
                + " = 10 minutes.")
    @Default.Integer(10)
    Integer getDirectoryWatchDurationInMinutes();

    void setDirectoryWatchDurationInMinutes(Integer value);

    @TemplateParameter.Enum(
        order = 23,
        enumOptions = {
          @TemplateEnumOption("LOW"),
          @TemplateEnumOption("MEDIUM"),
          @TemplateEnumOption("HIGH")
        },
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText =
            "The request priority for Cloud Spanner calls. The value must be one of:"
                + " [`HIGH`,`MEDIUM`,`LOW`]. Defaults to `HIGH`.")
    @Default.Enum("HIGH")
    RpcPriority getSpannerPriority();

    void setSpannerPriority(RpcPriority value);

    @TemplateParameter.PubsubSubscription(
        order = 24,
        optional = true,
        description =
            "The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
                + " retry directory when running in regular mode.",
        helpText =
            "The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
                + " retry directory when running in regular mode. For the name, use the format"
                + " `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>`. When set, the"
                + " deadLetterQueueDirectory and dlqRetryMinutes are ignored.")
    String getDlqGcsPubSubSubscription();

    void setDlqGcsPubSubSubscription(String value);

    @TemplateParameter.GcsReadFile(
        order = 25,
        optional = true,
        description = "Custom jar location in Cloud Storage",
        helpText =
            "Custom JAR file location in Cloud Storage for the file that contains the custom transformation logic for processing records"
                + " in forward migration.")
    @Default.String("")
    String getTransformationJarPath();

    void setTransformationJarPath(String value);

    @TemplateParameter.Text(
        order = 26,
        optional = true,
        description = "Custom class name",
        helpText =
            "Fully qualified class name having the custom transformation logic. It is a"
                + " mandatory field in case transformationJarPath is specified.")
    @Default.String("")
    String getTransformationClassName();

    void setTransformationClassName(String value);

    @TemplateParameter.Text(
        order = 27,
        optional = true,
        description = "Custom parameters for transformation",
        helpText =
            "String containing any custom parameters to be passed to the custom transformation class.")
    @Default.String("")
    String getTransformationCustomParameters();

    void setTransformationCustomParameters(String value);

    @TemplateParameter.Text(
        order = 28,
        optional = true,
        description = "Filtered events directory",
        helpText =
            "This is the file path to store the events filtered via custom transformation. The default is a directory"
                + " under the Dataflow job's temp location. The default value is sufficient under most"
                + " conditions.")
    @Default.String("")
    String getFilteredEventsDirectory();

    void setFilteredEventsDirectory(String value);

    @TemplateParameter.GcsReadFile(
        order = 29,
        optional = true,
        helpText =
            "Sharding context file path in cloud storage is used to populate the shard id in spanner database for each source shard."
                + " It is of the format Map<stream_name, Map<db_name, shard_id>>",
        description = "Sharding context file path in cloud storage")
    String getShardingContextFilePath();

    void setShardingContextFilePath(String value);

    @TemplateParameter.Text(
        order = 30,
        optional = true,
        description = "Table name overrides from source to spanner",
        regexes =
            "^\\[([[:space:]]*\\{[[:space:]]*[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
        example = "[{Singers, Vocalists}, {Albums, Records}]",
        helpText =
            "These are the table name overrides from source to spanner. They are written in the"
                + " following format: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]."
                + " This example shows mapping the Singers table to Vocalists and the Albums table to Records.")
    @Default.String("")
    String getTableOverrides();

    void setTableOverrides(String value);

    @TemplateParameter.Text(
        order = 31,
        optional = true,
        regexes =
            "^\\[([[:space:]]*\\{[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
        description = "Column name overrides from source to spanner",
        example =
            "[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]",
        helpText =
            "These are the column name overrides from source to spanner. They are written in the"
                + " following format: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]."
                + " Note that the SourceTableName should remain the same in both the source and spanner pair. To override table names, use tableOverrides."
                + " The example shows mapping SingerName to TalentName and AlbumName to RecordName in the Singers and Albums tables respectively.")
    @Default.String("")
    String getColumnOverrides();

    void setColumnOverrides(String value);

    @TemplateParameter.Text(
        order = 32,
        optional = true,
        description = "File based overrides from source to spanner",
        helpText =
            "A file which specifies the table and the column name overrides from source to spanner.")
    @Default.String("")
    String getSchemaOverridesFilePath();

    void setSchemaOverridesFilePath(String value);

    @TemplateParameter.Text(
        order = 33,
        optional = true,
        groupName = "Target",
        description = "Cloud Spanner Shadow Table Instance Id.",
        helpText =
            "Optional separate instance for shadow tables. If not specified, shadow tables will be created in the main instance. If specified, ensure shadowTableSpannerDatabaseId is specified as well.")
    @Default.String("")
    String getShadowTableSpannerInstanceId();

    void setShadowTableSpannerInstanceId(String value);

    @TemplateParameter.Text(
        order = 34,
        optional = true,
        groupName = "Target",
        description = "Cloud Spanner Shadow Table Database Id.",
        helpText =
            "Optional separate database for shadow tables. If not specified, shadow tables will be created in the main database. If specified, ensure shadowTableSpannerInstanceId is specified as well.")
    @Default.String("")
    String getShadowTableSpannerDatabaseId();

    void setShadowTableSpannerDatabaseId(String value);
  }

  private static void validateSourceType(Options options) {
    boolean isRetryMode = "retryDLQ".equals(options.getRunMode());
    if (isRetryMode) {
      // retry mode does not read from Datastream
      return;
    }
    String sourceType = getSourceType(options);
    if (!DatastreamConstants.SUPPORTED_DATASTREAM_SOURCES.contains(sourceType)) {
      throw new IllegalArgumentException(
          "Unsupported source type found: "
              + sourceType
              + ". Specify one of the following source types: "
              + DatastreamConstants.SUPPORTED_DATASTREAM_SOURCES);
    }
    options.setDatastreamSourceType(sourceType);
  }

  static String getSourceType(Options options) {
    if (options.getDatastreamSourceType() != null) {
      return options.getDatastreamSourceType();
    }
    if (options.getStreamName() == null) {
      throw new IllegalArgumentException("Stream name cannot be empty.");
    }
    GcpOptions gcpOptions = options.as(GcpOptions.class);
    DataStreamClient datastreamClient;
    SourceConfig sourceConfig;
    try {
      datastreamClient = new DataStreamClient(gcpOptions.getGcpCredential());
      sourceConfig = datastreamClient.getSourceConnectionProfile(options.getStreamName());
    } catch (IOException e) {
      LOG.error("IOException Occurred: DataStreamClient failed initialization.");
      throw new IllegalArgumentException("Unable to initialize DatastreamClient: " + e);
    }
    // TODO: use getPostgresSourceConfig() instead of an else once SourceConfig.java is updated.
    if (sourceConfig.getMysqlSourceConfig() != null) {
      return DatastreamConstants.MYSQL_SOURCE_TYPE;
    } else if (sourceConfig.getOracleSourceConfig() != null) {
      return DatastreamConstants.ORACLE_SOURCE_TYPE;
    } else {
      return DatastreamConstants.POSTGRES_SOURCE_TYPE;
    }
    // LOG.error("Source Connection Profile Type Not Supported");
    // throw new IllegalArgumentException("Unsupported source connection profile type in
    // Datastream");
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();
    LOG.info("Starting DataStream to Cloud Spanner");
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
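    // This template always runs as a streaming pipeline: it continuously tails Cloud Storage.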
    options.setStreaming(true);
    validateSourceType(options);
    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    /*
     * Stages:
     *   1) Ingest and Normalize Data to FailsafeElement with JSON Strings
     *   2) Write JSON Strings to Cloud Spanner
     *   3) Write Failures to GCS Dead Letter Queue
     */
    Pipeline pipeline = Pipeline.create(options);
    DeadLetterQueueManager dlqManager = buildDlqManager(options);
    // Ingest session file into schema object.
    Schema schema = SessionFileReader.read(options.getSessionFilePath());
    /*
     * Stage 1: Ingest/Normalize Data to FailsafeElement with JSON Strings and
     * read Cloud Spanner information schema.
     *   a) Prepare spanner config and process information schema
     *   b) Read DataStream data from GCS into JSON String FailsafeElements
     *   c) Reconsume Dead Letter Queue data from GCS into JSON String FailsafeElements
     *   d) Flatten DataStream and DLQ Streams
     */

    // Prepare Spanner config
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId()))
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
            .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()))
            .withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()))
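            // A single commit attempt (maxAttempts = 1) with a 4-minute deadline; failed
            // writes are routed to the DLQ rather than retried at the commit layer.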
            .withCommitRetrySettings(
                RetrySettings.newBuilder()
                    .setTotalTimeout(org.threeten.bp.Duration.ofMinutes(4))
                    .setInitialRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
                    .setRetryDelayMultiplier(1)
                    .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
                    .setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
                    .setRpcTimeoutMultiplier(1)
                    .setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
                    .setMaxAttempts(1)
                    .build());
    SpannerConfig shadowTableSpannerConfig = getShadowTableSpannerConfig(options);
    /* Process information schema
     * 1) Read information schema from destination Cloud Spanner database
     * 2) Check if shadow tables are present and create if necessary
     * 3) Return new information schema
     */
    PCollectionTuple ddlTuple =
        pipeline.apply(
            "Process Information Schema",
            new ProcessInformationSchema(
                spannerConfig,
                shadowTableSpannerConfig,
                options.getShouldCreateShadowTables(),
                options.getShadowTablePrefix(),
                options.getDatastreamSourceType()));
    PCollectionView<Ddl> ddlView =
        ddlTuple
            .get(ProcessInformationSchema.MAIN_DDL_TAG)
            .apply("Cloud Spanner Main DDL as view", View.asSingleton());

    PCollectionView<Ddl> shadowTableDdlView =
        ddlTuple
            .get(ProcessInformationSchema.SHADOW_TABLE_DDL_TAG)
            .apply("Cloud Spanner shadow tables DDL as view", View.asSingleton());

    PCollection<FailsafeElement<String, String>> jsonRecords = null;
    // Elements sent to the Dead Letter Queue are to be reconsumed.
    // A DLQManager is to be created using PipelineOptions, and it is in charge
    // of building pieces of the DLQ.
    PCollectionTuple reconsumedElements = null;
    boolean isRegularMode = "regular".equals(options.getRunMode());
    if (isRegularMode && (!Strings.isNullOrEmpty(options.getDlqGcsPubSubSubscription()))) {
      reconsumedElements =
          dlqManager.getReconsumerDataTransformForFiles(
              pipeline.apply(
                  "Read retry from PubSub",
                  new PubSubNotifiedDlqIO(
                      options.getDlqGcsPubSubSubscription(),
                      // file paths to ignore when re-consuming for retry
                      new ArrayList<String>(
                          Arrays.asList("/severe/", "/tmp_retry", "/tmp_severe/", ".temp")))));
    } else {
      reconsumedElements =
          dlqManager.getReconsumerDataTransform(
              pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
    }
    PCollection<FailsafeElement<String, String>> dlqJsonRecords =
        reconsumedElements
            .get(DeadLetterQueueManager.RETRYABLE_ERRORS)
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
    if (isRegularMode) {
      LOG.info("Regular Datastream flow");
      PCollection<FailsafeElement<String, String>> datastreamJsonRecords =
          pipeline.apply(
              new DataStreamIO(
                      options.getStreamName(),
                      options.getInputFilePattern(),
                      options.getInputFileFormat(),
                      options.getGcsPubSubSubscription(),
                      options.getRfcStartDateTime())
                  .withFileReadConcurrency(options.getFileReadConcurrency())
                  .withoutDatastreamRecordsReshuffle()
                  .withDirectoryWatchDuration(
                      Duration.standardMinutes(options.getDirectoryWatchDurationInMinutes())));
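      // Size the reshuffle fan-out from the configured worker ceiling (defaults to 1 worker).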
      int maxNumWorkers = options.getMaxNumWorkers() != 0 ? options.getMaxNumWorkers() : 1;
      jsonRecords =
          PCollectionList.of(datastreamJsonRecords)
              .and(dlqJsonRecords)
              .apply(Flatten.pCollections())
              .apply(
                  "Reshuffle",
                  Reshuffle.<FailsafeElement<String, String>>viaRandomKey()
                      .withNumBuckets(
                          maxNumWorkers * DatastreamToSpannerConstants.MAX_DOFN_PER_WORKER));
    } else {
      LOG.info("DLQ retry flow");
      jsonRecords =
          PCollectionList.of(dlqJsonRecords)
              .apply(Flatten.pCollections())
              .apply("Reshuffle", Reshuffle.viaRandomKey());
    }
    /*
     * Stage 2: Transform records
     */

    // Ingest transformation context file into memory.
    TransformationContext transformationContext =
        TransformationContextReader.getTransformationContext(
            options.getTransformationContextFilePath());

    // Ingest sharding context file into memory.
    ShardingContext shardingContext =
        ShardingContextReader.getShardingContext(options.getShardingContextFilePath());

    CustomTransformation customTransformation =
        CustomTransformation.builder(
                options.getTransformationJarPath(), options.getTransformationClassName())
            .setCustomParameters(options.getTransformationCustomParameters())
            .build();

    // Create the overrides mapping.
    ISchemaOverridesParser schemaOverridesParser = configureSchemaOverrides(options);

    ChangeEventTransformerDoFn changeEventTransformerDoFn =
        ChangeEventTransformerDoFn.create(
            schema,
            schemaOverridesParser,
            transformationContext,
            shardingContext,
            options.getDatastreamSourceType(),
            customTransformation,
            options.getRoundJsonDecimals(),
            ddlView,
            spannerConfig);

    PCollectionTuple transformedRecords =
        jsonRecords.apply(
            "Apply Transformation to events",
            ParDo.of(changeEventTransformerDoFn)
                .withSideInputs(ddlView)
                .withOutputTags(
                    DatastreamToSpannerConstants.TRANSFORMED_EVENT_TAG,
                    TupleTagList.of(
                        Arrays.asList(
                            DatastreamToSpannerConstants.FILTERED_EVENT_TAG,
                            DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))));

    /*
     * Stage 3: Write filtered records to GCS
     */
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String filterEventsDirectory =
        options.getFilteredEventsDirectory().isEmpty()
            ? tempLocation + "filteredEvents/"
            : options.getFilteredEventsDirectory();
    LOG.info("Filtered events directory: {}", filterEventsDirectory);
    transformedRecords
        .get(DatastreamToSpannerConstants.FILTERED_EVENT_TAG)
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(
            "Write Filtered Events To GCS",
            TextIO.write().to(filterEventsDirectory).withSuffix(".json").withWindowedWrites());

    /*
     * Stage 4: Write transformed records to Cloud Spanner
     */
    SpannerTransactionWriter.Result spannerWriteResults =
        transformedRecords
            .get(DatastreamToSpannerConstants.TRANSFORMED_EVENT_TAG)
            .apply(
                "Write events to Cloud Spanner",
                new SpannerTransactionWriter(
                    spannerConfig,
                    shadowTableSpannerConfig,
                    ddlView,
                    shadowTableDdlView,
                    options.getShadowTablePrefix(),
                    options.getDatastreamSourceType(),
                    isRegularMode));
    /*
     * Stage 5: Write failures to GCS Dead Letter Queue
     * a) Retryable errors are written to retry GCS Dead letter queue
     * b) Severe errors are written to severe GCS Dead letter queue
     */
    // We write only the original payload from the failsafe event to the DLQ; this is done in
    // StringDeadLetterQueueSanitizer.
    spannerWriteResults
        .retryableErrors()
        .apply(
            "DLQ: Write retryable Failures to GCS",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            "Write To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getRetryDlqDirectoryWithDateTime())
                .withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_retry/")
                .setIncludePaneInfo(true)
                .build());
    PCollection<FailsafeElement<String, String>> dlqErrorRecords =
        reconsumedElements
            .get(DeadLetterQueueManager.PERMANENT_ERRORS)
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
    // TODO: Write errors from transformer and spanner writer into separate folders
    PCollection<FailsafeElement<String, String>> permanentErrors =
        PCollectionList.of(dlqErrorRecords)
            .and(spannerWriteResults.permanentErrors())
            .and(transformedRecords.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))
            .apply(Flatten.pCollections());
    // increment the metrics
    permanentErrors
        .apply("Update metrics", ParDo.of(new MetricUpdaterDoFn(isRegularMode)))
        .apply(
            "DLQ: Write Severe errors to GCS",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            "Write To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory((options).getDeadLetterQueueDirectory() + "/tmp_severe/")
                .setIncludePaneInfo(true)
                .build());
    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  static SpannerConfig getShadowTableSpannerConfig(Options options) {
    // Validate shadow table Spanner config - both instance and database must be specified together
    String shadowTableSpannerInstanceId = options.getShadowTableSpannerInstanceId();
    String shadowTableSpannerDatabaseId = options.getShadowTableSpannerDatabaseId();
    LOG.info(
        "Input Shadow table db -  instance {} and database {}",
        shadowTableSpannerInstanceId,
        shadowTableSpannerDatabaseId);

    if ((Strings.isNullOrEmpty(shadowTableSpannerInstanceId)
            && !Strings.isNullOrEmpty(shadowTableSpannerDatabaseId))
        || (!Strings.isNullOrEmpty(shadowTableSpannerInstanceId)
            && Strings.isNullOrEmpty(shadowTableSpannerDatabaseId))) {
      throw new IllegalArgumentException(
          "Both shadowTableSpannerInstanceId and shadowTableSpannerDatabaseId must be specified together");
    }
    // If not specified, use main instance and main database values. The shadow table database
    // stores the shadow tables and by default, is the same as the main database for backward
    // compatibility.
    if (Strings.isNullOrEmpty(shadowTableSpannerInstanceId)
        && Strings.isNullOrEmpty(shadowTableSpannerDatabaseId)) {
      shadowTableSpannerInstanceId = options.getInstanceId();
      shadowTableSpannerDatabaseId = options.getDatabaseId();
      LOG.info(
          "Overwrote shadow table instance - {} and db- {}",
          shadowTableSpannerInstanceId,
          shadowTableSpannerDatabaseId);
    }
    return SpannerConfig.create()
        .withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId()))
        .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
        .withInstanceId(ValueProvider.StaticValueProvider.of(shadowTableSpannerInstanceId))
        .withDatabaseId(ValueProvider.StaticValueProvider.of(shadowTableSpannerDatabaseId))
        .withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()))
        .withCommitRetrySettings(
            RetrySettings.newBuilder()
                .setTotalTimeout(org.threeten.bp.Duration.ofMinutes(4))
                .setInitialRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
                .setRetryDelayMultiplier(1)
                .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
                .setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
                .setRpcTimeoutMultiplier(1)
                .setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
                .setMaxAttempts(1)
                .build());
  }

  private static DeadLetterQueueManager buildDlqManager(Options options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDeadLetterQueueDirectory().isEmpty()
            ? tempLocation + "dlq/"
            : options.getDeadLetterQueueDirectory();
    LOG.info("Dead-letter queue directory: {}", dlqDirectory);
    options.setDeadLetterQueueDirectory(dlqDirectory);
    if ("regular".equals(options.getRunMode())) {
      return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetryCount());
    } else {
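      // retryDLQ mode: reconsume events from the DLQ's "severe" subdirectory.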
      String retryDlqUri =
          FileSystems.matchNewResource(dlqDirectory, true)
              .resolve("severe", StandardResolveOptions.RESOLVE_DIRECTORY)
              .toString();
      LOG.info("Dead-letter retry directory: {}", retryDlqUri);
      return DeadLetterQueueManager.create(dlqDirectory, retryDlqUri, 0);
    }
  }

  static ISchemaOverridesParser configureSchemaOverrides(Options options) {
    // incorrect configuration
    if (!options.getSchemaOverridesFilePath().isEmpty()
        && (!options.getTableOverrides().isEmpty() || !options.getColumnOverrides().isEmpty())) {
      throw new IllegalArgumentException(
          "Only one of file based or string based overrides must be configured! Please correct the configuration and re-run the job");
    }
    // string based overrides
    if (!options.getTableOverrides().isEmpty() || !options.getColumnOverrides().isEmpty()) {
      Map<String, String> userOptionsOverrides = new HashMap<>();
      if (!options.getTableOverrides().isEmpty()) {
        userOptionsOverrides.put("tableOverrides", options.getTableOverrides());
      }
      if (!options.getColumnOverrides().isEmpty()) {
        userOptionsOverrides.put("columnOverrides", options.getColumnOverrides());
      }
      return new SchemaStringOverridesParser(userOptionsOverrides);
    }
    // file based overrides
    if (!options.getSchemaOverridesFilePath().isEmpty()) {
      return new SchemaFileOverridesParser(options.getSchemaOverridesFilePath());
    }
    // no overrides
    return new NoopSchemaOverridesParser();
  }
}

What's next