MongoDB to BigQuery(CDC) 템플릿

이 템플릿은 MongoDB 변경 내역에서 작동하는 스트리밍 파이프라인을 만듭니다. 이 템플릿을 사용하려면 변경 내역 데이터를 Pub/Sub에 게시합니다. 파이프라인은 Pub/Sub에서 JSON 레코드를 읽고 BigQuery에 씁니다. BigQuery에 기록되는 레코드의 형식은 MongoDB to BigQuery 일괄 템플릿과 동일합니다.

파이프라인 요구사항

  • 대상 BigQuery 데이터 세트가 있어야 합니다.
  • Dataflow 작업자 머신에서 소스 MongoDB 인스턴스에 액세스할 수 있어야 합니다.
  • 변경 내역을 읽으려면 Pub/Sub 주제를 만들어야 합니다. 파이프라인이 실행되는 동안 MongoDB 변경 내역에서 변경 데이터 캡처(CDC) 이벤트를 리슨하고 Pub/Sub에 JSON 레코드로 게시합니다. Pub/Sub에 메시지 게시에 대한 자세한 내용은 주제에 메시지 게시를 참조하세요.

템플릿 매개변수

필수 매개변수

  • mongoDbUri: mongodb+srv://:@ 형식의 MongoDB 연결 URI입니다.
  • database: 컬렉션을 읽을 MongoDB의 데이터베이스입니다. (예: my-db)
  • collection: MongoDB 데이터베이스 내부의 컬렉션 이름입니다. (예: my-collection)
  • userOption : 사용자 옵션: FLATTEN 또는 NONE. FLATTEN은 문서를 단일 수준으로 평면화합니다. NONE은 전체 문서를 JSON 문자열로 저장합니다. 기본값은 NONE입니다.
  • inputTopic : 'projects/your-project-id/topics/your-topic-name' 형식의 입력을 읽어올 Pub/Sub 주제입니다(예: projects/your-project-id/topics/your-topic-name).
  • outputTableSpec: 출력을 작성할 BigQuery 테이블 위치입니다. 이름은 <project>:<dataset>.<table_name> 형식이어야 합니다. 테이블 스키마가 입력 객체와 일치해야 합니다.

선택적 매개변수

  • useStorageWriteApiAtLeastOnce: 이 매개변수는 'BigQuery Storage Write API 사용'이 설정된 경우에만 적용됩니다. 사용 설정하면 Storage Write API에 최소 1회의 시맨틱스가 사용되고 그렇지 않은 경우 정확히 한 번의 시맨틱스가 사용됩니다. 기본값은 false입니다.
  • KMSEncryptionKey : mongodb URI 연결 문자열을 복호화하는 Cloud KMS 암호화 키입니다. Cloud KMS 키가 전달되면 mongodb URI 연결 문자열이 모두 암호화되어 전달되어야 합니다. (예: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key).
  • useStorageWriteApi: true이면 파이프라인은 BigQuery에 데이터를 쓸 때 Storage Write API를 사용합니다(https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api 참조). 기본값은 false입니다. 정확히 한 번 모드에서 Storage Write API를 사용할 때는 'BigQuery Storage Write API의 스트림 수' 및 'BigQuery Storage Write API의 트리거 빈도(초)'와 같은 매개변수를 설정해야 합니다. Dataflow 적어도 한 번 모드를 사용 설정하거나 useStorageWriteApiAtLeastOnce 매개변수를 true로 설정하면 스트림 수나 트리거 빈도를 설정할 필요가 없습니다.
  • numStorageWriteApiStreams: 스트림 수는 BigQueryIO Write 변환의 동시 로드를 정의하며 파이프라인에서 사용될 Storage Write API의 스트림 수와 대략 일치합니다. 권장 값은 https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api를 참조하세요. 기본값은 0입니다.
  • storageWriteApiTriggeringFrequencySec: 트리거 빈도는 BigQuery에서 쿼리하는 데이터가 표시되는 속도를 결정합니다. 권장 값은 https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api를 참조하세요.
  • javascriptDocumentTransformGcsPath: 사용자 정의 함수가 포함된 JavaScript 코드의 Cloud Storage 경로 패턴입니다. (예: gs://your-bucket/your-transforms/*.js)
  • javascriptDocumentTransformFunctionName: 함수 이름에는 문자, 숫자, 밑줄만 포함되어야 합니다. (예: 'transform' 또는 'transform_udf1') (예: transform)

사용자 정의 함수

선택적으로 JavaScript에서 사용자 정의 함수(UDF)를 작성하여 이 템플릿을 확장할 수 있습니다. 템플릿이 각 입력 요소에 대해 UDF를 호출합니다. 요소 페이로드는 JSON 문자열로 직렬화됩니다.

UDF를 사용하려면 JavaScript 파일을 Cloud Storage에 업로드하고 다음 템플릿 매개변수를 설정합니다.

매개변수설명
javascriptDocumentTransformGcsPath JavaScript 파일의 Cloud Storage 위치입니다.
javascriptDocumentTransformFunctionName 자바스크립트 함수의 이름입니다.

자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

함수 사양

UDF의 사양은 다음과 같습니다.

  • 입력: MongoDB 문서입니다.
  • 출력: JSON 문자열로 직렬화된 객체입니다.
  • 템플릿 실행

    콘솔

    1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
    2. 템플릿에서 작업 만들기로 이동
    3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
    4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

      Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

    5. Dataflow 템플릿 드롭다운 메뉴에서 the MongoDB to BigQuery (CDC) template을 선택합니다.
    6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
    7. 작업 실행을 클릭합니다.

    gcloud

    셸 또는 터미널에서 템플릿을 실행합니다.

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION,\
    inputTopic=INPUT_TOPIC
    

    다음을 바꿉니다.

    • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
    • JOB_NAME: 선택한 고유한 작업 이름
    • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: 사용할 템플릿 버전

      다음 값을 사용할 수 있습니다.

    • OUTPUT_TABLE_SPEC: 대상 BigQuery 테이블 이름입니다.
    • MONGO_DB_URI: MongoDB URI입니다.
    • DATABASE: MongoDB 데이터베이스입니다.
    • COLLECTION: MongoDB 컬렉션입니다.
    • USER_OPTION: FLATTEN 또는 NONE
    • INPUT_TOPIC: Pub/Sub 입력 주제

    API

    REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION",
              "inputTopic": "INPUT_TOPIC"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC",
       }
    }

    다음을 바꿉니다.

    • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
    • JOB_NAME: 선택한 고유한 작업 이름
    • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: 사용할 템플릿 버전

      다음 값을 사용할 수 있습니다.

    • OUTPUT_TABLE_SPEC: 대상 BigQuery 테이블 이름입니다.
    • MONGO_DB_URI: MongoDB URI입니다.
    • DATABASE: MongoDB 데이터베이스입니다.
    • COLLECTION: MongoDB 컬렉션입니다.
    • USER_OPTION: FLATTEN 또는 NONE
    • INPUT_TOPIC: Pub/Sub 입력 주제

    다음 단계