Bigtable change streams to BigQuery 템플릿

Bigtable change streams to BigQuery 템플릿은 Bigtable 데이터 변경 레코드를 스트리밍하고 Dataflow를 사용하여 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다.

Bigtable 변경 내역을 사용하면 테이블별로 데이터 변형을 구독할 수 있습니다. 테이블 변경 내역을 구독하면 다음 제약조건이 적용됩니다.

  • 수정된 셀 및 삭제 작업의 설명어만 반환됩니다.
  • 수정된 셀의 새 값만 반환됩니다.

데이터 변경 레코드가 BigQuery에 기록되면 행이 원래 Bigtable 커밋 타임스탬프 순서와 다르게 정렬될 수 있습니다.

영구 오류로 인해 BigQuery에 기록할 수 없는 변경 로그 테이블 행은 사람이 직접 검토하거나 사용자가 추가 처리할 수 있도록 Cloud Storage의 데드 레터 큐(처리되지 않은 메시지 큐) 디렉터리에 영구 저장됩니다.

필요한 BigQuery 테이블이 없으면 파이프라인이 테이블을 만듭니다. 그렇지 않으면 기존 BigQuery 테이블이 사용됩니다. 기존 BigQuery 테이블의 스키마에는 다음 테이블의 열이 있어야 합니다.

각각의 새 BigQuery 행에는 Bigtable 테이블의 해당 행에서 변경 내역이 반환한 데이터 변경 레코드 1개가 포함됩니다.

BigQuery 출력 테이블 스키마

열 이름 유형 null 허용 설명
row_key STRING 또는 BYTES 아니요 변경된 행의 row key입니다. writeRowkeyAsBytes 파이프라인 옵션이 true로 설정된 경우 열 유형은 BYTES이어야 합니다. 그렇지 않은 경우 STRING 유형을 사용합니다.
mod_type STRING 아니요 행 변형 유형입니다. SET_CELL, DELETE_CELLS, DELETE_FAMILY 중 한 가지 값을 사용합니다.
column_family STRING 아니요 행 변형의 영향을 받는 column family
column STRING 행 변형의 영향을 받는 column qualifier DELETE_FAMILY 변형 유형의 경우 NULL로 설정합니다.
commit_timestamp TIMESTAMP 아니요 Bigtable이 변형을 적용하는 시간입니다.
big_query_commit_timestamp TIMESTAMP 선택사항: BigQuery가 출력 테이블에 행을 쓰는 시간을 지정합니다. 열 이름이 bigQueryChangelogTableFieldsToIgnore 파이프라인 옵션 값에 있으면 이 필드는 채워지지 않습니다.
timestamp TIMESTAMP 또는 INT64 변형의 영향을 받는 셀의 타임스탬프 값입니다. writeNumericTimestamps 파이프라인 옵션이 true로 설정된 경우 열 유형은 INT64이어야 합니다. 그렇지 않은 경우 TIMESTAMP 유형을 사용합니다. DELETE_CELLSDELETE_FAMILY 변형 유형의 경우 NULL로 설정합니다.
timestamp_from TIMESTAMP 또는 INT64 DELETE_CELLS 변형으로 삭제된 모든 셀의 타임스탬프 간격의 시작(경계 포함)을 설명합니다. 다른 변형 유형의 경우 NULL로 설정합니다.
timestamp_to TIMESTAMP 또는 INT64 DELETE_CELLS 변형으로 삭제된 모든 셀의 타임스탬프 간격의 종료(경계 제외)를 설명합니다. 다른 변형 유형의 경우 NULL로 설정합니다.
is_gc BOOL 아니요 선택사항: 가비지 컬렉션 정책에 의해 변형이 트리거되면 true로 설정합니다. 다른 모든 경우에는 false로 설정합니다. bigQueryChangelogTableFieldsToIgnore 파이프라인 옵션 값에 열 이름이 있더라도 이 필드는 채워지지 않습니다.
source_instance STRING 아니요 선택사항: 변형이 시작된 Bigtable 인스턴스의 이름을 설명합니다. bigQueryChangelogTableFieldsToIgnore 파이프라인 옵션 값에 열 이름이 있더라도 이 필드는 채워지지 않습니다.
source_cluster STRING 아니요 선택사항: 변형이 시작된 Bigtable 클러스터의 이름을 설명합니다. bigQueryChangelogTableFieldsToIgnore 파이프라인 옵션 값에 열 이름이 있더라도 이 필드는 채워지지 않습니다.
source_table STRING 아니요 선택사항: 변형이 적용되는 Bigtable 테이블의 이름을 설명합니다. 이 열의 값은 여러 Bigtable 테이블이 동일한 BigQuery 테이블로 변경사항을 스트리밍하는 경우에 유용합니다. bigQueryChangelogTableFieldsToIgnore 파이프라인 옵션 값에 열 이름이 있더라도 이 필드는 채워지지 않습니다.
tiebreaker INT64 아니요 선택사항: 두 개의 변형이 다른 Bigtable 클러스터에서 동시에 등록되면 tiebreaker 값이 가장 높은 변형이 소스 테이블에 적용됩니다. tiebreaker 값이 낮은 변형은 삭제됩니다. bigQueryChangelogTableFieldsToIgnore 파이프라인 옵션 값에 열 이름이 있더라도 이 필드는 채워지지 않습니다.
value STRING 또는 BYTES 변형에서 설정한 새 값입니다. writeValuesAsBytes 파이프라인 옵션이 true로 설정된 경우 열 유형은 BYTES이어야 합니다. 그렇지 않은 경우 STRING 유형을 사용합니다. 값이 SET_CELL 변형에 설정됩니다. 다른 변형 유형의 경우 값이 NULL로 설정됩니다.

파이프라인 요구사항

  • 지정된 Bigtable 소스 인스턴스입니다.
  • 지정된 Bigtable 소스 테이블입니다. 테이블에 변경 내역이 사용 설정되어 있어야 합니다.
  • 지정된 Bigtable 애플리케이션 프로필입니다.
  • 지정된 BigQuery 대상 데이터 세트입니다.

템플릿 매개변수

매개변수 설명
bigtableReadInstanceId 소스 Bigtable 인스턴스 ID입니다.
bigtableReadTableId 소스 Bigtable 테이블 ID입니다.
bigtableChangeStreamAppProfile Bigtable 애플리케이션 프로필 ID입니다. 애플리케이션 프로필은 단일 클러스터 라우팅을 사용하고 단일 행 트랜잭션을 허용해야 합니다.
bigQueryDataset 대상 BigQuery 테이블의 데이터 세트 이름입니다.
writeNumericTimestamps 선택사항: Bigtable 타임스탬프를 BigQuery INT64로 작성합니다. true로 설정하면 값이 INT64 열에 기록됩니다. 그렇지 않으면 값이 TIMESTAMP 열에 기록됩니다. 영향을 받는 열은 timestamp, timestamp_from, timestamp_to입니다. 기본값은 false입니다. true로 설정하면 유닉스 시간(1970년 1월 1일 UTC 기준) 이후의 시간이 마이크로초 단위로 측정됩니다.
writeRowkeyAsBytes 선택사항: rowkey를 BigQuery BYTES로 씁니다. true로 설정하면 BYTES 열에 row key가 기록됩니다. 그렇지 않으면 rowkey가 STRING 열에 기록됩니다. 기본값은 false입니다.
writeValuesAsBytes 선택사항: BigQuery BYTES로 값을 씁니다. true로 설정하면 값이 BYTES 열에 기록됩니다. 그렇지 않으면 값이 STRING 열에 기록됩니다. 기본값은 false입니다.
bigQueryChangelogTableName 선택사항: 대상 BigQuery 테이블 이름입니다. 지정하지 않으면 bigtableReadTableId + "_changelog" 값이 사용됩니다.
bigQueryProjectId 선택사항: BigQuery 데이터 세트 프로젝트 ID입니다. 기본값은 Dataflow 작업의 프로젝트입니다.
bigtableReadProjectId 선택사항: Bigtable 프로젝트 ID입니다. 기본값은 Dataflow 작업의 프로젝트입니다.
bigtableChangeStreamMetadataInstanceId 선택사항: Bigtable 변경 내역 메타데이터 인스턴스 ID입니다.
bigtableChangeStreamMetadataTableTableId 선택사항: Bigtable 변경 내역 메타데이터 테이블 ID입니다.
bigtableChangeStreamCharset 선택사항: 값 및 column qualifier를 읽을 때 Bigtable 변경 내역 문자 집합 이름입니다.
bigtableChangeStreamStartTimestamp 선택사항: 변경 내역을 읽는 데 사용할 시작 타임스탬프(경계 포함)입니다. 예를 들면 2022-05-05T07:59:59Z입니다. 기본값은 파이프라인 시작 시간의 타임스탬프입니다.
bigtableChangeStreamIgnoreColumnFamilies 선택사항: 무시할 column family 이름 변경 내역을 쉼표로 구분한 목록입니다.
bigtableChangeStreamIgnoreColumns 선택사항: 무시할 열 이름 변경 내역을 쉼표로 구분한 목록입니다.
bigtableChangeStreamName 선택사항: 클라이언트 파이프라인의 고유한 이름입니다. 이전에 실행 중인 파이프라인이 중단된 시점부터 처리를 재개할 수 있습니다. 기본값은 자동 생성된 이름입니다. 사용된 값은 Dataflow 작업 로그를 참조하세요.
bigtableChangeStreamResume 선택사항: true로 설정하면 이전에 실행한 bigtableChangeStreamName 값이 동일한 파이프라인이 중지된 시점부터 새 파이프라인이 처리를 재개합니다. 지정된 bigtableChangeStreamName 값이 있는 파이프라인이 실행된 적이 없으면 새 파이프라인이 시작되지 않습니다. false로 설정하면 새 파이프라인이 시작됩니다. 지정된 소스에 대해 동일한 bigtableChangeStreamName 값을 가진 파이프라인이 이미 실행된 경우 새 파이프라인이 시작되지 않습니다. 기본값은 false입니다.
bigQueryChangelogTableFieldsToIgnore 선택사항: 지정된 경우 생성되거나 채워지지 않는 쉼표로 구분된 변경 로그 열 목록입니다. 지원되는 값 is_gc, source_instance, source_cluster, source_table, tiebreaker, big_query_commit_timestamp 중 하나를 사용합니다. 기본적으로 모든 열이 채워집니다.
bigQueryChangelogTablePartitionExpirationMs 선택사항: 변경 로그 테이블 파티션 만료 시간을 밀리초 단위로 설정합니다. true로 설정하면 지정된 시간(밀리초)보다 오래된 파티션이 삭제됩니다. 기본적으로 만료 시간은 설정되어 있지 않습니다.
bigQueryChangelogTablePartitionGranularity 선택사항: 변경 로그 테이블의 파티션을 나누는 세부사항을 지정합니다. 설정되면 테이블의 파티션이 나눠집니다. 지원되는 값 HOUR, DAY, MONTH, YEAR 중 하나를 사용합니다. 기본적으로 테이블은 파티션을 나누지 않습니다.
dlqDirectory 선택사항: 데드 레터 큐의 디렉터리입니다. 처리에 실패한 레코드가 이 디렉터리에 저장됩니다. 기본값은 Dataflow 작업의 임시 위치 아래에 있는 디렉터리입니다. 대부분의 경우 기본 경로를 사용할 수 있습니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Bigtable change streams to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID입니다.
  • JOB_NAME: 선택한 고유한 작업 이름입니다.
  • VERSION: 사용할 템플릿 버전입니다.

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전입니다(예: us-central1).
  • BIGTABLE_INSTANCE_ID: Bigtable 인스턴스 ID입니다.
  • BIGTABLE_TABLE_ID: Bigtable 테이블 ID입니다.
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable 애플리케이션 프로필 ID입니다.
  • BIGQUERY_DESTINATION_DATASET: BigQuery 대상 데이터 세트 이름입니다.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID입니다.
  • JOB_NAME: 선택한 고유한 작업 이름입니다.
  • VERSION: 사용할 템플릿 버전입니다.

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전입니다(예: us-central1).
  • BIGTABLE_INSTANCE_ID: Bigtable 인스턴스 ID입니다.
  • BIGTABLE_TABLE_ID: Bigtable 테이블 ID입니다.
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable 애플리케이션 프로필 ID입니다.
  • BIGQUERY_DESTINATION_DATASET: BigQuery 대상 데이터 세트 이름입니다.

다음 단계