MongoDB to BigQuery 템플릿(스트림)

이 템플릿은 MongoDB 변경 내역에서 작동하는 스트리밍 파이프라인을 만듭니다. 이 템플릿을 사용하려면 변경 내역 데이터를 Pub/Sub에 게시합니다. 파이프라인은 Pub/Sub에서 JSON 레코드를 읽고 BigQuery에 씁니다. BigQuery에 기록되는 레코드의 형식은 MongoDB to BigQuery 일괄 템플릿과 동일합니다.

파이프라인 요구사항

  • 대상 BigQuery 데이터 세트가 있어야 합니다.
  • Dataflow 작업자 머신에서 소스 MongoDB 인스턴스에 액세스할 수 있어야 합니다.
  • 변경 내역을 읽으려면 Pub/Sub 주제를 만들어야 합니다. 파이프라인이 실행되는 동안 MongoDB 변경 내역에서 변경 데이터 캡처(CDC) 이벤트를 리슨하고 Pub/Sub에 JSON 레코드로 게시합니다. Pub/Sub에 메시지 게시에 대한 자세한 내용은 주제에 메시지 게시를 참조하세요.
  • 이 템플릿은 MongoDB 변경 내역을 사용합니다. BigQuery 변경 데이터 캡처는 지원하지 않습니다.

템플릿 매개변수

필수 매개변수

  • mongoDbUri: mongodb+srv://:@. 형식의 MongoDB 연결 URI입니다.
  • database: 컬렉션을 읽을 MongoDB의 데이터베이스입니다. 예를 들면 my-db입니다.
  • collection: MongoDB 데이터베이스 내부의 컬렉션 이름입니다. 예를 들면 my-collection입니다.
  • userOption: FLATTEN, JSON 또는 NONE. FLATTEN은 문서를 단일 수준으로 평면화합니다. JSON는 문서를 BigQuery JSON 형식으로 저장합니다. NONE은 전체 문서를 JSON 형식의 문자열로 저장합니다. 기본값은 NONE입니다.
  • inputTopic: 읽어올 Pub/Sub 입력 주제로, projects/<PROJECT_ID>/topics/<TOPIC_NAME> 형식입니다.
  • outputTableSpec: 작성할 BigQuery 테이블입니다. 예를 들면 bigquery-project:dataset.output_table입니다.

선택적 매개변수

  • useStorageWriteApiAtLeastOnce: Storage Write API를 사용할 때 쓰기 시맨틱스를 지정합니다. 1회 이상 실행되는 시맨틱스(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)로 설정)를 사용하려면, 이 매개변수를 true로 설정합니다. 1회만 실행되는 시맨틱스를 사용하려면 매개변수를 false로 설정합니다. 이 매개변수는 useStorageWriteApitrue인 경우에만 적용됩니다. 기본값은 false입니다.
  • KMSEncryptionKey: mongodb uri 연결 문자열을 복호화하는 Cloud KMS 암호화 키입니다. Cloud KMS 키가 전달되면 mongodb uri 연결 문자열이 모두 암호화되어 전달되어야 합니다. 예를 들면 projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key입니다.
  • filter: JSON 형식의 Bson 필터입니다. 예를 들면 { "val": { $gt: 0, $lt: 9 }}입니다.
  • useStorageWriteApi: true인 경우 파이프라인은 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)를 사용합니다. 기본값은 false입니다. 자세한 내용은 Storage Write API(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api) 사용을 참조하세요.
  • numStorageWriteApiStreams: Storage Write API를 사용할 때 쓰기 스트림 수를 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다. 기본값은 0입니다.
  • storageWriteApiTriggeringFrequencySec: Storage Write API를 사용할 때 트리거 빈도를 초 단위로 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다.
  • bigQuerySchemaPath: BigQuery JSON 스키마의 Cloud Storage 경로입니다. 예를 들면 gs://your-bucket/your-schema.json입니다.
  • javascriptDocumentTransformGcsPath: 사용할 JavaScript 사용자 정의 함수 (UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://your-bucket/your-transforms/*.js입니다.
  • javascriptDocumentTransformFunctionName: 사용할 JavaScript 사용자 정의 함수 (UDF)의 이름입니다. 예를 들어 JavaScript 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 JavaScript UDF는 UDF 예시(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)를 참조하세요. 예를 들면 transform입니다.

사용자 정의 함수

선택적으로 JavaScript에서 사용자 정의 함수(UDF)를 작성하여 이 템플릿을 확장할 수 있습니다. 템플릿이 각 입력 요소에 대해 UDF를 호출합니다. 요소 페이로드는 JSON 문자열로 직렬화됩니다.

UDF를 사용하려면 JavaScript 파일을 Cloud Storage에 업로드하고 다음 템플릿 매개변수를 설정합니다.

매개변수설명
javascriptDocumentTransformGcsPath JavaScript 파일의 Cloud Storage 위치입니다.
javascriptDocumentTransformFunctionName 자바스크립트 함수의 이름입니다.

자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

함수 사양

UDF의 사양은 다음과 같습니다.

  • 입력: MongoDB 문서입니다.
  • 출력: JSON 문자열로 직렬화된 객체입니다.
  • 템플릿 실행

    콘솔

    1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
    2. 템플릿에서 작업 만들기로 이동
    3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
    4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

      Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

    5. Dataflow 템플릿 드롭다운 메뉴에서 the MongoDB (CDC) to BigQuery template을 선택합니다.
    6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
    7. 작업 실행을 클릭합니다.

    gcloud

    셸 또는 터미널에서 템플릿을 실행합니다.

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION,\
    inputTopic=INPUT_TOPIC

    다음을 바꿉니다.

    • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
    • JOB_NAME: 선택한 고유한 작업 이름
    • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: 사용할 템플릿 버전

      다음 값을 사용할 수 있습니다.

    • OUTPUT_TABLE_SPEC: 대상 BigQuery 테이블 이름입니다.
    • MONGO_DB_URI: MongoDB URI입니다.
    • DATABASE: MongoDB 데이터베이스입니다.
    • COLLECTION: MongoDB 컬렉션
    • USER_OPTION: FLATTEN, JSON 또는 NONE입니다.
    • INPUT_TOPIC: Pub/Sub 입력 주제

    API

    REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION",
              "inputTopic": "INPUT_TOPIC"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC",
       }
    }

    다음을 바꿉니다.

    • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
    • JOB_NAME: 선택한 고유한 작업 이름
    • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: 사용할 템플릿 버전

      다음 값을 사용할 수 있습니다.

    • OUTPUT_TABLE_SPEC: 대상 BigQuery 테이블 이름입니다.
    • MONGO_DB_URI: MongoDB URI입니다.
    • DATABASE: MongoDB 데이터베이스입니다.
    • COLLECTION: MongoDB 컬렉션
    • USER_OPTION: FLATTEN, JSON 또는 NONE입니다.
    • INPUT_TOPIC: Pub/Sub 입력 주제

    다음 단계