Datastream to BigQuery 템플릿은 Datastream 데이터를 읽고 BigQuery에 복제하는 스트리밍 파이프라인입니다. 이 템플릿은 Pub/Sub 알림을 사용하여 Cloud Storage에서 데이터를 읽고 시간으로 파티션을 나눈 BigQuery 스테이징 테이블에 복제합니다. 복제 후 템플릿은 BigQuery에서 MERGE
를 실행하여 모든 변경 데이터 캡처(CDC) 변경사항을 소스 테이블의 복제본에 적용합니다.
템플릿은 복제에서 관리하는 BigQuery 테이블의 생성 및 업데이트를 처리합니다. 데이터 정의 언어(DDL)가 필요한 경우 Datastream에 대한 콜백은 소스 테이블 스키마를 추출하여 BigQuery 데이터 유형으로 변환합니다. 지원되는 작업은 다음과 같습니다.
- 데이터가 삽입될 때 새 테이블이 생성됩니다.
- 초기 값이 null인 새 열이 BigQuery 테이블에 추가됩니다.
- 삭제된 열은 BigQuery에서 무시되며 향후 값은 null입니다.
- 이름이 변경된 열은 BigQuery에 새 열로 추가됩니다.
- 형식 변경사항은 BigQuery로 전파되지 않습니다.
파이프라인 요구사항
- 데이터 복제가 가능하거나 이미 복제하고 있는 Datastream 스트림
- Datastream 데이터에 Cloud Storage Pub/Sub 알림이 사용 설정되어 있습니다.
- BigQuery 대상 데이터 세트가 생성되었고 Compute Engine 서비스 계정에 이에 대한 관리 액세스 권한이 부여되었습니다.
- 대상 복제본 테이블을 만들 소스 테이블에 기본 키가 있어야 합니다.
템플릿 매개변수
매개변수 | 설명 |
---|---|
inputFilePattern |
Cloud Storage에서 복제할 Datastream 파일의 파일 위치입니다. 일반적으로 이 파일 위치는 스트림의 루트 경로입니다. |
gcsPubSubSubscription |
Datastream 파일 알림을 사용하는 Pub/Sub 구독입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id 입니다. |
inputFileFormat |
Datastream에서 생성한 출력 파일의 형식입니다. 예를 들면 avro,json 입니다. 기본값은 avro 입니다. |
outputStagingDatasetTemplate |
스테이징 테이블을 포함할 기존 데이터 세트의 이름입니다. {_metadata_dataset} 템플릿을 소스 데이터 세트/스키마의 이름으로 대체되는 자리표시자(예: {_metadata_dataset}_log )로 포함할 수 있습니다. |
outputDatasetTemplate |
복제본 테이블을 포함할 기존 데이터 세트의 이름입니다. {_metadata_dataset} 템플릿을 소스 데이터 세트/스키마의 이름으로 대체되는 자리표시자(예: {_metadata_dataset} )로 포함할 수 있습니다. |
deadLetterQueueDirectory |
메시지를 처리할 수 없는 이유와 함께 처리되지 않은 모든 메시지가 저장되는 파일 경로입니다. 기본값은 Dataflow 작업의 임시 위치 아래에 있는 디렉터리입니다. 대부분의 상황에서는 기본값이면 충분합니다. |
outputStagingTableNameTemplate |
(선택사항) 스테이징 테이블 이름의 템플릿입니다. 기본값은 {_metadata_table}_log입니다. 여러 스키마를 복제하는 경우 {_metadata_schema}_{_metadata_table}_log 를 사용하는 것이 좋습니다. |
outputTableNameTemplate |
(선택사항) 복제본 테이블 이름의 템플릿입니다. 기본값은 {_metadata_table} 입니다. 여러 스키마를 복제하는 경우 {_metadata_schema}_{_metadata_table} 을 사용하는 것이 좋습니다. |
outputProjectId |
(선택사항) 데이터를 출력할 BigQuery 데이터 세트의 프로젝트입니다. 이 매개변수의 기본값은 Dataflow 파이프라인이 실행되는 프로젝트입니다. |
streamName |
(선택사항) 스키마 정보를 폴링할 스트림의 이름이나 템플릿입니다. 기본값은 {_metadata_stream} 입니다. |
mergeFrequencyMinutes |
(선택사항) 지정된 테이블의 병합 간격(분)입니다. 기본값은 5입니다. |
dlqRetryMinutes |
(선택사항) 데드 레터 큐(DLQ) 재시도 간격(분)입니다. 기본값은 10입니다. |
javascriptTextTransformGcsPath |
(선택사항)
사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js 입니다.
|
javascriptTextTransformFunctionName |
(선택사항)
사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.
예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ } 이면 함수 이름은 myTransform 입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.
|
템플릿 실행
콘솔
- Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다. 템플릿에서 작업 만들기로 이동
- 작업 이름 필드에 고유한 작업 이름을 입력합니다.
- 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는
us-central1
입니다.Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.
- Dataflow 템플릿 드롭다운 메뉴에서 the Datastream to BigQuery template을 선택합니다.
- 제공된 매개변수 필드에 매개변수 값을 입력합니다.
- 작업 실행을 클릭합니다.
gcloud
셸 또는 터미널에서 템플릿을 실행합니다.
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ outputStagingDatasetTemplate=BIGQUERY_DATASET,\ outputDatasetTemplate=BIGQUERY_DATASET,\ outputStagingTableNameTemplate=BIGQUERY_TABLE,\ outputTableNameTemplate=BIGQUERY_TABLE_log
다음을 바꿉니다.
PROJECT_ID
: Dataflow 작업을 실행할 Cloud 프로젝트 IDJOB_NAME
: 선택한 고유한 작업 이름REGION_NAME
: Dataflow 작업을 배포할 리전 엔드포인트(예:us-central1
)VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/- the version name, like
2021-09-20-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
GCS_FILE_PATH
: Datastream 데이터의 Cloud Storage 경로입니다. 예를 들면gs://bucket/path/to/data/
입니다.GCS_SUBSCRIPTION_NAME
: 변경된 파일을 읽을 Pub/Sub 구독입니다. 예를 들면projects/my-project-id/subscriptions/my-subscription-id
입니다.BIGQUERY_DATASET
: BigQuery 데이터 세트 이름입니다.BIGQUERY_TABLE
: BigQuery 테이블 템플릿입니다. 예를 들면{_metadata_schema}_{_metadata_table}_log
입니다.
API
REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch
를 참조하세요.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "outputStagingDatasetTemplate": "BIGQUERY_DATASET", "outputDatasetTemplate": "BIGQUERY_DATASET", "outputStagingTableNameTemplate": "BIGQUERY_TABLE", "outputTableNameTemplate": "BIGQUERY_TABLE_log" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery", } }
다음을 바꿉니다.
PROJECT_ID
: Dataflow 작업을 실행할 Cloud 프로젝트 IDJOB_NAME
: 선택한 고유한 작업 이름LOCATION
: Dataflow 작업을 배포할 리전 엔드포인트(예:us-central1
)VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/- the version name, like
2021-09-20-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
GCS_FILE_PATH
: Datastream 데이터의 Cloud Storage 경로입니다. 예를 들면gs://bucket/path/to/data/
입니다.GCS_SUBSCRIPTION_NAME
: 변경된 파일을 읽을 Pub/Sub 구독입니다. 예를 들면projects/my-project-id/subscriptions/my-subscription-id
입니다.BIGQUERY_DATASET
: BigQuery 데이터 세트 이름입니다.BIGQUERY_TABLE
: BigQuery 테이블 템플릿입니다. 예를 들면{_metadata_schema}_{_metadata_table}_log
입니다.