Datastream to BigQuery(Stream) 템플릿

Datastream to BigQuery 템플릿은 Datastream 데이터를 읽고 BigQuery에 복제하는 스트리밍 파이프라인입니다. 이 템플릿은 Pub/Sub 알림을 사용하여 Cloud Storage에서 데이터를 읽고 시간으로 파티션을 나눈 BigQuery 스테이징 테이블에 복제합니다. 복제 후 템플릿은 BigQuery에서 MERGE를 실행하여 모든 변경 데이터 캡처(CDC) 변경사항을 소스 테이블의 복제본에 적용합니다.

템플릿은 복제에서 관리하는 BigQuery 테이블의 생성 및 업데이트를 처리합니다. 데이터 정의 언어(DDL)가 필요한 경우 Datastream에 대한 콜백은 소스 테이블 스키마를 추출하여 BigQuery 데이터 유형으로 변환합니다. 지원되는 작업은 다음과 같습니다.

  • 데이터가 삽입될 때 새 테이블이 생성됩니다.
  • 초기 값이 null인 새 열이 BigQuery 테이블에 추가됩니다.
  • 삭제된 열은 BigQuery에서 무시되며 향후 값은 null입니다.
  • 이름이 변경된 열은 BigQuery에 새 열로 추가됩니다.
  • 형식 변경사항은 BigQuery로 전파되지 않습니다.

파이프라인 요구사항

  • 데이터 복제가 가능하거나 이미 복제하고 있는 Datastream 스트림
  • Datastream 데이터에 Cloud Storage Pub/Sub 알림이 사용 설정되어 있습니다.
  • BigQuery 대상 데이터 세트가 생성되었고 Compute Engine 서비스 계정에 이에 대한 관리 액세스 권한이 부여되었습니다.
  • 대상 복제본 테이블을 만들 소스 테이블에 기본 키가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputFilePattern Cloud Storage에서 복제할 Datastream 파일의 파일 위치입니다. 일반적으로 이 파일 위치는 스트림의 루트 경로입니다.
gcsPubSubSubscription Datastream 파일 알림을 사용하는 Pub/Sub 구독입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
inputFileFormat Datastream에서 생성한 출력 파일의 형식입니다. 예를 들면 avro,json입니다. 기본값은 avro입니다.
outputStagingDatasetTemplate 스테이징 테이블을 포함할 기존 데이터 세트의 이름입니다. {_metadata_dataset} 템플릿을 소스 데이터 세트/스키마의 이름으로 대체되는 자리표시자(예: {_metadata_dataset}_log)로 포함할 수 있습니다.
outputDatasetTemplate 복제본 테이블을 포함할 기존 데이터 세트의 이름입니다. {_metadata_dataset} 템플릿을 소스 데이터 세트/스키마의 이름으로 대체되는 자리표시자(예: {_metadata_dataset})로 포함할 수 있습니다.
deadLetterQueueDirectory 메시지를 처리할 수 없는 이유와 함께 처리되지 않은 모든 메시지가 저장되는 파일 경로입니다. 기본값은 Dataflow 작업의 임시 위치 아래에 있는 디렉터리입니다. 대부분의 상황에서는 기본값이면 충분합니다.
outputStagingTableNameTemplate (선택사항) 스테이징 테이블 이름의 템플릿입니다. 기본값은 {_metadata_table}_log입니다. 여러 스키마를 복제하는 경우 {_metadata_schema}_{_metadata_table}_log를 사용하는 것이 좋습니다.
outputTableNameTemplate (선택사항) 복제본 테이블 이름의 템플릿입니다. 기본값은 {_metadata_table}입니다. 여러 스키마를 복제하는 경우 {_metadata_schema}_{_metadata_table}을 사용하는 것이 좋습니다.
outputProjectId (선택사항) 데이터를 출력할 BigQuery 데이터 세트의 프로젝트입니다. 이 매개변수의 기본값은 Dataflow 파이프라인이 실행되는 프로젝트입니다.
streamName (선택사항) 스키마 정보를 폴링할 스트림의 이름이나 템플릿입니다. 기본값은 {_metadata_stream}입니다.
mergeFrequencyMinutes (선택사항) 지정된 테이블의 병합 간격(분)입니다. 기본값은 5입니다.
dlqRetryMinutes (선택사항) 데드 레터 큐(DLQ) 재시도 간격(분)입니다. 기본값은 10입니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Datastream to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
outputStagingDatasetTemplate=BIGQUERY_DATASET,\
outputDatasetTemplate=BIGQUERY_DATASET,\
outputStagingTableNameTemplate=BIGQUERY_TABLE,\
outputTableNameTemplate=BIGQUERY_TABLE_log
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: Datastream 데이터의 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
  • GCS_SUBSCRIPTION_NAME: 변경된 파일을 읽을 Pub/Sub 구독입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
  • BIGQUERY_DATASET: BigQuery 데이터 세트 이름입니다.
  • BIGQUERY_TABLE: BigQuery 테이블 템플릿입니다. 예를 들면 {_metadata_schema}_{_metadata_table}_log입니다.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
          "outputDatasetTemplate": "BIGQUERY_DATASET",
          "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
          "outputTableNameTemplate": "BIGQUERY_TABLE_log"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: Datastream 데이터의 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
  • GCS_SUBSCRIPTION_NAME: 변경된 파일을 읽을 Pub/Sub 구독입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
  • BIGQUERY_DATASET: BigQuery 데이터 세트 이름입니다.
  • BIGQUERY_TABLE: BigQuery 테이블 템플릿입니다. 예를 들면 {_metadata_schema}_{_metadata_table}_log입니다.