Bigtable change streams to BigQuery 템플릿

Bigtable change streams to BigQuery 템플릿은 Bigtable 데이터 변경 레코드를 스트리밍하고 Dataflow를 사용하여 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다.

Bigtable 변경 내역을 사용하면 테이블별로 데이터 변형을 구독할 수 있습니다. 테이블 변경 내역을 구독하면 다음 제약조건이 적용됩니다.

  • 수정된 셀 및 삭제 작업의 설명어만 반환됨
  • 수정된 셀의 새 값만 반환됨

데이터 변경 레코드가 BigQuery에 기록될 때는 원본 Bigtable 커밋 타임스탬프 순서 지정과 달리 비순차적으로 행이 삽입될 수 있습니다.

영구 오류로 인해 BigQuery에 기록할 수 없는 변경 로그 테이블 행은 사람이 직접 검토하거나 사용자가 추가 처리할 수 있도록 Cloud Storage의 데드 레터 큐(처리되지 않은 메시지 큐) 디렉터리에 영구 저장됩니다.

필요한 BigQuery 테이블이 없으면 파이프라인이 테이블을 만듭니다. 그렇지 않으면 기존 BigQuery 테이블이 사용됩니다. 기존 BigQuery 테이블의 스키마에는 다음 테이블의 열이 포함되어야 합니다.

각각의 새 BigQuery 행에는 Bigtable 테이블의 해당 행에서 변경 내역이 반환한 데이터 변경 레코드 1개가 포함됩니다.

BigQuery 출력 테이블 스키마

열 이름 유형 null 허용 설명
row_key STRING 또는 BYTES 아니요 변경된 행의 row key입니다. writeRowkeyAsBytes 파이프라인 옵션이 true로 설정된 경우 열 유형은 BYTES여야 합니다. 그렇지 않은 경우 STRING 유형을 사용하세요.
mod_type STRING 아니요 행 변형의 유형입니다. SET_CELL, DELETE_CELLS, DELETE_FAMILY 중 한 가지 값을 사용합니다.
column_family STRING 아니요 행 변형의 영향을 받는 column family입니다.
column STRING 행 변형의 영향을 받는 column qualifier입니다. DELETE_FAMILY 변형 유형의 경우 NULL로 설정하세요.
commit_timestamp TIMESTAMP 아니요 Bigtable에서 변형을 적용하는 시간입니다.
big_query_commit_timestamp TIMESTAMP (선택사항) BigQuery가 출력 테이블에 행을 쓰는 시간을 지정합니다. 열 이름이 bigQueryChangelogTableFieldsToIgnore 파이프라인 옵션 값에 있으면 필드가 채워지지 않습니다.
timestamp TIMESTAMP 또는 INT64 변형의 영향을 받는 셀의 타임스탬프 값입니다. writeNumericTimestamps 파이프라인 옵션이 true로 설정된 경우 열 유형은 INT64여야 합니다. 그렇지 않으면 TIMESTAMP 유형을 사용합니다. DELETE_CELLSDELETE_FAMILY 변형 유형의 경우 NULL로 설정하세요.
timestamp_from TIMESTAMP 또는 INT64 DELETE_CELLS 변형으로 삭제된 모든 셀의 타임스탬프 간격의 시작(경계 포함)을 설명합니다. 다른 변형 유형의 경우 NULL로 설정합니다.
timestamp_to TIMESTAMP 또는 INT64 DELETE_CELLS 변형으로 삭제된 모든 셀의 타임스탬프 간격의 종료(경계 제외)를 설명합니다. 다른 변형 유형의 경우 NULL로 설정합니다.
is_gc BOOL 아니요 (선택사항) 가비지 컬렉션 정책에 의해 변형이 트리거되는 경우 true로 설정합니다. 다른 모든 경우에는 false로 설정합니다. 열 이름이 bigQueryChangelogTableFieldsToIgnore 파이프라인 옵션 값에 있으면 이 필드가 채워지지 않습니다.
source_instance STRING 아니요 (선택사항) 변형이 발생한 Bigtable 인스턴스의 이름을 설명합니다. 열 이름이 bigQueryChangelogTableFieldsToIgnore 파이프라인 옵션 값에 있으면 이 필드가 채워지지 않습니다.
source_cluster STRING 아니요 (선택사항) 변형이 발생한 Bigtable 클러스터의 이름을 설명합니다. 열 이름이 bigQueryChangelogTableFieldsToIgnore 파이프라인 옵션 값에 있으면 이 필드가 채워지지 않습니다.
source_table STRING 아니요 (선택사항) 변형이 적용되는 Bigtable 테이블의 이름을 설명합니다. 여러 Bigtable 테이블이 동일한 BigQuery 테이블로 변경사항을 스트리밍하는 경우 이 열의 값이 유용할 수 있습니다. 열 이름이 bigQueryChangelogTableFieldsToIgnore 파이프라인 옵션 값에 있으면 이 필드가 채워지지 않습니다.
tiebreaker INT64 아니요 (선택사항) 두 개의 변형이 다른 Bigtable 클러스터에서 동시에 등록되면 tiebreaker 값이 가장 높은 변형이 소스 테이블에 적용됩니다. tiebreaker 값이 낮은 변형은 삭제됩니다. 열 이름이 bigQueryChangelogTableFieldsToIgnore 파이프라인 옵션 값에 있으면 이 필드가 채워지지 않습니다.
value STRING 또는 BYTES 변형으로 인해 설정된 새 값입니다. writeValuesAsBytes 파이프라인 옵션이 true로 설정된 경우 열 유형은 BYTES여야 합니다. 그렇지 않은 경우 STRING 유형을 사용하세요. 이 값은 SET_CELL 변형에 설정됩니다. 다른 변형 유형의 경우 이 값이 NULL로 설정됩니다.

파이프라인 요구사항

  • 지정된 Bigtable 소스 인스턴스입니다.
  • 지정된 Bigtable 소스 테이블입니다. 테이블에 변경 내역이 사용 설정되어 있어야 합니다.
  • 지정된 Bigtable 애플리케이션 프로필입니다.
  • 지정된 BigQuery 대상 데이터 세트입니다.

템플릿 매개변수

필수 매개변수

  • bigQueryDataset : 목적지 BigQuery 테이블의 데이터 세트 이름입니다.
  • bigtableChangeStreamAppProfile: Bigtable 애플리케이션 프로필 ID입니다. 애플리케이션 프로필에서 단일 클러스터 라우팅을 사용하고 단일 행 트랜잭션을 허용해야 합니다.
  • bigtableReadInstanceId: 소스 Bigtable 인스턴스 ID입니다.
  • bigtableReadTableId: 소스 Bigtable 테이블 ID입니다.

선택적 매개변수

  • writeRowkeyAsBytes : rowkey를 BigQuery BYTES로 작성할지 여부입니다. true로 설정하면 BYTES 열에 row key가 기록됩니다. 그렇지 않으면 rowkey가 STRING 열에 기록됩니다. 기본값은 false입니다.
  • writeValuesAsBytes : 설정된 경우 true 값이 BYTES 열에 작성되고 그렇지 않으면 STRING 열에 기록됩니다. 기본값은 false입니다.
  • writeNumericTimestamps : Bigtable 타임스탬프를 BigQuery INT64로 쓸지 여부입니다. true로 설정하면 값이 INT64 열에 기록됩니다. 그렇지 않으면 값이 TIMESTAMP 열에 기록됩니다. 영향을 받는 열은 timestamp, timestamp_from, timestamp_to입니다. 기본값은 false입니다. true로 설정하면 유닉스 시간(1970년 1월 1일 UTC 기준) 이후의 시간이 마이크로초 단위로 측정됩니다.
  • bigQueryProjectId : BigQuery 데이터 세트 프로젝트 ID입니다. 기본값은 Dataflow 작업의 프로젝트입니다.
  • bigQueryChangelogTableName : 대상 BigQuery 테이블 이름입니다. 지정하지 않으면 bigtableReadTableId + "_changelog" 값이 사용됩니다. 기본값은 빈 값입니다.
  • bigQueryChangelogTablePartitionGranularity : 변경 로그 테이블의 파티션 나누기를 지정합니다. 설정하면 테이블 파티션이 나눠집니다. 지원되는 값 HOUR, DAY, MONTH, YEAR 중 하나를 사용합니다. 기본적으로 이 테이블은 파티션을 나누지 않습니다.
  • bigQueryChangelogTablePartitionExpirationMs : 변경 로그 테이블 파티션 만료 시간을 밀리초 단위로 설정합니다. true로 설정하면 지정된 밀리초보다 오래된 파티션이 삭제됩니다. 기본적으로 만료 시간은 설정되지 않습니다.
  • bigQueryChangelogTableFieldsToIgnore : 지정된 경우 생성 및 채워지지 않는 변경 로그 열의 쉼표로 구분된 목록입니다. 지원되는 값 is_gc, source_instance, source_cluster, source_table, tiebreaker, big_query_commit_timestamp 중 하나를 사용합니다. 기본적으로 모든 열이 채워집니다.
  • dlqDirectory : 데드 레터 큐에 사용할 디렉터리입니다. 처리하지 못한 레코드가 이 디렉터리에 저장됩니다. 기본값은 Dataflow 작업의 임시 위치 아래에 있는 디렉터리입니다. 대부분의 경우 기본 경로를 사용할 수 있습니다.
  • bigtableChangeStreamMetadataInstanceId: Bigtable 변경 내역 메타데이터 인스턴스 ID입니다. 기본값은 빈 값입니다.
  • bigtableChangeStreamMetadataTableTableId : Bigtable 변경 내역 커넥터 메타데이터 테이블의 ID입니다. 제공하지 않으면 파이프라인 실행 중에 Bigtable 변경 내역 커넥터 메타데이터 테이블이 자동으로 생성됩니다. 기본값은 빈 값입니다.
  • bigtableChangeStreamCharset: Bigtable 변경 내역 문자 집합 이름입니다. 기본값은 UTF-8입니다.
  • bigtableChangeStreamStartTimestamp: 변경 내역을 읽는 데 사용할 시작 타임스탬프(https://tools.ietf.org/html/rfc3339)입니다(경계 포함). 예를 들면 2022-05-05T07:59:59Z입니다. 기본값은 파이프라인 시작 시간의 타임스탬프입니다.
  • bigtableChangeStreamIgnoreColumnFamilies: 무시할 column family 이름 변경 내역을 쉼표로 구분한 목록입니다. 기본값은 빈 값입니다.
  • bigtableChangeStreamIgnoreColumns: 무시할 열 이름 변경 내역을 쉼표로 구분한 목록입니다. 기본값은 빈 값입니다.
  • bigtableChangeStreamName: 클라이언트 파이프라인의 고유한 이름입니다. 이전에 실행 중이던 파이프라인이 중지된 지점에서 처리를 계속할 수 있습니다. 기본값은 자동으로 생성된 이름입니다. 사용된 값은 Dataflow 작업 로그를 참조하세요.
  • bigtableChangeStreamResume: true로 설정하면 bigtableChangeStreamName 값이 동일하고, 이전에 실행 중이던 파이프라인이 중지된 지점부터 새 파이프라인이 처리를 계속합니다. 지정된 bigtableChangeStreamName 값의 파이프라인이 실행된 적이 없는 경우 새 파이프라인이 시작되지 않습니다. false로 설정하면 새 파이프라인이 시작됩니다. bigtableChangeStreamName 값이 동일한 파이프라인이 지정된 소스에 대해 이미 실행된 경우 새 파이프라인이 시작되지 않습니다. 기본값은 false입니다.
  • bigtableReadProjectId: Bigtable 프로젝트 ID입니다. 기본값은 Dataflow 작업의 프로젝트입니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Bigtable change streams to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • BIGTABLE_INSTANCE_ID: Bigtable 인스턴스 ID입니다.
  • BIGTABLE_TABLE_ID: Bigtable 테이블 ID입니다.
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable 애플리케이션 프로필 ID입니다.
  • BIGQUERY_DESTINATION_DATASET: BigQuery 대상 데이터 세트 이름입니다.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • BIGTABLE_INSTANCE_ID: Bigtable 인스턴스 ID입니다.
  • BIGTABLE_TABLE_ID: Bigtable 테이블 ID입니다.
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable 애플리케이션 프로필 ID입니다.
  • BIGQUERY_DESTINATION_DATASET: BigQuery 대상 데이터 세트 이름입니다.

다음 단계