Bigtable change streams to Pub/Sub 템플릿

Bigtable change streams to Pub/Sub 템플릿은 Bigtable 데이터 변경 레코드를 스트리밍하고 Dataflow를 사용하여 Pub/Sub 주제에 게시하는 스트리밍 파이프라인입니다.

Bigtable 변경 내역을 사용하면 테이블별로 데이터 변형을 구독할 수 있습니다. 테이블 변경 내역을 구독하면 다음 제약조건이 적용됩니다.

  • 수정된 셀 및 삭제 작업의 설명어만 반환됨
  • 수정된 셀의 새 값만 반환됩니다.

데이터 변경 레코드가 Pub/Sub 주제에 게시되면 메시지가 원본 Bigtable 커밋 타임스탬프 순서 지정과 달리 비순차적으로 삽입될 수 있습니다.

Pub/Sub 주제에 게시할 수 없는 Bigtable 데이터 변경 레코드는 일시적으로 Cloud Storage의 데드 레터 큐(처리되지 않은 메시지 큐) 디렉터리에 저장됩니다. 최대 재시도 실패 횟수를 초과하면 이러한 레코드는 사람이 검토하거나 사용자가 추가로 처리할 수 있도록 동일한 데드 레터 큐 디렉터리에 무기한 저장됩니다.

파이프라인을 사용하려면 대상 Pub/Sub 주제가 있어야 합니다. 대상 주제는 스키마를 사용하여 메시지를 검증하도록 구성할 수 있습니다. Pub/Sub 주제가 스키마를 지정할 때는 스키마가 유효한 경우에만 파이프라인이 시작됩니다. 스키마 유형에 따라 대상 주제에 다음 스키마 정의 중 하나를 사용합니다.

프로토콜 버퍼

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  

Avro

{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

JSON

JSON 메시지 인코딩과 함께 다음 Protobuf 스키마를 사용합니다.

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

각각의 새 Pub/Sub 메시지에는 Bigtable 테이블의 해당 행에서 변경 내역이 반환한 데이터 변경 레코드의 항목 하나가 포함됩니다. Pub/Sub 템플릿은 각 데이터 변경 레코드의 항목을 개별 셀 수준 변경사항으로 평탄화합니다.

Pub/Sub 출력 메시지 설명

입력란 이름 설명
rowKey 변경된 행의 row key입니다. 바이트 배열 형식으로 도착합니다. JSON 메시지 인코딩이 구성된 경우 row key가 문자열로 반환됩니다. useBase64Rowkeys가 지정된 경우 row key가 Base64로 인코딩됩니다. 그렇지 않으면 bigtableChangeStreamCharset에서 지정한 문자 집합을 사용하여 row key 바이트를 문자열로 디코딩합니다.
modType 행 변형의 유형입니다. SET_CELL, DELETE_CELLS, DELETE_FAMILY 중 한 가지 값을 사용합니다.
columnFamily 행 변형의 영향을 받는 column family입니다.
column 행 변형의 영향을 받는 column qualifier입니다. DELETE_FAMILY 변형 유형의 경우 열 필드가 설정되지 않습니다. 바이트 배열 형식으로 도착합니다. JSON 메시지 인코딩이 구성된 경우 열이 문자열로 반환됩니다. useBase64ColumnQualifier가 지정된 경우 열 필드는 Base64로 인코딩됩니다. 그렇지 않으면 bigtableChangeStreamCharset에서 지정한 문자 집합을 사용하여 row key 바이트를 문자열로 디코딩합니다.
commitTimestamp Bigtable에서 변형을 적용하는 시간입니다. 유닉스 시간(1970년 1월 1일 UTC 기준) 이후의 시간이 마이크로초 단위로 측정됩니다.
timestamp 변형의 영향을 받는 셀의 타임스탬프 값입니다. DELETE_CELLSDELETE_FAMILY 변형 유형의 경우 타임스탬프가 설정되지 않습니다. 유닉스 시간(1970년 1월 1일 UTC 기준) 이후의 시간이 마이크로초 단위로 측정됩니다.
timestampFrom DELETE_CELLS 변형으로 삭제된 모든 셀의 타임스탬프 간격의 시작(경계 포함)을 설명합니다. 다른 변형 유형의 경우 timestampFrom이 설정되지 않습니다. 유닉스 시간(1970년 1월 1일 UTC 기준) 이후의 시간이 마이크로초 단위로 측정됩니다.
timestampTo DELETE_CELLS 변형으로 삭제된 모든 셀의 타임스탬프 간격의 종료(경계 제외)를 설명합니다. 다른 변형 유형의 경우 timestampTo이 설정되지 않습니다.
isGC Bigtable 가비지 컬렉션 메커니즘으로 변형이 생성되는지 여부를 나타내는 불리언 값입니다.
tieBreaker 서로 다른 Bigtable 클러스터에서 두 가지 변형을 동시에 등록하면 tiebreaker 값이 가장 높은 변형이 소스 테이블에 적용됩니다. tiebreaker 값이 낮은 변형은 삭제됩니다.
value 변형으로 인해 설정된 새 값입니다. stripValues 파이프라인 옵션이 설정되지 않은 경우 이 값은 SET_CELL 변형에 설정됩니다. 다른 변형 유형의 경우 값이 설정되지 않습니다. 바이트 배열 형식으로 도착합니다. JSON 메시지 인코딩이 구성된 경우 값이 문자열로 반환됩니다. useBase64Values가 지정된 경우 값은 Base64로 인코딩됩니다. 그렇지 않으면 bigtableChangeStreamCharset에서 지정한 문자 집합을 사용하여 값 바이트를 문자열로 디코딩합니다.
sourceInstance 변형을 등록한 Bigtable 인스턴스의 이름입니다. 여러 파이프라인이 서로 다른 인스턴스의 변경사항을 동일한 Pub/Sub 주제로 스트리밍하는 경우일 수 있습니다.
sourceCluster 변형을 등록한 Bigtable 클러스터의 이름입니다. 여러 파이프라인이 서로 다른 인스턴스의 변경사항을 동일한 Pub/Sub 주제로 스트리밍하는 경우에 사용할 수 있습니다.
sourceTable 변형을 수신한 Bigtable 테이블의 이름입니다. 여러 파이프라인이 서로 다른 테이블의 변경사항을 동일한 Pub/Sub 주제로 스트리밍하는 경우에 사용할 수 있습니다.

파이프라인 요구사항

  • 지정된 Bigtable 소스 인스턴스입니다.
  • 지정된 Bigtable 소스 테이블입니다. 테이블에 변경 내역이 사용 설정되어 있어야 합니다.
  • 지정된 Bigtable 애플리케이션 프로필입니다.
  • 지정된 Pub/Sub 주제가 있어야 합니다.

템플릿 매개변수

필수 매개변수

  • pubSubTopic: 대상 Pub/Sub 주제의 이름입니다.
  • bigtableChangeStreamAppProfile: Bigtable 애플리케이션 프로필 ID입니다. 애플리케이션 프로필에서 단일 클러스터 라우팅을 사용하고 단일 행 트랜잭션을 허용해야 합니다.
  • bigtableReadInstanceId: 소스 Bigtable 인스턴스 ID입니다.
  • bigtableReadTableId: 소스 Bigtable 테이블 ID입니다.

선택적 매개변수

  • messageEncoding: Pub/Sub 주제에 게시할 메시지 인코딩입니다. 대상 주제의 스키마가 구성되면 주제 설정에 따라 메시지 인코딩이 결정됩니다. BINARYJSON 값이 지원됩니다. 기본값은 JSON입니다.
  • messageFormat: Pub/Sub 주제에 게시할 메시지 인코딩입니다. 대상 주제의 스키마가 구성되면 주제 설정에 따라 메시지 인코딩이 결정됩니다. AVRO, PROTOCOL_BUFFERS, JSON 값이 지원됩니다. 기본값은 JSON입니다. JSON 형식을 사용하는 경우 메시지의 rowKey, 열, 값 필드는 문자열이며, 해당 콘텐츠는 파이프라인 옵션 useBase64Rowkeys, useBase64ColumnQualifiers, useBase64Values, bigtableChangeStreamCharset에 따라 결정됩니다.
  • stripValues: true로 설정하면 SET_CELL 변형이 새 값 설정 없이 반환됩니다. 기본값은 false입니다. 이 매개변수는 새 값이 필요하지 않거나(캐시 무효화라고도 함) 값이 매우 크고 Pub/Sub 메시지 크기 한도를 초과하는 경우에 유용합니다.
  • dlqDirectory: 데드 레터 큐의 디렉터리입니다. 처리하지 못한 레코드가 이 디렉터리에 저장됩니다. 기본적으로 Dataflow 작업 임시 위치 아래의 디렉터리로 지정됩니다. 대부분의 경우 기본 경로를 사용할 수 있습니다.
  • dlqRetryMinutes: 데드 레터 큐 재시도 간격(분)입니다. 기본값은 10입니다.
  • dlqMaxRetries: 데드 레터 최대 재시도입니다. 기본값은 5입니다.
  • useBase64Rowkeys: JSON 메시지 인코딩에 사용됩니다. true로 설정한 경우 rowKey 필드는 Base64로 인코딩된 문자열입니다. 그렇지 않은 경우 rowKeybigtableChangeStreamCharset를 사용하여 바이트를 문자열로 디코딩하도록 생성됩니다. 기본값은 false입니다.
  • pubSubProjectId: Bigtable 프로젝트 ID입니다. 기본값은 Dataflow 작업의 프로젝트입니다.
  • useBase64ColumnQualifiers: JSON 메시지 인코딩과 함께 사용됩니다. true로 설정한 경우 column 필드는 Base64로 인코딩된 문자열입니다. 그렇지 않으면 bigtableChangeStreamCharset로 바이트를 문자열로 디코딩하여 열이 생성됩니다. 기본값은 false입니다.
  • useBase64Values: JSON 메시지 인코딩에 사용됩니다. true로 설정한 경우 값 필드는 Base64로 인코딩된 문자열입니다. 그렇지 않으면 bigtableChangeStreamCharset로 바이트를 문자열로 디코딩하여 값이 생성됩니다. 기본값은 false입니다.
  • disableDlqRetries: DLQ 재시도를 사용 중지할지 여부입니다. 기본값은 false입니다.
  • bigtableChangeStreamMetadataInstanceId: Bigtable 변경 내역 메타데이터 인스턴스 ID입니다. 기본값은 빈 값입니다.
  • bigtableChangeStreamMetadataTableTableId : Bigtable 변경 내역 커넥터 메타데이터 테이블의 ID입니다. 제공하지 않으면 파이프라인 실행 중에 Bigtable 변경 내역 커넥터 메타데이터 테이블이 자동으로 생성됩니다. 기본값은 빈 값입니다.
  • bigtableChangeStreamCharset: Bigtable 변경 내역 문자 집합 이름입니다. 기본값은 UTF-8입니다.
  • bigtableChangeStreamStartTimestamp: 변경 내역을 읽는 데 사용할 시작 타임스탬프(https://tools.ietf.org/html/rfc3339)입니다(경계 포함). 예를 들면 2022-05-05T07:59:59Z입니다. 기본값은 파이프라인 시작 시간의 타임스탬프입니다.
  • bigtableChangeStreamIgnoreColumnFamilies: 무시할 column family 이름 변경 내역을 쉼표로 구분한 목록입니다. 기본값은 빈 값입니다.
  • bigtableChangeStreamIgnoreColumns: 무시할 열 이름 변경 내역을 쉼표로 구분한 목록입니다. 기본값은 빈 값입니다.
  • bigtableChangeStreamName: 클라이언트 파이프라인의 고유한 이름입니다. 이전에 실행 중이던 파이프라인이 중지된 지점에서 처리를 계속할 수 있습니다. 기본값은 자동으로 생성된 이름입니다. 사용된 값은 Dataflow 작업 로그를 참조하세요.
  • bigtableChangeStreamResume: true로 설정하면 bigtableChangeStreamName 값이 동일하고, 이전에 실행 중이던 파이프라인이 중지된 지점부터 새 파이프라인이 처리를 계속합니다. 지정된 bigtableChangeStreamName 값의 파이프라인이 실행된 적이 없는 경우 새 파이프라인이 시작되지 않습니다. false로 설정하면 새 파이프라인이 시작됩니다. bigtableChangeStreamName 값이 동일한 파이프라인이 지정된 소스에 대해 이미 실행된 경우 새 파이프라인이 시작되지 않습니다. 기본값은 false입니다.
  • bigtableReadProjectId: Bigtable 프로젝트 ID입니다. 기본값은 Dataflow 작업의 프로젝트입니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Bigtable change streams to Pub/Sub template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • BIGTABLE_INSTANCE_ID: Bigtable 인스턴스 ID입니다.
  • BIGTABLE_TABLE_ID: Bigtable 테이블 ID입니다.
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable 애플리케이션 프로필 ID
  • PUBSUB_TOPIC: Pub/Sub 대상 주제 이름

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • BIGTABLE_INSTANCE_ID: Bigtable 인스턴스 ID입니다.
  • BIGTABLE_TABLE_ID: Bigtable 테이블 ID입니다.
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable 애플리케이션 프로필 ID
  • PUBSUB_TOPIC: Pub/Sub 대상 주제 이름

다음 단계