Datastream to BigQuery(Stream) 템플릿

Datastream to BigQuery 템플릿은 Datastream 데이터를 읽고 BigQuery에 복제하는 스트리밍 파이프라인입니다. 이 템플릿은 Pub/Sub 알림을 사용하여 Cloud Storage에서 데이터를 읽고 시간으로 파티션을 나눈 BigQuery 스테이징 테이블에 복제합니다. 복제 후 템플릿은 BigQuery에서 MERGE를 실행하여 모든 변경 데이터 캡처(CDC) 변경사항을 소스 테이블의 복제본에 적용합니다. Pub/Sub 알림에서 데이터를 읽으려면 gcsPubSubSubscription 매개변수를 지정합니다. 또는 Cloud Storage의 파일에서 데이터를 직접 읽으려면 inputFilePattern 매개변수를 제공합니다.

템플릿은 복제에서 관리하는 BigQuery 테이블의 생성 및 업데이트를 처리합니다. 데이터 정의 언어(DDL)가 필요한 경우 Datastream에 대한 콜백은 소스 테이블 스키마를 추출하여 BigQuery 데이터 유형으로 변환합니다. 지원되는 작업은 다음과 같습니다.

  • 데이터가 삽입될 때 새 테이블이 생성됩니다.
  • 초기 값이 null인 새 열이 BigQuery 테이블에 추가됩니다.
  • 삭제된 열은 BigQuery에서 무시되며 향후 값은 null입니다.
  • 이름이 변경된 열은 BigQuery에 새 열로 추가됩니다.
  • 형식 변경사항은 BigQuery로 전파되지 않습니다.

임시 BigQuery 테이블의 데이터를 기본 BigQuery 테이블에 병합할 때 템플릿이 중복 삭제를 실행하므로 한 번 이상 스트리밍 모드를 사용하여 이 파이프라인을 실행하는 것이 좋습니다. 파이프라인의 이 단계는 정확히 한 번 스트리밍 모드를 사용하는 데 추가 이점이 없다는 것을 의미합니다.

파이프라인 요구사항

  • 데이터 복제가 가능하거나 이미 복제하고 있는 Datastream 스트림
  • Datastream 데이터에 Cloud Storage Pub/Sub 알림이 사용 설정되어 있습니다.
  • BigQuery 대상 데이터 세트가 생성되었고 Compute Engine 서비스 계정에 이에 대한 관리자 액세스 권한이 부여되었습니다.
  • 대상 복제본 테이블을 만들 소스 테이블에 기본 키가 있어야 합니다.
  • MySQL 또는 Oracle 소스 데이터베이스 PostgreSQL 및 SQL Server 데이터베이스는 지원되지 않습니다.

템플릿 매개변수

필수 매개변수

  • inputFilePattern: Cloud Storage에서 Datastream 파일 출력을 위한 파일 위치입니다(gs://<BUCKET_NAME>/<ROOT_PATH>/ 형식).
  • inputFileFormat: Datastream에서 생성한 출력 파일의 형식입니다. 허용되는 값은 avrojson, 입니다. 기본값은 avro입니다.
  • gcsPubSubSubscription: Cloud Storage에서 처리할 수 있는 새 파일을 Dataflow에 알리는 데 사용하는 Pub/Sub 구독입니다(projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME> 형식).
  • outputStagingDatasetTemplate: 스테이징 테이블이 포함된 데이터 세트의 이름입니다. 이 매개변수는 템플릿(예: {_metadata_dataset}_log 또는 my_dataset_log)을 지원합니다. 일반적으로 이 매개변수는 데이터 세트 이름입니다. 기본값은 {_metadata_dataset}입니다. 참고: MySQL 소스의 경우 데이터베이스 이름이 {_metadata_dataset}가 아닌 {_metadata_schema}에 매핑됩니다.
  • outputDatasetTemplate: 복제본 테이블이 포함된 데이터 세트의 이름입니다. 이 매개변수는 템플릿을 지원합니다(예: {_metadata_dataset} 또는 my_dataset). 일반적으로 이 매개변수는 데이터 세트 이름입니다. 기본값은 {_metadata_dataset}입니다. 참고: MySQL 소스의 경우 데이터베이스 이름이 {_metadata_dataset}가 아닌 {_metadata_schema}에 매핑됩니다.
  • deadLetterQueueDirectory: Dataflow가 데드 레터 큐 출력을 쓰는 데 사용하는 경로입니다. 이 경로는 Datastream 파일 출력과 동일한 경로에 있으면 안 됩니다. 기본값은 empty입니다.

선택적 매개변수

  • streamName: 스키마 정보를 폴링할 스트림의 이름이나 템플릿입니다. 기본값은 {_metadata_stream}입니다. 보통 기본값으로 충분합니다.
  • rfcStartDateTime: Cloud Storage에서 데이터를 가져오는 데 사용할 시작 DateTime입니다(https://tools.ietf.org/html/rfc3339). 기본값은 1970-01-01T00:00:00.00Z입니다.
  • fileReadConcurrency: 읽을 동시 DataStream 파일 수입니다. 기본값은 10입니다.
  • outputProjectId: 데이터를 출력할 BigQuery 데이터 세트가 포함된 Google Cloud 프로젝트의 ID입니다. 이 매개변수의 기본값은 Dataflow 파이프라인이 실행되는 프로젝트입니다.
  • outputStagingTableNameTemplate: 스테이징 테이블의 이름을 지정하는 데 사용할 템플릿입니다. 예를 들면 {_metadata_table}입니다. 기본값은 {_metadata_table}_log입니다.
  • outputTableNameTemplate: 복제본 테이블 이름에 사용할 템플릿입니다(예: {_metadata_table}). 기본값은 {_metadata_table}입니다.
  • ignoreFields: BigQuery에서 무시할 쉼표로 구분된 필드입니다. 기본값은 _metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count입니다. 예를 들면 _metadata_stream,_metadata_schema입니다.
  • mergeFrequencyMinutes: 지정된 테이블의 병합 간격(분)입니다. 기본값은 5입니다.
  • dlqRetryMinutes: DLQ 재시도 간격(분)입니다. 기본값은 10입니다.
  • dataStreamRootUrl: Datastream API 기준 URL입니다. 기본값은 https://datastream.googleapis.com/입니다.
  • applyMerge: 작업에 대해 MERGE 쿼리를 사용 중지할지 여부입니다. 기본값은 true입니다.
  • mergeConcurrency: 동시 실행되는 BigQuery MERGE 쿼리의 수입니다. applyMerge가 true로 설정된 경우에만 유효합니다. 기본값은 30입니다.
  • partitionRetentionDays: BigQuery 병합을 실행할 때 파티션 보관에 사용할 일 수입니다. 기본값은 1입니다.
  • useStorageWriteApiAtLeastOnce: 이 매개변수는 Use BigQuery Storage Write API가 사용 설정된 경우에만 적용됩니다. true인 경우 Storage Write API에 최소 1회의 시맨틱스가 사용됩니다. 그렇지 않으면 1회만 실행되는 시맨틱이 사용됩니다. 기본값은 false입니다.
  • javascriptTextTransformGcsPath: 사용할 JavaScript 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • javascriptTextTransformFunctionName: 사용할 JavaScript 사용자 정의 함수(UDF) 이름입니다. 예를 들어 JavaScript 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 JavaScript UDF는 UDF 예시(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)를 참조하세요.
  • javascriptTextTransformReloadIntervalMinutes: UDF를 새로고침할 빈도(분)를 지정합니다. 값이 0보다 크면 Dataflow가 Cloud Storage에서 UDF 파일을 주기적으로 검사하고 파일이 수정된 경우 UDF를 새로고침합니다. 이 매개변수를 사용하면 파이프라인이 실행 중일 때 작업을 다시 시작할 필요 없이 UDF를 업데이트할 수 있습니다. 값이 0이면 UDF 새로고침이 사용 중지됩니다. 기본값은 0입니다.
  • pythonTextTransformGcsPath: 사용자 정의 함수가 포함된 Python 코드의 Cloud Storage 경로 패턴입니다. 예를 들면 gs://your-bucket/your-transforms/*.py입니다.
  • pythonRuntimeVersion: 이 Python UDF에 사용할 런타임 버전입니다.
  • pythonTextTransformFunctionName: JavaScript 파일에서 호출할 함수의 이름입니다. 문자, 숫자, 밑줄만 사용합니다. 예를 들면 transform_udf1입니다.
  • runtimeRetries: 런타임이 실패하기 전에 재시도되는 횟수입니다. 기본값은 5입니다.
  • useStorageWriteApi: true이면 파이프라인에서 BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)를 사용합니다. 기본값은 false입니다. 자세한 내용은 Storage Write API(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api) 사용을 참조하세요.
  • numStorageWriteApiStreams: Storage Write API를 사용할 경우 쓰기 스트림 수를 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다. 기본값은 0입니다.
  • storageWriteApiTriggeringFrequencySec: Storage Write API를 사용할 경우 트리거 빈도를 초 단위로 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다.

사용자 정의 함수

선택적으로 사용자 정의 함수(UDF)를 작성하여 이 템플릿을 확장할 수 있습니다. 템플릿이 각 입력 요소에 대해 UDF를 호출합니다. 요소 페이로드는 JSON 문자열로 직렬화됩니다. 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

함수 사양

UDF의 사양은 다음과 같습니다.

  • 입력: JSON 문자열로 직렬화된 CDC 데이터입니다.
  • 출력: BigQuery 대상 테이블의 스키마와 일치하는 JSON 문자열입니다.
  • 템플릿 실행

    콘솔

    1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
    2. 템플릿에서 작업 만들기로 이동
    3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
    4. (선택사항) 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

      Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

    5. Dataflow 템플릿 드롭다운 메뉴에서 the Datastream to BigQuery template을 선택합니다.
    6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
    7. (선택사항) 정확히 한 번 처리에서 적어도 한 번 스트리밍 모드로 전환하려면 적어도 한 번를 선택합니다.
    8. 작업 실행을 클릭합니다.

    gcloud

    셸 또는 터미널에서 템플릿을 실행합니다.

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    다음을 바꿉니다.

    • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
    • JOB_NAME: 선택한 고유한 작업 이름
    • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: Datastream 데이터의 Cloud Storage 경로. 예를 들면 gs://bucket/path/to/data/입니다.
    • GCS_SUBSCRIPTION_NAME: 변경된 파일을 읽을 Pub/Sub 구독. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
    • BIGQUERY_DATASET: BigQuery 데이터 세트 이름.
    • BIGQUERY_TABLE: BigQuery 테이블 템플릿. 예를 들면 {_metadata_schema}_{_metadata_table}_log입니다.

    API

    REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    다음을 바꿉니다.

    • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
    • JOB_NAME: 선택한 고유한 작업 이름
    • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: Datastream 데이터의 Cloud Storage 경로. 예를 들면 gs://bucket/path/to/data/입니다.
    • GCS_SUBSCRIPTION_NAME: 변경된 파일을 읽을 Pub/Sub 구독. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
    • BIGQUERY_DATASET: BigQuery 데이터 세트 이름.
    • BIGQUERY_TABLE: BigQuery 테이블 템플릿. 예를 들면 {_metadata_schema}_{_metadata_table}_log입니다.

    다음 단계