이 템플릿은 MongoDB 변경 내역에서 작동하는 스트리밍 파이프라인을 만듭니다. 이 템플릿을 사용하려면 변경 내역 데이터를 Pub/Sub에 게시합니다. 파이프라인은 Pub/Sub에서 JSON 레코드를 읽고 BigQuery에 씁니다. BigQuery에 기록되는 레코드의 형식은 MongoDB to BigQuery 일괄 템플릿과 동일합니다.
파이프라인 요구사항
대상 BigQuery 데이터 세트가 있어야 합니다.
Dataflow 작업자 머신에서 소스 MongoDB 인스턴스에 액세스할 수 있어야 합니다.
변경 내역을 읽으려면 Pub/Sub 주제를 만들어야 합니다.
파이프라인이 실행되는 동안 MongoDB 변경 내역에서 변경 데이터 캡처(CDC) 이벤트를 리슨하고 Pub/Sub에 JSON 레코드로 게시합니다. Pub/Sub에 메시지 게시에 대한 자세한 내용은 주제에 메시지 게시를 참조하세요.
템플릿 매개변수
필수 매개변수
mongoDbUri: mongodb+srv://:@. 형식의 MongoDB 연결 URI입니다.
database: 컬렉션을 읽을 MongoDB의 데이터베이스입니다. (예: my-db).
collection: MongoDB 데이터베이스 내부의 컬렉션 이름입니다. (예: my-collection).
userOption : FLATTEN 또는 NONE. FLATTEN은 문서를 단일 수준으로 평면화합니다. NONE은 전체 문서를 JSON 문자열로 저장합니다. 기본값은 NONE입니다.
inputTopic : 읽어올 Pub/Sub 입력 주제로, projects/<PROJECT_ID>/topics/<TOPIC_NAME> 형식입니다.
outputTableSpec : 작성할 BigQuery 테이블입니다. 예를 들면 bigquery-project:dataset.output_table입니다.
KMSEncryptionKey : mongodb URI 연결 문자열을 복호화하는 Cloud KMS 암호화 키입니다. Cloud KMS 키가 전달되면 mongodb URI 연결 문자열이 모두 암호화되어 전달되어야 합니다. (예: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key).
numStorageWriteApiStreams : Storage Write API를 사용할 때 쓰기 스트림 수를 지정합니다. useStorageWriteApi가 true이고 useStorageWriteApiAtLeastOnce가 false이면 이 매개변수를 설정해야 합니다. 기본값은 0입니다.
storageWriteApiTriggeringFrequencySec : Storage Write API를 사용할 때 트리거 빈도를 초 단위로 지정합니다. useStorageWriteApi가 true이고 useStorageWriteApiAtLeastOnce가 false이면 이 매개변수를 설정해야 합니다.
javascriptDocumentTransformGcsPath : 사용할 JavaScript 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. (예: gs://your-bucket/your-transforms/*.js)
javascriptDocumentTransformFunctionName : 사용할 JavaScript 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 JavaScript 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 JavaScript UDF는 UDF 예시(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)를 참조하세요. (예: transform)
사용자 정의 함수
선택적으로 JavaScript에서 사용자 정의 함수(UDF)를 작성하여 이 템플릿을 확장할 수 있습니다. 템플릿이 각 입력 요소에 대해 UDF를 호출합니다.
요소 페이로드는 JSON 문자열로 직렬화됩니다.
UDF를 사용하려면 JavaScript 파일을 Cloud Storage에 업로드하고 다음 템플릿 매개변수를 설정합니다.
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.mongodb.templates;
import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.BigQueryWriteOptions;
import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.JavascriptDocumentTransformerOptions;
import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.MongoDbOptions;
import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.PubSubOptions;
import com.google.cloud.teleport.v2.mongodb.templates.MongoDbToBigQueryCdc.Options;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.transforms.JavascriptDocumentTransformer.TransformDocumentViaJavascript;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import java.io.IOException;
import javax.script.ScriptException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link MongoDbToBigQueryCdc} pipeline is a streaming pipeline which reads data pushed to
* PubSub from MongoDB Changestream and outputs the resulting records to BigQuery.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/mongodb-to-googlecloud/README_MongoDB_to_BigQuery_CDC.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "MongoDB_to_BigQuery_CDC",
category = TemplateCategory.STREAMING,
displayName = "MongoDB to BigQuery (CDC)",
description =
"The MongoDB to BigQuery CDC (Change Data Capture) template is a streaming pipeline that works together with MongoDB change streams. "
+ "The pipeline reads the JSON records pushed to Pub/Sub via a MongoDB change stream and writes them to BigQuery as specified by the <code>userOption</code> parameter.",
optionsClass = Options.class,
flexContainerName = "mongodb-to-bigquery-cdc",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/mongodb-change-stream-to-bigquery",
contactInformation = "https://cloud.google.com/support",
preview = true,
requirements = {
"The target BigQuery dataset must exist.",
"The source MongoDB instance must be accessible from the Dataflow worker machines.",
"The change stream pushing changes from MongoDB to Pub/Sub should be running."
},
streaming = true,
supportsAtLeastOnce = true)
public class MongoDbToBigQueryCdc {
private static final Logger LOG = LoggerFactory.getLogger(MongoDbToBigQuery.class);
/** Options interface. */
public interface Options
extends PipelineOptions,
MongoDbOptions,
PubSubOptions,
BigQueryWriteOptions,
JavascriptDocumentTransformerOptions,
BigQueryStorageApiStreamingOptions {
// Hide the UseStorageWriteApiAtLeastOnce in the UI, because it will automatically be turned
// on when pipeline is running on ALO mode and using the Storage Write API
@TemplateParameter.Boolean(
order = 1,
optional = true,
parentName = "useStorageWriteApi",
parentTriggerValues = {"true"},
description = "Use at at-least-once semantics in BigQuery Storage Write API",
helpText =
"When using the Storage Write API, specifies the write semantics. To"
+ " use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to `true`. To use exactly-"
+ " once semantics, set the parameter to `false`. This parameter applies only when"
+ " `useStorageWriteApi` is `true`. The default value is `false`.",
hiddenUi = true)
@Default.Boolean(false)
@Override
Boolean getUseStorageWriteApiAtLeastOnce();
void setUseStorageWriteApiAtLeastOnce(Boolean value);
}
/** class ParseAsDocumentsFn. */
private static class ParseAsDocumentsFn extends DoFn<String, Document> {
@ProcessElement
public void processElement(ProcessContext context) {
context.output(Document.parse(context.element()));
}
}
/**
* Main entry point for pipeline execution.
*
* @param args Command line arguments to the pipeline.
*/
public static void main(String[] args)
throws ScriptException, IOException, NoSuchMethodException {
UncaughtExceptionLogger.register();
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
run(options);
}
/** Pipeline to read data from PubSub and write to MongoDB. */
public static boolean run(Options options)
throws ScriptException, IOException, NoSuchMethodException {
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
String userOption = options.getUserOption();
String inputOption = options.getInputTopic();
TableSchema bigquerySchema;
// Get MongoDbUri
String mongoDbUri = maybeDecrypt(options.getMongoDbUri(), options.getKMSEncryptionKey()).get();
if (options.getJavascriptDocumentTransformFunctionName() != null
&& options.getJavascriptDocumentTransformGcsPath() != null) {
bigquerySchema =
MongoDbUtils.getTableFieldSchemaForUDF(
mongoDbUri,
options.getDatabase(),
options.getCollection(),
options.getJavascriptDocumentTransformGcsPath(),
options.getJavascriptDocumentTransformFunctionName(),
options.getUserOption());
} else {
bigquerySchema =
MongoDbUtils.getTableFieldSchema(
mongoDbUri, options.getDatabase(), options.getCollection(), options.getUserOption());
}
pipeline
.apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(inputOption))
.apply(
"RTransform string to document",
ParDo.of(
new DoFn<String, Document>() {
@ProcessElement
public void process(ProcessContext c) {
Document document = Document.parse(c.element());
c.output(document);
}
}))
.apply(
"UDF",
TransformDocumentViaJavascript.newBuilder()
.setFileSystemPath(options.getJavascriptDocumentTransformGcsPath())
.setFunctionName(options.getJavascriptDocumentTransformFunctionName())
.build())
.apply(
"Read and transform data",
ParDo.of(
new DoFn<Document, TableRow>() {
@ProcessElement
public void process(ProcessContext c) {
Document document = c.element();
TableRow row = MongoDbUtils.getTableSchema(document, userOption);
c.output(row);
}
}))
.apply(
BigQueryIO.writeTableRows()
.to(options.getOutputTableSpec())
.withSchema(bigquerySchema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
pipeline.run();
return true;
}
}
[[["이해하기 쉬움","easyToUnderstand","thumb-up"],["문제가 해결됨","solvedMyProblem","thumb-up"],["기타","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["번역 문제","translationIssue","thumb-down"],["기타","otherDown","thumb-down"]],["최종 업데이트: 2024-08-06(UTC)"],[],[]]