Spanner 변경 내역에서 BigQuery로 템플릿

Spanner change stream to BigQuery 템플릿은 Spanner 데이터 변경 레코드를 스트리밍하고 Dataflow Runner V2를 사용하여 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다.

Spanner 트랜잭션의 수정 여부와 관계없이 변경 내역이 감시하는 모든 열이 각 BigQuery 테이블 행에 포함됩니다. 감시 대상이 아닌 열은 BigQuery 행에 포함되지 않습니다. Dataflow 워터마크보다 낮은 Spanner 변경사항은 BigQuery 테이블에 적용되거나 재시도를 위해 데드 레터 큐에 저장됩니다. BigQuery 행이 저장되는 순서는 원본 Spanner 커밋 타임스탬프의 순서와 다릅니다.

필요한 BigQuery 테이블이 없으면 파이프라인이 테이블을 만듭니다. 그렇지 않으면 기존 BigQuery 테이블이 사용됩니다. 기존 BigQuery 테이블의 스키마에는 Spanner 테이블의 해당 추적 열과 추가 메타데이터 열이 ignoreFields 옵션으로 인해 명시적으로 무시되지 않아야 합니다. 다음 목록에서 메타데이터 필드에 대한 설명을 참조하세요. 각각의 새 BigQuery 행에는 변경 레코드의 타임스탬프에 있는 Spanner 테이블의 해당 행에서 변경 내역이 감시하는 모든 열이 포함됩니다.

다음 메타데이터 필드가 BigQuery 테이블에 추가됩니다. 이러한 필드에 대한 자세한 내용은 '변경 내역 파티션, 레코드, 쿼리'의 데이터 변경 레코드를 참조하세요.

  • _metadata_spanner_mod_type: Spanner 트랜잭션의 수정 유형(삽입, 업데이트 또는 삭제)입니다. 변경 내역 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_table_name: Spanner 테이블 이름입니다. 이 필드는 커넥터의 메타데이터 테이블 이름이 아닙니다.
  • _metadata_spanner_commit_timestamp: Spanner 커밋 타임스탬프로, 변경사항이 커밋되는 시간입니다. 이 값은 변경 내역 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_server_transaction_id: 변경사항이 커밋된 Spanner 트랜잭션을 나타내는 전역적으로 고유한 문자열입니다. 변경 내역 레코드를 처리할 때에만 이 값을 사용합니다. Spanner API의 트랜잭션 ID와는 상관 관계가 없습니다. 이 값은 변경 내역 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_record_sequence: Spanner 트랜잭션 내 레코드의 시퀀스 넘버입니다. 시퀀스 번호는 트랜잭션 내에서 고유하고 단조롭게(단, 반드시 연속적일 필요는 없음) 증가하도록 보장됩니다. 이 값은 변경 내역 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_is_last_record_in_transaction_in_partition: 레코드가 현재 파티션의 마지막 Spanner 트랜잭션 레코드인지 여부를 나타냅니다. 이 값은 변경 내역 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_number_of_records_in_transaction: 모든 변경 내역 파티션에서 Spanner 트랜잭션에 포함된 데이터 변경 레코드 수입니다. 이 값은 변경 내역 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_number_of_partitions_in_transaction: Spanner 트랜잭션의 데이터 변경 레코드를 반환하는 파티션 수입니다. 이 값은 변경 내역 데이터 변경 레코드에서 추출됩니다.
  • _metadata_big_query_commit_timestamp: BigQuery에 행이 삽입된 시점의 커밋 타임스탬프입니다. useStorageWriteApitrue이면 이 열은 파이프라인에 의해 변경 로그 테이블에 자동으로 생성되지 않습니다. 이 경우 필요한 경우 변경 로그 테이블에 이 열을 수동으로 추가해야 합니다.

이 템플릿을 사용할 때는 다음 세부정보에 유의하세요.

  • 이 템플릿을 사용하여 Spanner에서 BigQuery로 기존 테이블 또는 새 테이블의 새 열을 전파할 수 있습니다. 자세한 내용은 추적 테이블 또는 열 추가 처리를 참고하세요.
  • OLD_AND_NEW_VALUESNEW_VALUES 값 캡처 유형의 경우 데이터 변경 레코드에 UPDATE 변경사항이 있으면 템플릿은 변경되지 않았지만 감시된 열을 검색하기 위해 데이터 변경 레코드의 커밋 타임스탬프에서 Spanner에 대한 비활성 읽기를 수행해야 합니다. 비활성 읽기에 대해 데이터베이스 'version_retention_period'를 올바르게 구성해야 합니다. NEW_ROW 값 캡처 유형의 경우 데이터 변경 레코드가 UPDATE 요청에서 업데이트되지 않는 열을 포함하여 전체 새 행을 캡처하기 때문에 더 효율적이므로 템플릿이 비활성 읽기를 수행할 필요가 없습니다.
  • 네트워크 지연 시간 및 네트워크 전송 비용을 최소화하려면 Spanner 인스턴스 또는 BigQuery 테이블과 동일한 리전에서 Dataflow 작업을 실행합니다. 작업 리전 외부에 있는 소스, 싱크, 스테이징 파일 위치 또는 임시 파일 위치를 사용하면 데이터가 리전 간에서 전송될 수 있습니다. 자세한 내용은 Dataflow 리전을 참조하세요.
  • 이 템플릿은 모든 유효한 Spanner 데이터 유형을 지원하지만, BigQuery 유형이 Spanner 유형보다 더 정확한 경우 변환 중에 정밀도 손실이 발생할 수 있습니다. 구체적으로 설명하면 다음과 같습니다.
    • Spanner JSON 유형의 경우 객체 멤버의 순서는 사전순으로 정렬되지만 BigQuery JSON 유형은 보장되지 않습니다.
    • Spanner는 나노초 TIMESTAMP 유형만 지원하지만, BigQuery는 마이크로초 TIMESTAMP 유형만 지원합니다.

변경 스트림, 변경 스트림 Dataflow 파이프라인 빌드 방법, 권장사항에 대해 자세히 알아보세요.

파이프라인 요구사항

  • 파이프라인을 실행하기 전에 Spanner 인스턴스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 데이터베이스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 메타데이터 인스턴스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 메타데이터 데이터베이스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 변경 내역이 있어야 합니다.
  • 파이프라인을 실행하기 전에 BigQuery 데이터 세트가 있어야 합니다.

추적 테이블 또는 열 추가 처리

이 섹션에서는 파이프라인이 실행되는 동안 추적 Spanner 테이블 및 열 추가를 처리하기 위한 권장사항을 설명합니다. 이 기능에 지원되는 가장 오래된 템플릿 버전은 2024-09-19-00_RC00입니다.

  • Spanner 변경 내역 범위에 새 열을 추가하기 전에 먼저 BigQuery 변경 로그 테이블에 열을 추가합니다. 추가된 열은 일치하는 데이터 유형을 보유하고 NULLABLE이어야 합니다. Spanner에서 새 열 또는 테이블을 계속 만들기 전에 10분 이상 기다립니다. 기다리지 않고 새 열에 쓰면 데드 레터 큐 디렉터리에 잘못된 오류 코드가 있는 처리되지 않은 레코드가 생성될 수 있습니다.
  • 새 테이블을 추가하려면 먼저 Spanner 데이터베이스에 테이블을 추가합니다. 파이프라인이 새 테이블의 레코드를 수신하면 BigQuery에 테이블이 자동으로 생성됩니다.
  • Spanner 데이터베이스에 새 열 또는 테이블을 추가한 후에는 새 열 또는 테이블이 아직 암시적으로 추적되지 않는 경우 원하는 새 열 또는 테이블을 추적하도록 변경 내역을 변경해야 합니다.
  • 이 템플릿은 BigQuery에서 테이블이나 열을 삭제하지 않습니다. 열이 Spanner 테이블에서 삭제되면 BigQuery에서 열을 수동으로 삭제하지 않는 한 Spanner 테이블에서 열이 삭제된 후에 생성된 레코드의 BigQuery 변경 로그 열에 null 값이 채워집니다.
  • 템플릿은 열 유형 업데이트를 지원하지 않습니다. Spanner는 STRING 열을 BYTES 열로 또는 BYTES 열을 STRING 열로 변경하는 것을 지원하지만 BigQuery에서는 기존 열의 데이터 유형을 수정하거나 동일한 열 이름을 다른 데이터 유형에 사용할 수 없습니다. Spanner에서 이름은 같지만 유형이 다른 열을 삭제하고 다시 만들면 데이터가 기존 BigQuery 열에 쓰일 수 있지만 유형은 변경되지 않습니다.
  • 이 템플릿은 열 모드 업데이트를 지원하지 않습니다. BigQuery에 복제된 메타데이터 열은 REQUIRED 모드로 설정됩니다. BigQuery로 복제된 다른 모든 열은 Spanner 테이블에서 NOT NULL로 정의되었는지와 관계없이 NULLABLE로 설정됩니다. BigQuery에서는 NULLABLE 열을 REQUIRED 모드로 업데이트할 수 없습니다.
  • 실행 중인 파이프라인에서는 변경 스트림의 값 캡처 유형 변경이 지원되지 않습니다.

템플릿 매개변수

필수 매개변수

  • spannerInstanceId: 변경 내역을 읽어올 Spanner 인스턴스입니다.
  • spannerDatabase: 변경 내역을 읽어올 Spanner 데이터베이스입니다.
  • spannerMetadataInstanceId: 변경 내역 커넥터 메타데이터 테이블에 사용할 Spanner 인스턴스입니다.
  • spannerMetadataDatabase: 변경 내역 커넥터 메타데이터 테이블에 사용할 Spanner 데이터베이스입니다.
  • spannerChangeStreamName: 읽어올 Spanner 변경 내역의 이름입니다.
  • bigQueryDataset: 변경 내역 출력을 위한 BigQuery 데이터 세트입니다.

선택적 매개변수

  • spannerProjectId: 변경 내역을 읽어 올 프로젝트입니다. 또한 이 값은 변경 내역 커넥터 메타데이터 테이블이 생성되는 프로젝트이기도 합니다. 이 매개변수의 기본값은 Dataflow 파이프라인이 실행되는 프로젝트입니다.
  • spannerDatabaseRole: 템플릿을 실행할 때 사용할 Spanner 데이터베이스 역할입니다. 이 매개변수는 템플릿을 실행하는 IAM 주 구성원이 세분화된 액세스 제어 사용자인 경우에만 필요합니다. 데이터베이스 역할에는 변경 내역에 대한 SELECT 권한과 변경 내역의 읽기 함수에 대한 EXECUTE 권한이 있어야 합니다. 자세한 내용은 변경 내역에 대한 세분화된 액세스 제어(https://cloud.google.com/spanner/docs/fgac-change-streams)를 참조하세요.
  • spannerMetadataTableName: 사용할 Spanner 변경 내역 커넥터 메타데이터 테이블 이름입니다. 제공하지 않으면 파이프라인 흐름 중에 Spanner 변경 내역 커넥터 메타데이터 테이블이 자동으로 생성됩니다. 기존 파이프라인을 업데이트할 때 이 매개변수를 제공해야 합니다. 그렇지 않으면 이 매개변수를 제공하지 마세요.
  • rpcPriority: Spanner 호출의 요청 우선순위입니다. 값은 HIGH, MEDIUM, LOW 값 중 하나여야 합니다. 기본값은 HIGH입니다.
  • spannerHost: 템플릿에서 호출할 Cloud Spanner 엔드포인트입니다. 테스트에만 사용됩니다. 예를 들면 https://batch-spanner.googleapis.com입니다.
  • startTimestamp: 변경 내역을 읽는 데 사용할 시작 날짜/시간(https://datatracker.ietf.org/doc/html/rfc3339)입니다. Ex-2021-10-12T07:20:50.52Z. 기본값은 파이프라인이 시작되는 시점의 타임스탬프, 즉 현재 시간입니다.
  • endTimestamp: 변경 내역 Ex-2021-10-12T07:20:50.52Z를 읽는 데 사용할 종료 날짜/시간(https://datatracker.ietf.org/doc/html/rfc3339)입니다. 기본값은 미래의 무한대 시간입니다.
  • bigQueryProjectId: BigQuery 프로젝트입니다. 기본값은 Dataflow 작업의 프로젝트입니다.
  • bigQueryChangelogTableNameTemplate: 변경 로그를 포함하는 BigQuery 테이블 이름의 템플릿입니다. 기본값은 {_metadata_spanner_table_name}_changelog입니다.
  • deadLetterQueueDirectory: 처리되지 않은 레코드를 저장하는 경로입니다. 기본 경로는 Dataflow 작업의 임시 위치 아래에 있는 디렉터리입니다. 보통 기본값으로 충분합니다.
  • dlqRetryMinutes: 데드 레터 큐 재시도 간격(분)입니다. 기본값은 10입니다.
  • ignoreFields: 무시할 쉼표로 구분된 필드 목록(대소문자 구분)입니다. 이러한 필드는 감시 테이블 필드이거나 파이프라인에서 추가한 메타데이터 필드일 수 있습니다. 무시된 필드는 BigQuery에 삽입되지 않습니다. _metadata_spanner_table_name 필드를 무시하면 bigQueryChangelogTableNameTemplate 파라미터도 무시됩니다. 기본값은 빈 값입니다.
  • disableDlqRetries: DLQ 재시도를 사용 중지할지 여부입니다. 기본값은 false입니다.
  • useStorageWriteApi: true이면 파이프라인에서 BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)를 사용합니다. 기본값은 false입니다. 자세한 내용은 Storage Write API(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api) 사용을 참조하세요.
  • useStorageWriteApiAtLeastOnce: Storage Write API를 사용할 경우 쓰기 시맨틱스를 지정합니다. 1회 이상 실행되는 시맨틱스(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)를 사용하려면, 이 매개변수를 true로 설정합니다. 1회만 실행되는 시맨틱스를 사용하려면 매개변수를 false로 설정합니다. 이 매개변수는 useStorageWriteApitrue인 경우에만 적용됩니다. 기본값은 false입니다.
  • numStorageWriteApiStreams: Storage Write API를 사용할 경우 쓰기 스트림 수를 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다. 기본값은 0입니다.
  • storageWriteApiTriggeringFrequencySec: Storage Write API를 사용할 경우 트리거 빈도를 초 단위로 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다.

템플릿 실행

콘솔gcloudAPI
  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Spanner change streams to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • SPANNER_INSTANCE_ID: Spanner 인스턴스 ID입니다.
  • SPANNER_DATABASE: Spanner 데이터베이스입니다.
  • SPANNER_METADATA_INSTANCE_ID: Spanner 메타데이터 인스턴스 ID입니다.
  • SPANNER_METADATA_DATABASE: Spanner 메타데이터 데이터베이스입니다.
  • SPANNER_CHANGE_STREAM: Spanner 변경 내역입니다.
  • BIGQUERY_DATASET: 변경 내역 출력을 위한 BigQuery 데이터 세트입니다.

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • SPANNER_INSTANCE_ID: Spanner 인스턴스 ID입니다.
  • SPANNER_DATABASE: Spanner 데이터베이스입니다.
  • SPANNER_METADATA_INSTANCE_ID: Spanner 메타데이터 인스턴스 ID입니다.
  • SPANNER_METADATA_DATABASE: Spanner 메타데이터 데이터베이스입니다.
  • SPANNER_CHANGE_STREAM: Spanner 변경 내역입니다.
  • BIGQUERY_DATASET: 변경 내역 출력을 위한 BigQuery 데이터 세트입니다.
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.Timestamp;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToBigQueryOptions;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.Mod;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.ModColumnType;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.OptionsUtils;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO(haikuo-google): Add integration test.
// TODO(haikuo-google): Add README.
// TODO(haikuo-google): Add stackdriver metrics.
// TODO(haikuo-google): Ideally side input should be used to store schema information and shared
// accross DoFns, but since side input fix is not yet deployed at the moment, we read schema
// information in the beginning of the DoFn as a work around. We should use side input instead when
// it's available.
// TODO(haikuo-google): Test the case where tables or columns are added while the pipeline is
// running.
/**
 * This pipeline ingests {@link DataChangeRecord} from Spanner change stream. The {@link
 * DataChangeRecord} is then broken into {@link Mod}, which converted into {@link TableRow} and
 * inserted into BigQuery table.
 */
@Template(
    name = "Spanner_Change_Streams_to_BigQuery",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Spanner change streams to BigQuery",
    description = {
      "The Cloud Spanner change streams to BigQuery template is a streaming pipeline that streams"
          + " Cloud Spanner data change records and writes them into BigQuery tables using Dataflow"
          + " Runner V2.\n",
      "All change stream watched columns are included in each BigQuery table row, regardless of"
          + " whether they are modified by a Cloud Spanner transaction. Columns not watched are not"
          + " included in the BigQuery row. Any Cloud Spanner change less than the Dataflow"
          + " watermark are either successfully applied to the BigQuery tables or are stored in the"
          + " dead-letter queue for retry. BigQuery rows are inserted out of order compared to the"
          + " original Cloud Spanner commit timestamp ordering.\n",
      "If the necessary BigQuery tables don't exist, the pipeline creates them. Otherwise, existing"
          + " BigQuery tables are used. The schema of existing BigQuery tables must contain the"
          + " corresponding tracked columns of the Cloud Spanner tables and any additional metadata"
          + " columns that are not ignored explicitly by the ignoreFields option. See the"
          + " description of the metadata fields in the following list. Each new BigQuery row"
          + " includes all columns watched by the change stream from its corresponding row in your"
          + " Cloud Spanner table at the change record's timestamp.\n",
      "The following metadata fields are added to BigQuery tables. For more details about these"
          + " fields, see Data change records in \"Change streams partitions, records, and"
          + " queries.\"\n"
          + "- _metadata_spanner_mod_type: The modification type (insert, update, or delete) of the"
          + " Cloud Spanner transaction. Extracted from change stream data change record.\n"
          + "- _metadata_spanner_table_name: The Cloud Spanner table name. Note this field is not"
          + " the metadata table name of the connector.\n"
          + "- _metadata_spanner_commit_timestamp: The Spanner commit timestamp, which is the time"
          + " when a change is committed. Extracted from change stream data change record.\n"
          + "- _metadata_spanner_server_transaction_id: A globally unique string that represents"
          + " the Spanner transaction in which the change was committed. Only use this value in the"
          + " context of processing change stream records. It isn't correlated with the transaction"
          + " ID in Spanner's API. Extracted from change stream data change record.\n"
          + "- _metadata_spanner_record_sequence: The sequence number for the record within the"
          + " Spanner transaction. Sequence numbers are guaranteed to be unique and monotonically"
          + " increasing (but not necessarily contiguous) within a transaction. Extracted from"
          + " change stream data change record.\n"
          + "- _metadata_spanner_is_last_record_in_transaction_in_partition: Indicates whether the"
          + " record is the last record for a Spanner transaction in the current partition."
          + " Extracted from change stream data change record.\n"
          + "- _metadata_spanner_number_of_records_in_transaction: The number of data change"
          + " records that are part of the Spanner transaction across all change stream partitions."
          + " Extracted from change stream data change record.\n"
          + "- _metadata_spanner_number_of_partitions_in_transaction: The number of partitions that"
          + " return data change records for the Spanner transaction. Extracted from change stream"
          + " data change record.\n"
          + "- _metadata_big_query_commit_timestamp: The commit timestamp of when the row is"
          + " inserted into BigQuery.\n",
      "Notes:\n"
          + "- This template does not propagate schema changes from Cloud Spanner to BigQuery."
          + " Because performing a schema change in Cloud Spanner is likely going to break the"
          + " pipeline, you might need to recreate the pipeline after the schema change.\n"
          + "- For OLD_AND_NEW_VALUES and NEW_VALUES value capture types, when the data change"
          + " record contains an UPDATE change, the template needs to do a stale read to Cloud"
          + " Spanner at the commit timestamp of the data change record to retrieve the unchanged"
          + " but watched columns. Make sure to configure your database 'version_retention_period'"
          + " properly for the stale read. For the NEW_ROW value capture type, the template is more"
          + " efficient, because the data change record captures the full new row including columns"
          + " that are not updated in UPDATEs, and the template does not need to do a stale read.\n"
          + "- You can minimize network latency and network transport costs by running the Dataflow"
          + " job from the same region as your Cloud Spanner instance or BigQuery tables. If you"
          + " use sources, sinks, staging file locations, or temporary file locations that are"
          + " located outside of your job's region, your data might be sent across regions. See"
          + " more about Dataflow regional endpoints.\n"
          + "- This template supports all valid Cloud Spanner data types, but if the BigQuery type"
          + " is more precise than the Cloud Spanner type, precision loss might occur during the"
          + " transformation. Specifically:\n"
          + "  - For Cloud Spanner JSON type, the order of the members of an object is"
          + " lexicographically ordered, but there is no such guarantee for BigQuery JSON type.\n"
          + "  - Cloud Spanner supports nanoseconds TIMESTAMP type, BigQuery only supports"
          + " microseconds TIMESTAMP type.\n",
      "Learn more about <a href=\"https://cloud.google.com/spanner/docs/change-streams\">change"
          + " streams</a>, <a"
          + " href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow\">how to"
          + " build change streams Dataflow pipelines</a>, and <a"
          + " href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow#best_practices\">best"
          + " practices</a>."
    },
    optionsClass = SpannerChangeStreamsToBigQueryOptions.class,
    flexContainerName = "spanner-changestreams-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner instance must exist prior to running the pipeline.",
      "The Cloud Spanner database must exist prior to running the pipeline.",
      "The Cloud Spanner metadata instance must exist prior to running the pipeline.",
      "The Cloud Spanner metadata database must exist prior to running the pipeline.",
      "The Cloud Spanner change stream must exist prior to running the pipeline.",
      "The BigQuery dataset must exist prior to running the pipeline."
    },
    streaming = true,
    supportsExactlyOnce = true,
    supportsAtLeastOnce = true)
public final class SpannerChangeStreamsToBigQuery {

  /** String/String Coder for {@link FailsafeElement}. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamsToBigQuery.class);

  // Max number of deadletter queue retries.
  private static final int DLQ_MAX_RETRIES = 5;

  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting to replicate change records from Spanner change streams to BigQuery");

    SpannerChangeStreamsToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(SpannerChangeStreamsToBigQueryOptions.class);

    run(options);
  }

  private static void validateOptions(SpannerChangeStreamsToBigQueryOptions options) {
    if (options.getDlqRetryMinutes() <= 0) {
      throw new IllegalArgumentException("dlqRetryMinutes must be positive.");
    }
    if (options
        .getBigQueryChangelogTableNameTemplate()
        .equals(BigQueryUtils.BQ_CHANGELOG_FIELD_NAME_TABLE_NAME)) {
      throw new IllegalArgumentException(
          String.format(
              "bigQueryChangelogTableNameTemplate cannot be set to '{%s}'. This value is reserved"
                  + " for the Cloud Spanner table name.",
              BigQueryUtils.BQ_CHANGELOG_FIELD_NAME_TABLE_NAME));
    }

    BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
  }

  private static void setOptions(SpannerChangeStreamsToBigQueryOptions options) {
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    // Add use_runner_v2 to the experiments option, since change streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    if (!experiments.contains(USE_RUNNER_V2_EXPERIMENT)) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options) {
    setOptions(options);
    validateOptions(options);

    /**
     * Stages: 1) Read {@link DataChangeRecord} from change stream. 2) Create {@link
     * FailsafeElement} of {@link Mod} JSON and merge from: - {@link DataChangeRecord}. - GCS Dead
     * letter queue. 3) Convert {@link Mod} JSON into {@link TableRow} by reading from Spanner at
     * commit timestamp. 4) Append {@link TableRow} to BigQuery. 5) Write Failures from 2), 3) and
     * 4) to GCS dead letter queue.
     */
    Pipeline pipeline = Pipeline.create(options);
    DeadLetterQueueManager dlqManager = buildDlqManager(options);
    String spannerProjectId = OptionsUtils.getSpannerProjectId(options);

    String dlqDirectory = dlqManager.getRetryDlqDirectoryWithDateTime();
    String tempDlqDirectory = dlqManager.getRetryDlqDirectory() + "tmp/";

    /**
     * There are two types of errors that can occur in this pipeline:
     *
     * <p>1) Error originating from modJsonStringToTableRow. Errors here are either due to pk values
     * missing, a spanner table / column missing in the in-memory map, or some Spanner read error
     * happening in readSpannerRow. We already retry the Spanner read error inline 3 times. Th other
     * types of errors are more likely to be un-retriable.
     *
     * <p>2) Error originating from BigQueryIO.write. BigQuery storage write API already retries all
     * transient errors and outputs more permanent errors.
     *
     * <p>As a result, it is reasonable to write all errors happening in the pipeline directly into
     * the permanent DLQ, since most of the errors are likely to be non-transient.
     */
    if (options.getDisableDlqRetries()) {
      LOG.info(
          "Disabling retries for the DLQ, directly writing into severe DLQ: {}",
          dlqManager.getSevereDlqDirectoryWithDateTime());
      dlqDirectory = dlqManager.getSevereDlqDirectoryWithDateTime();
      tempDlqDirectory = dlqManager.getSevereDlqDirectory() + "tmp/";
    }

    // Retrieve and parse the startTimestamp and endTimestamp.
    Timestamp startTimestamp =
        options.getStartTimestamp().isEmpty()
            ? Timestamp.now()
            : Timestamp.parseTimestamp(options.getStartTimestamp());
    Timestamp endTimestamp =
        options.getEndTimestamp().isEmpty()
            ? Timestamp.MAX_VALUE
            : Timestamp.parseTimestamp(options.getEndTimestamp());

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withProjectId(spannerProjectId)
            .withInstanceId(options.getSpannerInstanceId())
            .withDatabaseId(options.getSpannerDatabase())
            .withRpcPriority(options.getRpcPriority());
    // Propagate database role for fine-grained access control on change stream.
    if (options.getSpannerDatabaseRole() != null) {
      spannerConfig =
          spannerConfig.withDatabaseRole(
              ValueProvider.StaticValueProvider.of(options.getSpannerDatabaseRole()));
    }

    SpannerIO.ReadChangeStream readChangeStream =
        SpannerIO.readChangeStream()
            .withSpannerConfig(spannerConfig)
            .withMetadataInstance(options.getSpannerMetadataInstanceId())
            .withMetadataDatabase(options.getSpannerMetadataDatabase())
            .withChangeStreamName(options.getSpannerChangeStreamName())
            .withInclusiveStartAt(startTimestamp)
            .withInclusiveEndAt(endTimestamp)
            .withRpcPriority(options.getRpcPriority());

    String spannerMetadataTableName = options.getSpannerMetadataTableName();
    if (spannerMetadataTableName != null) {
      readChangeStream = readChangeStream.withMetadataTable(spannerMetadataTableName);
    }

    PCollection<DataChangeRecord> dataChangeRecord =
        pipeline
            .apply("Read from Spanner Change Streams", readChangeStream)
            .apply("Reshuffle DataChangeRecord", Reshuffle.viaRandomKey());

    PCollection<FailsafeElement<String, String>> sourceFailsafeModJson =
        dataChangeRecord
            .apply("DataChangeRecord To Mod JSON", ParDo.of(new DataChangeRecordToModJsonFn()))
            .apply(
                "Wrap Mod JSON In FailsafeElement",
                ParDo.of(
                    new DoFn<String, FailsafeElement<String, String>>() {
                      @ProcessElement
                      public void process(
                          @Element String input,
                          OutputReceiver<FailsafeElement<String, String>> receiver) {
                        receiver.output(FailsafeElement.of(input, input));
                      }
                    }))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    PCollectionTuple dlqModJson =
        dlqManager.getReconsumerDataTransform(
            pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
    PCollection<FailsafeElement<String, String>> retryableDlqFailsafeModJson =
        dlqModJson.get(DeadLetterQueueManager.RETRYABLE_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);

    PCollection<FailsafeElement<String, String>> failsafeModJson =
        PCollectionList.of(sourceFailsafeModJson)
            .and(retryableDlqFailsafeModJson)
            .apply("Merge Source And DLQ Mod JSON", Flatten.pCollections());

    ImmutableSet.Builder<String> ignoreFieldsBuilder = ImmutableSet.builder();
    for (String ignoreField : options.getIgnoreFields().split(",")) {
      ignoreFieldsBuilder.add(ignoreField);
    }
    ImmutableSet<String> ignoreFields = ignoreFieldsBuilder.build();
    FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRowOptions
        failsafeModJsonToTableRowOptions =
            FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRowOptions.builder()
                .setSpannerConfig(spannerConfig)
                .setSpannerChangeStream(options.getSpannerChangeStreamName())
                .setIgnoreFields(ignoreFields)
                .setCoder(FAILSAFE_ELEMENT_CODER)
                .setUseStorageWriteApi(options.getUseStorageWriteApi())
                .build();
    FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRow failsafeModJsonToTableRow =
        new FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRow(
            failsafeModJsonToTableRowOptions);

    PCollectionTuple tableRowTuple =
        failsafeModJson.apply("Mod JSON To TableRow", failsafeModJsonToTableRow);
    // If users pass in the full BigQuery dataset ID (projectId.datasetName), extract the dataset
    // name for the setBigQueryDataset parameter.
    List<String> results = OptionsUtils.processBigQueryProjectAndDataset(options);
    String bigqueryProject = results.get(0);
    String bigqueryDataset = results.get(1);

    BigQueryDynamicDestinations.BigQueryDynamicDestinationsOptions
        bigQueryDynamicDestinationsOptions =
            BigQueryDynamicDestinations.BigQueryDynamicDestinationsOptions.builder()
                .setSpannerConfig(spannerConfig)
                .setChangeStreamName(options.getSpannerChangeStreamName())
                .setIgnoreFields(ignoreFields)
                .setBigQueryProject(bigqueryProject)
                .setBigQueryDataset(bigqueryDataset)
                .setBigQueryTableTemplate(options.getBigQueryChangelogTableNameTemplate())
                .setUseStorageWriteApi(options.getUseStorageWriteApi())
                .build();
    WriteResult writeResult;
    if (!options.getUseStorageWriteApi()) {
      writeResult =
          tableRowTuple
              .get(failsafeModJsonToTableRow.transformOut)
              .apply(
                  "Write To BigQuery",
                  BigQueryIO.<TableRow>write()
                      .to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
                      .withFormatFunction(element -> removeIntermediateMetadataFields(element))
                      .withFormatRecordOnFailureFunction(element -> element)
                      .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                      .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
                      .withExtendedErrorInfo()
                      .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
    } else {
      writeResult =
          tableRowTuple
              .get(failsafeModJsonToTableRow.transformOut)
              .apply(
                  "Write To BigQuery",
                  BigQueryIO.<TableRow>write()
                      .to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
                      .withFormatFunction(element -> removeIntermediateMetadataFields(element))
                      .withFormatRecordOnFailureFunction(element -> element)
                      .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                      .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
                      .ignoreUnknownValues()
                      .withAutoSchemaUpdate(true) // only supported when using STORAGE_WRITE_API or
                      // STORAGE_API_AT_LEAST_ONCE.
                      .withExtendedErrorInfo()
                      .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
    }

    PCollection<String> transformDlqJson =
        tableRowTuple
            .get(failsafeModJsonToTableRow.transformDeadLetterOut)
            .apply(
                "Failed Mod JSON During Table Row Transformation",
                MapElements.via(new StringDeadLetterQueueSanitizer()));

    PCollection<String> bqWriteDlqJson =
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply(
                "Failed Mod JSON During BigQuery Writes",
                MapElements.via(new BigQueryDeadLetterQueueSanitizer()));

    PCollectionList.of(transformDlqJson)
        // Generally BigQueryIO storage write retries transient errors, and only more
        // persistent errors make it into DLQ.
        .and(bqWriteDlqJson)
        .apply("Merge Failed Mod JSON From Transform And BigQuery", Flatten.pCollections())
        .apply(
            "Write Failed Mod JSON To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqDirectory)
                .withTmpDirectory(tempDlqDirectory)
                .setIncludePaneInfo(true)
                .build());

    PCollection<FailsafeElement<String, String>> nonRetryableDlqModJsonFailsafe =
        dlqModJson.get(DeadLetterQueueManager.PERMANENT_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);

    nonRetryableDlqModJsonFailsafe
        .apply(
            "Write Mod JSON With Non-retryable Error To DLQ",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  private static DeadLetterQueueManager buildDlqManager(
      SpannerChangeStreamsToBigQueryOptions options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDeadLetterQueueDirectory().isEmpty()
            ? tempLocation + "dlq/"
            : options.getDeadLetterQueueDirectory();

    LOG.info("Dead letter queue directory: {}", dlqDirectory);
    return DeadLetterQueueManager.create(dlqDirectory, DLQ_MAX_RETRIES);
  }

  /**
   * Remove the following intermediate metadata fields that are not user data from {@link TableRow}:
   * _metadata_error, _metadata_retry_count, _metadata_spanner_original_payload_json.
   */
  private static TableRow removeIntermediateMetadataFields(TableRow tableRow) {
    TableRow cleanTableRow = tableRow.clone();
    Set<String> rowKeys = tableRow.keySet();
    Set<String> metadataFields = BigQueryUtils.getBigQueryIntermediateMetadataFieldNames();

    for (String rowKey : rowKeys) {
      if (metadataFields.contains(rowKey)) {
        cleanTableRow.remove(rowKey);
      } else if (rowKeys.contains("_type_" + rowKey)) {
        cleanTableRow.remove("_type_" + rowKey);
      }
    }

    return cleanTableRow;
  }

  /**
   * DoFn that converts a {@link DataChangeRecord} to multiple {@link Mod} in serialized JSON
   * format.
   */
  static class DataChangeRecordToModJsonFn extends DoFn<DataChangeRecord, String> {

    @ProcessElement
    public void process(@Element DataChangeRecord input, OutputReceiver<String> receiver) {
      for (org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod changeStreamsMod :
          input.getMods()) {
        Mod mod =
            new Mod(
                changeStreamsMod.getKeysJson(),
                changeStreamsMod.getNewValuesJson(),
                input.getCommitTimestamp(),
                input.getServerTransactionId(),
                input.isLastRecordInTransactionInPartition(),
                input.getRecordSequence(),
                input.getTableName(),
                input.getRowType().stream().map(ModColumnType::new).collect(Collectors.toList()),
                input.getModType(),
                input.getValueCaptureType(),
                input.getNumberOfRecordsInTransaction(),
                input.getNumberOfPartitionsInTransaction());

        String modJsonString;

        try {
          modJsonString = mod.toJson();
        } catch (IOException e) {
          // Ignore exception and print bad format.
          modJsonString = String.format("\"%s\"", input);
        }
        receiver.output(modJsonString);
      }
    }
  }
}

다음 단계