Bigtable change streams to Pub/Sub 템플릿

Bigtable change streams to Pub/Sub 템플릿은 Bigtable 데이터 변경 레코드를 스트리밍하고 Dataflow를 사용하여 Pub/Sub 주제에 게시하는 스트리밍 파이프라인입니다.

Bigtable 변경 내역을 사용하면 테이블별로 데이터 변형을 구독할 수 있습니다. 테이블 변경 내역을 구독하면 다음 제약조건이 적용됩니다.

  • 수정된 셀 및 삭제 작업의 설명어만 반환됨
  • 수정된 셀의 새 값만 반환됩니다.

데이터 변경 레코드가 Pub/Sub 주제에 게시되면 메시지가 원본 Bigtable 커밋 타임스탬프 순서 지정과 달리 비순차적으로 삽입될 수 있습니다.

Pub/Sub 주제에 게시할 수 없는 Bigtable 데이터 변경 레코드는 일시적으로 Cloud Storage의 데드 레터 큐(처리되지 않은 메시지 큐) 디렉터리에 저장됩니다. 최대 재시도 실패 횟수를 초과하면 이러한 레코드는 사람이 검토하거나 사용자가 추가로 처리할 수 있도록 동일한 데드 레터 큐 디렉터리에 무기한 저장됩니다.

파이프라인을 사용하려면 대상 Pub/Sub 주제가 있어야 합니다. 대상 주제는 스키마를 사용하여 메시지를 검증하도록 구성할 수 있습니다. Pub/Sub 주제가 스키마를 지정할 때는 스키마가 유효한 경우에만 파이프라인이 시작됩니다. 스키마 유형에 따라 대상 주제에 다음 스키마 정의 중 하나를 사용합니다.

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  
{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

JSON 메시지 인코딩과 함께 다음 Protobuf 스키마를 사용합니다.

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

각각의 새 Pub/Sub 메시지에는 Bigtable 테이블의 해당 행에서 변경 내역이 반환한 데이터 변경 레코드의 항목 하나가 포함됩니다. Pub/Sub 템플릿은 각 데이터 변경 레코드의 항목을 개별 셀 수준 변경사항으로 평탄화합니다.

Pub/Sub 출력 메시지 설명

필드 이름 설명
rowKey 변경된 행의 row key입니다. 바이트 배열 형식으로 도착합니다. JSON 메시지 인코딩이 구성된 경우 row key가 문자열로 반환됩니다. useBase64Rowkeys가 지정된 경우 row key가 Base64로 인코딩됩니다. 그렇지 않으면 bigtableChangeStreamCharset에서 지정한 문자 집합을 사용하여 row key 바이트를 문자열로 디코딩합니다.
modType 행 변형의 유형입니다. SET_CELL, DELETE_CELLS, DELETE_FAMILY 중 한 가지 값을 사용합니다.
columnFamily 행 변형의 영향을 받는 column family입니다.
column 행 변형의 영향을 받는 column qualifier입니다. DELETE_FAMILY 변형 유형의 경우 열 필드가 설정되지 않습니다. 바이트 배열 형식으로 도착합니다. JSON 메시지 인코딩이 구성된 경우 열이 문자열로 반환됩니다. useBase64ColumnQualifier가 지정된 경우 열 필드는 Base64로 인코딩됩니다. 그렇지 않으면 bigtableChangeStreamCharset에서 지정한 문자 집합을 사용하여 row key 바이트를 문자열로 디코딩합니다.
commitTimestamp Bigtable에서 변형을 적용하는 시간입니다. 유닉스 시간(1970년 1월 1일 UTC 기준) 이후의 시간이 마이크로초 단위로 측정됩니다.
timestamp 변형의 영향을 받는 셀의 타임스탬프 값입니다. DELETE_CELLSDELETE_FAMILY 변형 유형의 경우 타임스탬프가 설정되지 않습니다. 유닉스 시간(1970년 1월 1일 UTC 기준) 이후의 시간이 마이크로초 단위로 측정됩니다.
timestampFrom DELETE_CELLS 변형으로 삭제된 모든 셀의 타임스탬프 간격의 시작(경계 포함)을 설명합니다. 다른 변형 유형의 경우 timestampFrom이 설정되지 않습니다. 유닉스 시간(1970년 1월 1일 UTC 기준) 이후의 시간이 마이크로초 단위로 측정됩니다.
timestampTo DELETE_CELLS 변형으로 삭제된 모든 셀의 타임스탬프 간격의 종료(경계 제외)를 설명합니다. 다른 변형 유형의 경우 timestampTo이 설정되지 않습니다.
isGC Bigtable 가비지 컬렉션 메커니즘으로 변형이 생성되는지 여부를 나타내는 불리언 값입니다.
tieBreaker 서로 다른 Bigtable 클러스터에서 두 가지 변형을 동시에 등록하면 tiebreaker 값이 가장 높은 변형이 소스 테이블에 적용됩니다. tiebreaker 값이 낮은 변형은 삭제됩니다.
value 변형으로 인해 설정된 새 값입니다. stripValues 파이프라인 옵션이 설정되지 않은 경우 이 값은 SET_CELL 변형에 설정됩니다. 다른 변형 유형의 경우 값이 설정되지 않습니다. 바이트 배열 형식으로 도착합니다. JSON 메시지 인코딩이 구성된 경우 값이 문자열로 반환됩니다. useBase64Values가 지정된 경우 값은 Base64로 인코딩됩니다. 그렇지 않으면 bigtableChangeStreamCharset에서 지정한 문자 집합을 사용하여 값 바이트를 문자열로 디코딩합니다.
sourceInstance 변형을 등록한 Bigtable 인스턴스의 이름입니다. 여러 파이프라인이 서로 다른 인스턴스의 변경사항을 동일한 Pub/Sub 주제로 스트리밍하는 경우일 수 있습니다.
sourceCluster 변형을 등록한 Bigtable 클러스터의 이름입니다. 여러 파이프라인이 서로 다른 인스턴스의 변경사항을 동일한 Pub/Sub 주제로 스트리밍하는 경우에 사용할 수 있습니다.
sourceTable 변형을 수신한 Bigtable 테이블의 이름입니다. 여러 파이프라인이 서로 다른 테이블의 변경사항을 동일한 Pub/Sub 주제로 스트리밍하는 경우에 사용할 수 있습니다.

파이프라인 요구사항

  • 지정된 Bigtable 소스 인스턴스입니다.
  • 지정된 Bigtable 소스 테이블입니다. 테이블에 변경 내역이 사용 설정되어 있어야 합니다.
  • 지정된 Bigtable 애플리케이션 프로필입니다.
  • 지정된 Pub/Sub 주제가 있어야 합니다.

템플릿 매개변수

필수 매개변수

  • pubSubTopic: 대상 Pub/Sub 주제의 이름입니다.
  • bigtableChangeStreamAppProfile: Bigtable 애플리케이션 프로필 ID입니다. 애플리케이션 프로필에서 단일 클러스터 라우팅을 사용하고 단일 행 트랜잭션을 허용해야 합니다.
  • bigtableReadInstanceId: 소스 Bigtable 인스턴스 ID입니다.
  • bigtableReadTableId: 소스 Bigtable 테이블 ID입니다.

선택적 매개변수

  • messageEncoding: Pub/Sub 주제에 게시할 메시지 인코딩입니다. 대상 주제의 스키마가 구성되어 있으면 주제 설정에 따라 메시지 인코딩이 결정됩니다. 지원되는 값은 BINARYJSON입니다. 기본값은 JSON입니다.
  • messageFormat: Pub/Sub 주제에 게시할 메시지 인코딩입니다. 대상 주제의 스키마가 구성되어 있으면 주제 설정에 따라 메시지 인코딩이 결정됩니다. AVRO, PROTOCOL_BUFFERS, JSON 값이 지원됩니다. 기본값은 JSON입니다. JSON 형식을 사용하는 경우 메시지의 rowKey, 열, 값 필드는 문자열이며, 해당 콘텐츠는 파이프라인 옵션 useBase64Rowkeys, useBase64ColumnQualifiers, useBase64Values, bigtableChangeStreamCharset에 따라 결정됩니다.
  • stripValues: true로 설정하면 새 값이 설정되지 않은 SET_CELL 변형이 반환됩니다. 기본값은 false입니다. 이 매개변수는 새 값이 필요하지 않거나(캐시 무효화라고도 함) 값이 매우 크고 Pub/Sub 메시지 크기 한도를 초과하는 경우에 유용합니다.
  • dlqDirectory: 데드 레터 큐의 디렉터리입니다. 처리하지 못한 레코드가 이 디렉터리에 저장됩니다. 기본값은 Dataflow 작업 임시 위치 아래에 있는 디렉터리입니다. 대부분의 경우 기본 경로를 사용할 수 있습니다.
  • dlqRetryMinutes: 데드 레터 큐 재시도 간격(분)입니다. 기본값은 10입니다.
  • dlqMaxRetries: 데드 레터 최대 재시도입니다. 기본값은 5입니다.
  • useBase64Rowkeys: JSON 메시지 인코딩에 사용됩니다. true로 설정한 경우 rowKey 필드는 Base64로 인코딩된 문자열입니다. 그렇지 않은 경우 rowKeybigtableChangeStreamCharset를 사용하여 바이트를 문자열로 디코딩하도록 생성됩니다. 기본값은 false입니다.
  • pubSubProjectId: Bigtable 프로젝트 ID입니다. 기본값은 Dataflow 작업의 프로젝트입니다.
  • useBase64ColumnQualifiers: JSON 메시지 인코딩과 함께 사용됩니다. true로 설정한 경우 column 필드는 Base64로 인코딩된 문자열입니다. 그렇지 않으면 bigtableChangeStreamCharset로 바이트를 문자열로 디코딩하여 열이 생성됩니다. 기본값은 false입니다.
  • useBase64Values: JSON 메시지 인코딩에 사용됩니다. true로 설정한 경우 값 필드는 Base64로 인코딩된 문자열입니다. 그렇지 않으면 bigtableChangeStreamCharset로 바이트를 문자열로 디코딩하여 값이 생성됩니다. 기본값은 false입니다.
  • disableDlqRetries: DLQ 재시도를 사용 중지할지 여부입니다. 기본값은 false입니다.
  • bigtableChangeStreamMetadataInstanceId: Bigtable 변경 내역 메타데이터 인스턴스 ID입니다. 기본값은 빈 값입니다.
  • bigtableChangeStreamMetadataTableTableId: Bigtable 변경 내역 커넥터 메타데이터 테이블의 ID입니다. 제공하지 않으면 파이프라인 실행 중에 Bigtable 변경 내역 커넥터 메타데이터 테이블이 자동으로 생성됩니다. 기본값은 빈 값입니다.
  • bigtableChangeStreamCharset: Bigtable 변경 내역 문자 집합 이름입니다. 기본값은 UTF-8입니다.
  • bigtableChangeStreamStartTimestamp: 변경 내역을 읽는 데 사용할 시작 타임스탬프(https://tools.ietf.org/html/rfc3339)입니다(경계 포함). 예를 들면 2022-05-05T07:59:59Z입니다. 기본값은 파이프라인 시작 시간의 타임스탬프입니다.
  • bigtableChangeStreamIgnoreColumnFamilies: 무시할 column family 이름 변경 내역을 쉼표로 구분한 목록입니다. 기본값은 빈 값입니다.
  • bigtableChangeStreamIgnoreColumns: 무시할 열 이름 변경 내역을 쉼표로 구분한 목록입니다. 기본값은 빈 값입니다.
  • bigtableChangeStreamName: 클라이언트 파이프라인의 고유한 이름입니다. 이전에 실행 중이던 파이프라인이 중지된 지점에서 처리를 계속할 수 있습니다. 기본값은 자동으로 생성된 이름입니다. 사용된 값은 Dataflow 작업 로그를 참고하세요.
  • bigtableChangeStreamResume: true로 설정하면 bigtableChangeStreamName 값이 동일하고, 이전에 실행 중이던 파이프라인이 중지된 지점부터 새 파이프라인이 처리를 계속합니다. 지정된 bigtableChangeStreamName 값의 파이프라인이 실행된 적이 없는 경우 새 파이프라인이 시작되지 않습니다. false로 설정하면 새 파이프라인이 시작됩니다. bigtableChangeStreamName 값이 동일한 파이프라인이 지정된 소스에 대해 이미 실행된 경우 새 파이프라인이 시작되지 않습니다. 기본값은 false입니다.
  • bigtableReadProjectId: Bigtable 프로젝트 ID입니다. 기본값은 Dataflow 작업의 프로젝트입니다.

템플릿 실행

콘솔gcloudAPI
  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Bigtable change streams to Pub/Sub template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • BIGTABLE_INSTANCE_ID: Bigtable 인스턴스 ID입니다.
  • BIGTABLE_TABLE_ID: Bigtable 테이블 ID입니다.
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable 애플리케이션 프로필 ID
  • PUBSUB_TOPIC: Pub/Sub 대상 주제 이름

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • BIGTABLE_INSTANCE_ID: Bigtable 인스턴스 ID입니다.
  • BIGTABLE_TABLE_ID: Bigtable 테이블 ID입니다.
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable 애플리케이션 프로필 ID
  • PUBSUB_TOPIC: Pub/Sub 대상 주제 이름
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub;

import com.google.cloud.Timestamp;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation.MutationType;
import com.google.cloud.bigtable.data.v2.models.DeleteCells;
import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
import com.google.cloud.bigtable.data.v2.models.Entry;
import com.google.cloud.bigtable.data.v2.models.SetCell;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions;
import com.google.cloud.teleport.v2.bigtable.utils.UnsupportedEntryException;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.options.BigtableChangeStreamsToPubSubOptions;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.FailsafePublisher.PublishModJsonToTopic;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.BigtableSource;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.MessageEncoding;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.MessageFormat;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.Mod;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.ModType;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.PubSubDestination;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.TestChangeStreamMutation;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.schemautils.PubSubUtils;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.GetTopicRequest;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.ValidateMessageRequest;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This pipeline ingests {@link ChangeStreamMutation} from Bigtable change stream. The {@link
 * ChangeStreamMutation} is then broken into {@link Mod}, which converted into PubsubMessage and
 * inserted into Pub/Sub topic.
 */
@Template(
    name = "Bigtable_Change_Streams_to_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Bigtable Change Streams to PubSub",
    description =
        "Streaming pipeline. Streams Bigtable data change records and writes them into PubSub using Dataflow Runner V2.",
    optionsClass = BigtableChangeStreamsToPubSubOptions.class,
    optionsOrder = {
      BigtableChangeStreamsToPubSubOptions.class,
      BigtableCommonOptions.ReadChangeStreamOptions.class,
      BigtableCommonOptions.ReadOptions.class
    },
    skipOptions = {
      "bigtableReadAppProfile",
      "bigtableAdditionalRetryCodes",
      "bigtableRpcAttemptTimeoutMs",
      "bigtableRpcTimeoutMs"
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-bigtable-change-streams-to-pubsub",
    flexContainerName = "bigtable-changestreams-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    streaming = true,
    supportsAtLeastOnce = true)
public final class BigtableChangeStreamsToPubSub {

  /** String/String Coder for {@link FailsafeElement}. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  private static final Logger LOG = LoggerFactory.getLogger(BigtableChangeStreamsToPubSub.class);

  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    LOG.info("Starting to replicate change records from Cloud Bigtable change streams to PubSub");

    BigtableChangeStreamsToPubSubOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(BigtableChangeStreamsToPubSubOptions.class);

    run(options);
  }

  private static void validateOptions(BigtableChangeStreamsToPubSubOptions options) {
    if (options.getDlqRetryMinutes() <= 0) {
      throw new IllegalArgumentException("dlqRetryMinutes must be positive.");
    }
    if (options.getDlqMaxRetries() < 0) {
      throw new IllegalArgumentException("dlqMaxRetries cannot be negative.");
    }
  }

  private static void setOptions(BigtableChangeStreamsToPubSubOptions options) {
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    // Add use_runner_v2 to the experiments option, since change streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    boolean hasUseRunnerV2 = false;
    for (String experiment : experiments) {
      if (experiment.equalsIgnoreCase(USE_RUNNER_V2_EXPERIMENT)) {
        hasUseRunnerV2 = true;
        break;
      }
    }
    if (!hasUseRunnerV2) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(BigtableChangeStreamsToPubSubOptions options) {
    setOptions(options);
    validateOptions(options);

    String bigtableProject = getBigtableProjectId(options);

    // Retrieve and parse the startTimestamp
    Instant startTimestamp =
        options.getBigtableChangeStreamStartTimestamp().isEmpty()
            ? Instant.now()
            : toInstant(Timestamp.parseTimestamp(options.getBigtableChangeStreamStartTimestamp()));

    BigtableSource sourceInfo =
        new BigtableSource(
            options.getBigtableReadInstanceId(),
            options.getBigtableReadTableId(),
            getBigtableCharset(options),
            options.getBigtableChangeStreamIgnoreColumnFamilies(),
            options.getBigtableChangeStreamIgnoreColumns(),
            startTimestamp);

    Topic topic = null;
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      GetTopicRequest request =
          GetTopicRequest.newBuilder()
              .setTopic(
                  TopicName.ofProjectTopicName(
                          getPubSubProjectId(options), options.getPubSubTopic())
                      .toString())
              .build();
      topic = topicAdminClient.getTopic(request);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }

    try {
      if (!validateSchema(topic, options, sourceInfo)) {
        final String errorMessage = "Configured topic doesn't accept messages of configured format";
        throw new IllegalArgumentException(errorMessage);
      }

    } catch (Exception e) {
      throw new IllegalArgumentException(e);
    }

    PubSubDestination destinationInfo = newPubSubDestination(options, topic);
    PubSubUtils pubSub = new PubSubUtils(sourceInfo, destinationInfo);

    /*
     * Stages: 1) Read {@link ChangeStreamMutation} from change stream. 2) Create {@link
     * FailsafeElement} of {@link Mod} JSON and merge from: - {@link ChangeStreamMutation}. - GCS Dead
     * letter queue. 3) Convert {@link Mod} JSON into PubsubMessage and publish it to PubSub.
     * 4) Write Failures from 2) and 3) to GCS dead letter queue.
     */
    // Step 1
    Pipeline pipeline = Pipeline.create(options);

    // Register the coders for pipeline
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    String dlqDirectory = dlqManager.getRetryDlqDirectoryWithDateTime();
    String tempDlqDirectory = dlqManager.getRetryDlqDirectory() + "tmp/";

    if (options.getDisableDlqRetries()) {
      LOG.info(
          "Disabling retries for the DLQ, directly writing into severe DLQ: {}",
          dlqManager.getSevereDlqDirectoryWithDateTime());
      dlqDirectory = dlqManager.getSevereDlqDirectoryWithDateTime();
      tempDlqDirectory = dlqManager.getSevereDlqDirectory() + "tmp/";
    }

    BigtableIO.ReadChangeStream readChangeStream =
        BigtableIO.readChangeStream()
            .withChangeStreamName(options.getBigtableChangeStreamName())
            .withExistingPipelineOptions(
                options.getBigtableChangeStreamResume()
                    ? BigtableIO.ExistingPipelineOptions.RESUME_OR_FAIL
                    : BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS)
            .withProjectId(bigtableProject)
            .withMetadataTableInstanceId(options.getBigtableChangeStreamMetadataInstanceId())
            .withInstanceId(options.getBigtableReadInstanceId())
            .withTableId(options.getBigtableReadTableId())
            .withAppProfileId(options.getBigtableChangeStreamAppProfile())
            .withStartTime(startTimestamp);

    if (!StringUtils.isBlank(options.getBigtableChangeStreamMetadataTableTableId())) {
      readChangeStream =
          readChangeStream.withMetadataTableTableId(
              options.getBigtableChangeStreamMetadataTableTableId());
    }
    // Step 2: just return the output for sending to pubSub and DLQ
    PCollection<ChangeStreamMutation> dataChangeRecord =
        pipeline
            .apply("Read from Cloud Bigtable Change Streams", readChangeStream)
            .apply(Values.create());

    PCollection<FailsafeElement<String, String>> sourceFailsafeModJson =
        dataChangeRecord
            .apply(
                "ChangeStreamMutation To Mod JSON",
                ParDo.of(new ChangeStreamMutationToModJsonFn(sourceInfo)))
            .apply(
                "Wrap Mod JSON In FailsafeElement",
                ParDo.of(
                    new DoFn<String, FailsafeElement<String, String>>() {
                      @ProcessElement
                      public void process(
                          @Element String input,
                          OutputReceiver<FailsafeElement<String, String>> receiver) {
                        receiver.output(FailsafeElement.of(input, input));
                      }
                    }))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    PCollectionTuple dlqModJson =
        dlqManager.getReconsumerDataTransform(
            pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));

    PCollection<FailsafeElement<String, String>> retryableDlqFailsafeModJson = null;
    if (options.getDisableDlqRetries()) {
      retryableDlqFailsafeModJson = pipeline.apply(Create.empty(FAILSAFE_ELEMENT_CODER));
    } else {
      retryableDlqFailsafeModJson =
          dlqModJson.get(DeadLetterQueueManager.RETRYABLE_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);
    }

    PCollection<FailsafeElement<String, String>> failsafeModJson =
        PCollectionList.of(sourceFailsafeModJson)
            .and(retryableDlqFailsafeModJson)
            .apply("Merge Source And DLQ Mod JSON", Flatten.pCollections());

    FailsafePublisher.FailsafeModJsonToPubsubMessageOptions failsafeModJsonToPubsubOptions =
        FailsafePublisher.FailsafeModJsonToPubsubMessageOptions.builder()
            .setCoder(FAILSAFE_ELEMENT_CODER)
            .build();

    PublishModJsonToTopic publishModJsonToTopic =
        new PublishModJsonToTopic(pubSub, failsafeModJsonToPubsubOptions);

    PCollection<FailsafeElement<String, String>> failedToPublish =
        failsafeModJson.apply("Publish Mod JSON To Pubsub", publishModJsonToTopic);

    PCollection<String> transformDlqJson =
        failedToPublish.apply(
            "Failed Mod JSON During Table Row Transformation",
            MapElements.via(new StringDeadLetterQueueSanitizer()));

    PCollectionList.of(transformDlqJson)
        .apply("Merge Failed Mod JSON From Transform And PubSub", Flatten.pCollections())
        .apply(
            "Write Failed Mod JSON To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqDirectory)
                .withTmpDirectory(tempDlqDirectory)
                .setIncludePaneInfo(true)
                .build());

    PCollection<FailsafeElement<String, String>> nonRetryableDlqModJsonFailsafe =
        dlqModJson.get(DeadLetterQueueManager.PERMANENT_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);
    LOG.info(
        "DLQ manager severe DLQ directory with date time: {}",
        dlqManager.getSevereDlqDirectoryWithDateTime());
    LOG.info("DLQ manager severe DLQ directory: {}", dlqManager.getSevereDlqDirectory() + "tmp/");
    nonRetryableDlqModJsonFailsafe
        .apply(
            "Write Mod JSON With Non-retriable Error To DLQ",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  private static PubSubDestination newPubSubDestination(
      BigtableChangeStreamsToPubSubOptions options, Topic topic) {
    return new PubSubDestination(
        getPubSubProjectId(options),
        options.getPubSubTopic(),
        topic,
        options.getMessageFormat(),
        options.getMessageEncoding(),
        options.getUseBase64Rowkeys(),
        options.getUseBase64ColumnQualifiers(),
        options.getUseBase64Values(),
        options.getStripValues());
  }

  private static Instant toInstant(Timestamp timestamp) {
    if (timestamp == null) {
      return null;
    } else {
      return Instant.ofEpochMilli(timestamp.getSeconds() * 1000 + timestamp.getNanos() / 1000000);
    }
  }

  private static DeadLetterQueueManager buildDlqManager(
      BigtableChangeStreamsToPubSubOptions options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDlqDirectory().isEmpty() ? tempLocation + "dlq/" : options.getDlqDirectory();

    LOG.info("DLQ directory: {}", dlqDirectory);
    return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetries());
  }

  private static String getBigtableCharset(BigtableChangeStreamsToPubSubOptions options) {
    return StringUtils.isEmpty(options.getBigtableChangeStreamCharset())
        ? "UTF-8"
        : options.getBigtableChangeStreamCharset();
  }

  private static String getBigtableProjectId(BigtableChangeStreamsToPubSubOptions options) {
    return StringUtils.isEmpty(options.getBigtableReadProjectId())
        ? options.getProject()
        : options.getBigtableReadProjectId();
  }

  private static String getPubSubProjectId(BigtableChangeStreamsToPubSubOptions options) {
    return StringUtils.isEmpty(options.getPubSubProjectId())
        ? options.getProject()
        : options.getPubSubProjectId();
  }

  private static Boolean validateSchema(
      Topic topic, BigtableChangeStreamsToPubSubOptions options, BigtableSource source)
      throws Exception {
    String messageFormatPath = topic.getSchemaSettings().getSchema();
    if (topic.getSchemaSettings().getSchema().isEmpty()) {
      validateIncompatibleEncoding(options);
      LOG.info(
          "Topic has no schema configured, pipeline will use message format: {}, message encoding: {}",
          options.getMessageFormat(),
          options.getMessageEncoding());
      return true;
    } else {
      SchemaName schemaName = SchemaName.parse(topic.getSchemaSettings().getSchema());
      Schema schema;
      try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
        schema = schemaServiceClient.getSchema(schemaName);
      }

      options.setMessageEncoding(toMessageEncoding(topic.getSchemaSettings().getEncoding()));

      Schema.Type schemaType = schema.getType();
      switch (schemaType) {
        case AVRO:
          options.setMessageFormat(MessageFormat.AVRO);
          validateNoUseOfBase64(options);
          break;
        case PROTOCOL_BUFFER:
          if (options.getMessageEncoding() == MessageEncoding.JSON) {
            options.setMessageFormat(MessageFormat.JSON);
          } else {
            options.setMessageFormat(MessageFormat.PROTOCOL_BUFFERS);
            validateNoUseOfBase64(options);
          }
          break;
        case TYPE_UNSPECIFIED:
        case UNRECOGNIZED:
          // Not overriding messageFormat, will try what customer configured or the default if
          // not configured
          break;
        default:
          throw new IllegalArgumentException("Topic schema type is not supported: " + schemaType);
      }

      LOG.info("Topic has schema configured: {}", topic.getSchemaSettings().getSchema());
      LOG.info(
          "Pipeline will use message format: {}, message encoding: {}",
          options.getMessageFormat(),
          options.getMessageEncoding());

      PubSubDestination destination = newPubSubDestination(options, topic);
      PubSubUtils pubSub = new PubSubUtils(source, destination);

      ByteString testChangeMessageData = createTestChangeMessage(pubSub).getData();
      Encoding encoding = toPubSubEncoding(options.getMessageEncoding());
      try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
        String testMessageEncoded = toBase64String(testChangeMessageData);
        LOG.info("Validating a test message (Base64 encoded): {}", testMessageEncoded);
        ValidateMessageRequest request =
            ValidateMessageRequest.newBuilder()
                .setParent("projects/" + pubSub.getDestination().getPubSubProject())
                .setEncoding(encoding)
                .setMessage(testChangeMessageData)
                .setName(messageFormatPath)
                .build();
        schemaServiceClient.validateMessage(request);
        LOG.info("Test message successfully validated.");
      } catch (Exception e) {
        throw new IllegalArgumentException("Failed to validate test message", e);
      }
    }
    return true;
  }

  private static void validateNoUseOfBase64(BigtableChangeStreamsToPubSubOptions options) {
    if (options.getUseBase64Values()) {
      throw new IllegalArgumentException(
          "useBase64Values values can only be used with topics accepting JSON messages");
    }
    if (options.getUseBase64Rowkeys()) {
      throw new IllegalArgumentException(
          "useBase64Rowkeys values can only be used with topics accepting JSON messages");
    }
    if (options.getUseBase64ColumnQualifiers()) {
      throw new IllegalArgumentException(
          "useBase64ColumnQualifiers values can only be used with topics accepting JSON messages");
    }
  }

  private static void validateIncompatibleEncoding(BigtableChangeStreamsToPubSubOptions options) {
    if (options.getMessageEncoding() == MessageEncoding.BINARY
        && options.getMessageFormat() == MessageFormat.JSON) {
      throw new IllegalArgumentException(
          "JSON message format is incompatible with BINARY message encoding");
    }
  }

  private static MessageEncoding toMessageEncoding(Encoding encoding) {
    if (encoding == null) {
      return MessageEncoding.JSON;
    }
    switch (encoding) {
      case JSON:
      case ENCODING_UNSPECIFIED:
      case UNRECOGNIZED:
        return MessageEncoding.JSON;
      case BINARY:
        return MessageEncoding.BINARY;
      default:
        throw new IllegalArgumentException("Topic has unsupported message encoding: " + encoding);
    }
  }

  private static String toBase64String(ByteString testChangeMessageData) {
    return Base64.getEncoder().encodeToString(testChangeMessageData.toByteArray());
  }

  private static Encoding toPubSubEncoding(MessageEncoding messageEncoding) {
    switch (messageEncoding) {
      case BINARY:
        return Encoding.BINARY;
      case JSON:
        return Encoding.JSON;
      default:
        throw new IllegalArgumentException("Unexpected message encoding: " + messageEncoding);
    }
  }

  private static PubsubMessage createTestChangeMessage(PubSubUtils pubSub) throws Exception {
    SetCell setCell =
        SetCell.create(
            "test_column_family",
            ByteString.copyFrom("test_column", Charset.defaultCharset()),
            1000L, // timestamp
            ByteString.copyFrom("test_value", Charset.defaultCharset()));

    TestChangeStreamMutation mutation =
        new TestChangeStreamMutation(
            "test_rowkey",
            MutationType.USER,
            "source_cluster",
            org.threeten.bp.Instant.now(), // commit timestamp
            1, // tiebreaker
            "token",
            org.threeten.bp.Instant.now(), // low watermark
            setCell);

    Mod mod = new Mod(pubSub.getSource(), mutation, setCell);

    switch (pubSub.getDestination().getMessageFormat()) {
      case AVRO:
        return pubSub.mapChangeJsonStringToPubSubMessageAsAvro(mod.getChangeJson());
      case PROTOCOL_BUFFERS:
        return pubSub.mapChangeJsonStringToPubSubMessageAsProto(mod.getChangeJson());
      case JSON:
        return pubSub.mapChangeJsonStringToPubSubMessageAsJson(mod.getChangeJson());
      default:
        throw new IllegalArgumentException(
            "Unexpected message format: " + pubSub.getDestination().getMessageFormat());
    }
  }

  /**
   * DoFn that converts a {@link ChangeStreamMutation} to multiple {@link Mod} in serialized JSON
   * format.
   */
  static class ChangeStreamMutationToModJsonFn extends DoFn<ChangeStreamMutation, String> {

    private final BigtableSource sourceInfo;

    ChangeStreamMutationToModJsonFn(BigtableSource source) {
      this.sourceInfo = source;
    }

    @ProcessElement
    public void process(@Element ChangeStreamMutation input, OutputReceiver<String> receiver)
        throws Exception {
      for (Entry entry : input.getEntries()) {
        ModType modType = getModType(entry);

        Mod mod = null;
        switch (modType) {
          case SET_CELL:
            mod = new Mod(sourceInfo, input, (SetCell) entry);
            break;
          case DELETE_CELLS:
            mod = new Mod(sourceInfo, input, (DeleteCells) entry);
            break;
          case DELETE_FAMILY:
            mod = new Mod(sourceInfo, input, (DeleteFamily) entry);
            break;
          default:
          case UNKNOWN:
            throw new UnsupportedEntryException(
                "Cloud Bigtable change stream entry of type "
                    + entry.getClass().getName()
                    + " is not supported. The entry was put into a DLQ directory. "
                    + "Please update your Dataflow template with the latest template version");
        }

        String modJsonString;

        try {
          modJsonString = mod.toJson();
        } catch (IOException e) {
          // Ignore exception and print bad format.
          modJsonString = String.format("\"%s\"", input);
        }
        receiver.output(modJsonString);
      }
    }

    private ModType getModType(Entry entry) {
      if (entry instanceof SetCell) {
        return ModType.SET_CELL;
      } else if (entry instanceof DeleteCells) {
        return ModType.DELETE_CELLS;
      } else if (entry instanceof DeleteFamily) {
        return ModType.DELETE_FAMILY;
      }
      return ModType.UNKNOWN;
    }
  }
}

다음 단계