Datastream to BigQuery(Stream) 템플릿

Datastream to BigQuery 템플릿은 Datastream 데이터를 읽고 BigQuery에 복제하는 스트리밍 파이프라인입니다. 이 템플릿은 Pub/Sub 알림을 사용하여 Cloud Storage에서 데이터를 읽고 시간으로 파티션을 나눈 BigQuery 스테이징 테이블에 복제합니다. 복제 후 템플릿은 BigQuery에서 MERGE를 실행하여 모든 변경 데이터 캡처(CDC) 변경사항을 소스 테이블의 복제본에 적용합니다.

템플릿은 복제에서 관리하는 BigQuery 테이블의 생성 및 업데이트를 처리합니다. 데이터 정의 언어(DDL)가 필요한 경우 Datastream에 대한 콜백은 소스 테이블 스키마를 추출하여 BigQuery 데이터 유형으로 변환합니다. 지원되는 작업은 다음과 같습니다.

  • 데이터가 삽입될 때 새 테이블이 생성됩니다.
  • 초기 값이 null인 새 열이 BigQuery 테이블에 추가됩니다.
  • 삭제된 열은 BigQuery에서 무시되며 향후 값은 null입니다.
  • 이름이 변경된 열은 BigQuery에 새 열로 추가됩니다.
  • 형식 변경사항은 BigQuery로 전파되지 않습니다.

임시 BigQuery 테이블의 데이터를 기본 BigQuery 테이블에 병합할 때 템플릿이 중복 삭제를 실행하므로 한 번 이상 스트리밍 모드를 사용하여 이 파이프라인을 실행하는 것이 좋습니다. 파이프라인의 이 단계는 정확히 한 번 스트리밍 모드를 사용하는 데 추가 이점이 없다는 것을 의미합니다.

파이프라인 요구사항

  • 데이터 복제가 가능하거나 이미 복제하고 있는 Datastream 스트림
  • Datastream 데이터에 Cloud Storage Pub/Sub 알림이 사용 설정되어 있습니다.
  • BigQuery 대상 데이터 세트가 생성되었고 Compute Engine 서비스 계정에 이에 대한 관리자 액세스 권한이 부여되었습니다.
  • 대상 복제본 테이블을 만들 소스 테이블에 기본 키가 있어야 합니다.
  • MySQL 또는 Oracle 소스 데이터베이스 PostgreSQL 및 SQL Server 데이터베이스는 지원되지 않습니다.

템플릿 매개변수

필수 매개변수

  • inputFilePattern: Cloud Storage에서 Datastream 파일 출력을 위한 파일 위치(gs://<BUCKET_NAME>/<ROOT_PATH>/ 형식)입니다.
  • inputFileFormat: Datastream에서 생성한 출력 파일의 형식입니다. 허용되는 값은 avrojson입니다. 기본값은 avro입니다.
  • gcsPubSubSubscription: Cloud Storage에서 처리할 수 있는 새 파일을 Dataflow에 알리는 데 사용하는 Pub/Sub 구독으로, 형식은 projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>입니다.
  • outputStagingDatasetTemplate: 스테이징 테이블이 포함된 데이터 세트의 이름입니다. 이 매개변수는 템플릿(예: {_metadata_dataset}_log 또는 my_dataset_log)을 지원합니다. 일반적으로 이 매개변수는 데이터 세트 이름입니다. 기본값은 {_metadata_dataset}입니다.
  • outputDatasetTemplate: 복제본 테이블이 포함된 데이터 세트의 이름입니다. 이 매개변수는 템플릿(예: {_metadata_dataset} 또는 my_dataset)을 지원합니다. 일반적으로 이 매개변수는 데이터 세트 이름입니다. 기본값은 {_metadata_dataset}입니다.
  • deadLetterQueueDirectory: Dataflow에서 데드 레터 큐 출력을 쓰는 데 사용하는 경로입니다. 이 경로는 Datastream 파일 출력과 동일한 경로에 있으면 안 됩니다. 기본값은 empty입니다.

선택적 매개변수

  • streamName: 스키마 정보를 폴링할 스트림의 이름 또는 템플릿입니다. 기본값은 {_metadata_stream}입니다. 보통 기본값으로 충분합니다.
  • rfcStartDateTime: Cloud Storage에서 데이터를 가져오는 데 사용할 시작 DateTime입니다 (https://tools.ietf.org/html/rfc3339). 기본값은 1970-01-01T00:00:00.00Z입니다.
  • fileReadConcurrency: 읽을 동시 DataStream 파일 수입니다. 기본값은 10입니다.
  • outputProjectId: 데이터를 출력할 BigQuery 데이터 세트가 포함된 Google Cloud 프로젝트의 ID입니다. 이 매개변수의 기본값은 Dataflow 파이프라인이 실행되는 프로젝트입니다.
  • outputStagingTableNameTemplate: 스테이징 테이블의 이름을 지정하는 데 사용할 템플릿입니다. 예를 들면 {_metadata_table}입니다. 기본값은 {_metadata_table}_log입니다.
  • outputTableNameTemplate: 복제본 테이블 이름에 사용할 템플릿입니다(예: {_metadata_table}). 기본값은 {_metadata_table}입니다.
  • ignoreFields: BigQuery에서 무시할 필드를 쉼표로 구분하여 나열합니다. 기본값은 _metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count입니다. 예를 들면 _metadata_stream,_metadata_schema입니다.
  • mergeFrequencyMinutes: 지정된 테이블의 병합 간격(분)입니다. 기본값은 5입니다.
  • dlqRetryMinutes: DLQ 재시도 간격(분)입니다. 기본값은 10입니다.
  • dataStreamRootUrl: Datastream API 루트 URL입니다. 기본값은 https://datastream.googleapis.com/입니다.
  • applyMerge: 작업에 대해 MERGE 쿼리를 사용 중지할지 여부입니다. 기본값은 true입니다.
  • mergeConcurrency: 동시 실행되는 BigQuery MERGE 쿼리의 수입니다. applyMerge가 true로 설정된 경우에만 유효합니다. 기본값은 30입니다.
  • partitionRetentionDays: BigQuery 병합을 실행할 때 파티션 보관에 사용할 일 수입니다. 기본값은 1입니다.
  • useStorageWriteApiAtLeastOnce: 이 매개변수는 Use BigQuery Storage Write API가 사용 설정된 경우에만 적용됩니다. true인 경우 Storage Write API에 최소 1회의 시맨틱스가 사용됩니다. 그렇지 않으면 1회만 실행되는 시맨틱이 사용됩니다. 기본값은 false입니다.
  • javascriptTextTransformGcsPath: 사용할 JavaScript 사용자 정의 함수 (UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • javascriptTextTransformFunctionName: 사용할 JavaScript 사용자 정의 함수 (UDF)의 이름입니다. 예를 들어 JavaScript 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 JavaScript UDF는 UDF 예시(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)를 참조하세요.
  • javascriptTextTransformReloadIntervalMinutes: UDF를 새로고침할 빈도(분)를 지정합니다. 값이 0보다 크면 Dataflow가 Cloud Storage에서 UDF 파일을 주기적으로 검사하고 파일이 수정된 경우 UDF를 새로고침합니다. 이 매개변수를 사용하면 파이프라인이 실행 중일 때 작업을 다시 시작할 필요 없이 UDF를 업데이트할 수 있습니다. 값이 0이면 UDF 새로고침이 사용 중지됩니다. 기본값은 0입니다.
  • pythonTextTransformGcsPath: 사용자 정의 함수가 포함된 Python 코드의 Cloud Storage 경로 패턴입니다. 예를 들면 gs://your-bucket/your-transforms/*.py입니다.
  • pythonRuntimeVersion: 이 Python UDF에 사용할 런타임 버전입니다.
  • pythonTextTransformFunctionName: JavaScript 파일에서 호출할 함수의 이름입니다. 문자, 숫자, 밑줄만 사용합니다. 예를 들면 transform_udf1입니다.
  • runtimeRetries: 실패하기 전에 런타임이 재시도되는 횟수입니다. 기본값은 5입니다.
  • useStorageWriteApi: true인 경우 파이프라인은 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)를 사용합니다. 기본값은 false입니다. 자세한 내용은 Storage Write API(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api) 사용을 참조하세요.
  • numStorageWriteApiStreams: Storage Write API를 사용할 때 쓰기 스트림 수를 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다. 기본값은 0입니다.
  • storageWriteApiTriggeringFrequencySec: Storage Write API를 사용할 때 트리거 빈도를 초 단위로 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다.

사용자 정의 함수

선택적으로 사용자 정의 함수(UDF)를 작성하여 이 템플릿을 확장할 수 있습니다. 템플릿이 각 입력 요소에 대해 UDF를 호출합니다. 요소 페이로드는 JSON 문자열로 직렬화됩니다. 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

함수 사양

UDF의 사양은 다음과 같습니다.

  • 입력: JSON 문자열로 직렬화된 CDC 데이터입니다.
  • 출력: BigQuery 대상 테이블의 스키마와 일치하는 JSON 문자열입니다.
  • 템플릿 실행

    콘솔

    1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
    2. 템플릿에서 작업 만들기로 이동
    3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
    4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

      Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

    5. Dataflow 템플릿 드롭다운 메뉴에서 the Datastream to BigQuery template을 선택합니다.
    6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
    7. 선택사항: 정확히 한 번 처리에서 적어도 한 번 스트리밍 모드로 전환하려면 적어도 한 번를 선택합니다.
    8. 작업 실행을 클릭합니다.

    gcloud

    셸 또는 터미널에서 템플릿을 실행합니다.

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    다음을 바꿉니다.

    • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
    • JOB_NAME: 선택한 고유한 작업 이름
    • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: Datastream 데이터의 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
    • GCS_SUBSCRIPTION_NAME: 변경된 파일을 읽을 Pub/Sub 구독입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
    • BIGQUERY_DATASET: BigQuery 데이터 세트 이름입니다.
    • BIGQUERY_TABLE: BigQuery 테이블 템플릿입니다. 예를 들면 {_metadata_schema}_{_metadata_table}_log입니다.

    API

    REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    다음을 바꿉니다.

    • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
    • JOB_NAME: 선택한 고유한 작업 이름
    • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: Datastream 데이터의 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
    • GCS_SUBSCRIPTION_NAME: 변경된 파일을 읽을 Pub/Sub 구독입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
    • BIGQUERY_DATASET: BigQuery 데이터 세트 이름입니다.
    • BIGQUERY_TABLE: BigQuery 테이블 템플릿입니다. 예를 들면 {_metadata_schema}_{_metadata_table}_log입니다.

    다음 단계