이 템플릿은 MongoDB 변경 내역에서 작동하는 스트리밍 파이프라인을 만듭니다. 이 템플릿을 사용하려면 변경 내역 데이터를 Pub/Sub에 게시합니다. 파이프라인은 Pub/Sub에서 JSON 레코드를 읽고 BigQuery에 씁니다. BigQuery에 기록되는 레코드의 형식은 MongoDB to BigQuery 일괄 템플릿과 동일합니다.
파이프라인 요구사항
- 대상 BigQuery 데이터 세트가 있어야 합니다.
- Dataflow 작업자 머신에서 소스 MongoDB 인스턴스에 액세스할 수 있어야 합니다.
- 변경 내역을 읽으려면 Pub/Sub 주제를 만들어야 합니다. 파이프라인이 실행되는 동안 MongoDB 변경 내역에서 변경 데이터 캡처(CDC) 이벤트를 리슨하고 Pub/Sub에 JSON 레코드로 게시합니다. Pub/Sub에 메시지 게시에 대한 자세한 내용은 주제에 메시지 게시를 참조하세요.
- 이 템플릿은 MongoDB 변경 내역을 사용합니다. BigQuery 변경 데이터 캡처는 지원하지 않습니다.
템플릿 매개변수
필수 매개변수
- mongoDbUri:
mongodb+srv://:@.
형식의 MongoDB 연결 URI입니다. - database: 컬렉션을 읽을 MongoDB의 데이터베이스입니다. 예를 들면
my-db
입니다. - collection: MongoDB 데이터베이스 내부의 컬렉션 이름입니다. 예를 들면
my-collection
입니다. - userOption:
FLATTEN
,JSON
또는NONE
.FLATTEN
은 문서를 단일 수준으로 평면화합니다.JSON
는 문서를 BigQuery JSON 형식으로 저장합니다.NONE
은 전체 문서를 JSON 형식의 문자열로 저장합니다. 기본값은 NONE입니다. - inputTopic: 읽어올 Pub/Sub 입력 주제로,
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
형식입니다. - outputTableSpec: 작성할 BigQuery 테이블입니다. 예를 들면
bigquery-project:dataset.output_table
입니다.
선택적 매개변수
- useStorageWriteApiAtLeastOnce: Storage Write API를 사용할 때 쓰기 시맨틱스를 지정합니다. 1회 이상 실행되는 시맨틱스(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)로 설정)를 사용하려면, 이 매개변수를
true
로 설정합니다. 1회만 실행되는 시맨틱스를 사용하려면 매개변수를false
로 설정합니다. 이 매개변수는useStorageWriteApi
가true
인 경우에만 적용됩니다. 기본값은false
입니다. - KMSEncryptionKey: mongodb uri 연결 문자열을 복호화하는 Cloud KMS 암호화 키입니다. Cloud KMS 키가 전달되면 mongodb uri 연결 문자열이 모두 암호화되어 전달되어야 합니다. 예를 들면
projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
입니다. - filter: JSON 형식의 Bson 필터입니다. 예를 들면
{ "val": { $gt: 0, $lt: 9 }}
입니다. - useStorageWriteApi: true인 경우 파이프라인은 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)를 사용합니다. 기본값은
false
입니다. 자세한 내용은 Storage Write API(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api) 사용을 참조하세요. - numStorageWriteApiStreams: Storage Write API를 사용할 때 쓰기 스트림 수를 지정합니다.
useStorageWriteApi
가true
이고useStorageWriteApiAtLeastOnce
가false
이면 이 매개변수를 설정해야 합니다. 기본값은 0입니다. - storageWriteApiTriggeringFrequencySec: Storage Write API를 사용할 때 트리거 빈도를 초 단위로 지정합니다.
useStorageWriteApi
가true
이고useStorageWriteApiAtLeastOnce
가false
이면 이 매개변수를 설정해야 합니다. - bigQuerySchemaPath: BigQuery JSON 스키마의 Cloud Storage 경로입니다. 예를 들면
gs://your-bucket/your-schema.json
입니다. - javascriptDocumentTransformGcsPath: 사용할 JavaScript 사용자 정의 함수 (UDF)를 정의하는
.js
파일의 Cloud Storage URI입니다. 예를 들면gs://your-bucket/your-transforms/*.js
입니다. - javascriptDocumentTransformFunctionName: 사용할 JavaScript 사용자 정의 함수 (UDF)의 이름입니다. 예를 들어 JavaScript 함수가
myTransform(inJson) { /*...do stuff...*/ }
이면 함수 이름은 myTransform입니다. 샘플 JavaScript UDF는 UDF 예시(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)를 참조하세요. 예를 들면transform
입니다.
사용자 정의 함수
선택적으로 JavaScript에서 사용자 정의 함수(UDF)를 작성하여 이 템플릿을 확장할 수 있습니다. 템플릿이 각 입력 요소에 대해 UDF를 호출합니다. 요소 페이로드는 JSON 문자열로 직렬화됩니다.
UDF를 사용하려면 JavaScript 파일을 Cloud Storage에 업로드하고 다음 템플릿 매개변수를 설정합니다.
매개변수 | 설명 |
---|---|
javascriptDocumentTransformGcsPath |
JavaScript 파일의 Cloud Storage 위치입니다. |
javascriptDocumentTransformFunctionName |
자바스크립트 함수의 이름입니다. |
자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.
함수 사양
UDF의 사양은 다음과 같습니다.
템플릿 실행
콘솔
- Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다. 템플릿에서 작업 만들기로 이동
- 작업 이름 필드에 고유한 작업 이름을 입력합니다.
- (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은
us-central1
입니다.Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.
- Dataflow 템플릿 드롭다운 메뉴에서 the MongoDB (CDC) to BigQuery template을 선택합니다.
- 제공된 매개변수 필드에 매개변수 값을 입력합니다.
- 작업 실행을 클릭합니다.
gcloud
셸 또는 터미널에서 템플릿을 실행합니다.
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \ --parameters \ outputTableSpec=OUTPUT_TABLE_SPEC,\ mongoDbUri=MONGO_DB_URI,\ database=DATABASE,\ collection=COLLECTION,\ userOption=USER_OPTION,\ inputTopic=INPUT_TOPIC
다음을 바꿉니다.
PROJECT_ID
: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 IDJOB_NAME
: 선택한 고유한 작업 이름REGION_NAME
: Dataflow 작업을 배포할 리전(예:us-central1
)VERSION
: 사용할 템플릿 버전다음 값을 사용할 수 있습니다.
latest
: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates-REGION_NAME/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.- 버전 이름(예:
2023-09-12-00_RC00
): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates-REGION_NAME/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
OUTPUT_TABLE_SPEC
: 대상 BigQuery 테이블 이름입니다.MONGO_DB_URI
: MongoDB URI입니다.DATABASE
: MongoDB 데이터베이스입니다.COLLECTION
: MongoDB 컬렉션USER_OPTION
: FLATTEN, JSON 또는 NONE입니다.INPUT_TOPIC
: Pub/Sub 입력 주제
API
REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch
를 참조하세요.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "mongoDbUri": "MONGO_DB_URI", "database": "DATABASE", "collection": "COLLECTION", "userOption": "USER_OPTION", "inputTopic": "INPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC", } }
다음을 바꿉니다.
PROJECT_ID
: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 IDJOB_NAME
: 선택한 고유한 작업 이름LOCATION
: Dataflow 작업을 배포할 리전(예:us-central1
)VERSION
: 사용할 템플릿 버전다음 값을 사용할 수 있습니다.
latest
: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates-REGION_NAME/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.- 버전 이름(예:
2023-09-12-00_RC00
): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates-REGION_NAME/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
OUTPUT_TABLE_SPEC
: 대상 BigQuery 테이블 이름입니다.MONGO_DB_URI
: MongoDB URI입니다.DATABASE
: MongoDB 데이터베이스입니다.COLLECTION
: MongoDB 컬렉션USER_OPTION
: FLATTEN, JSON 또는 NONE입니다.INPUT_TOPIC
: Pub/Sub 입력 주제
다음 단계
- Dataflow 템플릿 알아보기
- Google 제공 템플릿 목록 참조