MongoDB to BigQuery 模板

此模板会创建一个批处理流水线,用于从 MongoDB 读取文档并将其写入 BigQuery。

如果您想捕获 MongoDB 变更数据流数据,可以使用 MongoDB to BigQuery (CDC) 模板

流水线要求

  • 目标 BigQuery 数据集必须已存在。
  • 必须可从 Dataflow 工作器机器访问 MongoDB 源实例。

输出格式

输出记录的格式取决于 userOption 参数的值。如果 userOptionNONE,则输出具有以下架构。source_data 字段包含 JSON 格式的文档。

  [
    {"name":"id","type":"STRING"},
    {"name":"source_data","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

如果 userOptionFLATTEN,流水线会展平文档,并将顶级字段写入表格列。例如,假设 MongoDB 集合中的文档包含以下字段:

  • "_id" (string)
  • "title" (string)
  • "genre" (string)

使用 FLATTEN 时,输出具有以下架构。timestamp 字段由模板添加。

  [
    {"name":"_id","type":"STRING"},
    {"name":"title","type":"STRING"},
    {"name":"genre","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

如果 userOptionJSON,流水线会以 BigQuery JSON 格式存储文档。BigQuery 内置了对使用 JSON 数据类型的 JSON 数据的支持。 如需了解详情,请参阅在 GoogleSQL 中使用 JSON 数据

模板参数

必需参数

  • mongoDbUri:MongoDB 连接 URI,格式为 mongodb+srv://:@.
  • database:从中读取集合的 MongoDB 数据库。例如 my-db
  • collection:MongoDB 数据库中集合的名称。例如 my-collection
  • userOptionFLATTENJSONNONEFLATTEN 将文档展平至单个级别。JSON 以 BigQuery JSON 格式存储文档。NONE 将整个文档存储为 JSON 格式的字符串。默认为:NONE。
  • outputTableSpec:要写入到其中的 BigQuery 表。例如 bigquery-project:dataset.output_table

可选参数

  • KMSEncryptionKey:用于解密 mongodb URI 连接字符串的 Cloud KMS 加密密钥。如果传入了 Cloud KMS 密钥,则必须以加密方式传递 mongodb URI 连接字符串。例如 projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
  • filter:json 格式的 Bson 过滤器。例如 { "val": { $gt: 0, $lt: 9 }}
  • useStorageWriteApi:如果为 true,则流水线使用 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)。默认值为 false。如需了解详情,请参阅使用 Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)。
  • useStorageWriteApiAtLeastOnce:使用 Storage Write API 时,指定写入语义。如需使用“至少一次”语义 (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics),请将此参数设置为 true。如需使用“正好一次”语义,请将参数设置为 false。仅当 useStorageWriteApitrue 时,此参数才适用。默认值为 false
  • bigQuerySchemaPath:BigQuery JSON 架构的 Cloud Storage 路径。例如 gs://your-bucket/your-schema.json
  • javascriptDocumentTransformGcsPath.js 文件的 Cloud Storage URI,该文件用于定义要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://your-bucket/your-transforms/*.js
  • javascriptDocumentTransformFunctionName:要使用的 JavaScript 用户定义的函数 (UDF) 的名称。例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅“UDF 示例”(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。例如 transform

用户定义的函数

(可选)您可以通过在 JavaScript 中编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。 元素载荷会序列化为 JSON 字符串。

如需使用 UDF,请将 JavaScript 文件上传到 Cloud Storage 并设置以下模板参数:

参数说明
javascriptDocumentTransformGcsPath JavaScript 文件的 Cloud Storage 位置。
javascriptDocumentTransformFunctionName JavaScript 函数的名称。

如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:MongoDB 文档。
  • 输出:序列化为 JSON 字符串的对象。如果 userOptionNONE,则 JSON 对象必须包含一个名为 _id 的属性,其中包含文档 ID。
  • 运行模板

    1. 转到 Dataflow 基于模板创建作业页面。
    2. 转到“基于模板创建作业”
    3. 作业名称字段中,输入唯一的作业名称。
    4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

      如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

    5. Dataflow 模板下拉菜单中,选择 the MongoDB to BigQuery template。
    6. 在提供的参数字段中,输入您的参数值。
    7. 点击运行作业

    在 shell 或终端中,运行模板:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION

    替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目 ID
    • JOB_NAME:您选择的唯一性作业名称
    • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION:您要使用的模板的版本

      您可使用以下值:

    • OUTPUT_TABLE_SPEC:您的 BigQuery 目标表的名称。
    • MONGO_DB_URI:您的 MongoDB URI。
    • DATABASE:您的 MongoDB 数据库。
    • COLLECTION:您的 MongoDB 集合。
    • USER_OPTION:FLATTEN、JSON 或 NONE。

    如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery",
       }
    }

    替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目 ID
    • JOB_NAME:您选择的唯一性作业名称
    • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION:您要使用的模板的版本

      您可使用以下值:

    • OUTPUT_TABLE_SPEC:您的 BigQuery 目标表的名称。
    • MONGO_DB_URI:您的 MongoDB URI。
    • DATABASE:您的 MongoDB 数据库。
    • COLLECTION:您的 MongoDB 集合。
    • USER_OPTION:FLATTEN、JSON 或 NONE。
    Java
    /*
     * Copyright (C) 2019 Google LLC
     *
     * Licensed under the Apache License, Version 2.0 (the "License"); you may not
     * use this file except in compliance with the License. You may obtain a copy of
     * the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    package com.google.cloud.teleport.v2.mongodb.templates;
    
    import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
    import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;
    
    import com.google.api.client.json.gson.GsonFactory;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.cloud.teleport.metadata.Template;
    import com.google.cloud.teleport.metadata.TemplateCategory;
    import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.BigQueryWriteOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.JavascriptDocumentTransformerOptions;
    import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.MongoDbOptions;
    import com.google.cloud.teleport.v2.mongodb.templates.MongoDbToBigQuery.Options;
    import com.google.cloud.teleport.v2.options.BigQueryStorageApiBatchOptions;
    import com.google.cloud.teleport.v2.transforms.JavascriptDocumentTransformer.TransformDocumentViaJavascript;
    import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
    import com.google.common.base.Strings;
    import java.io.IOException;
    import javax.script.ScriptException;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.mongodb.FindQuery;
    import org.apache.beam.sdk.io.mongodb.MongoDbIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.bson.BsonDocument;
    import org.bson.Document;
    
    /**
     * The {@link MongoDbToBigQuery} pipeline is a batch pipeline which ingests data from MongoDB and
     * outputs the resulting records to BigQuery.
     *
     * <p>Check out <a
     * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/mongodb-to-googlecloud/README_MongoDB_to_BigQuery.md">README</a>
     * for instructions on how to use or modify this template.
     */
    @Template(
        name = "MongoDB_to_BigQuery",
        category = TemplateCategory.BATCH,
        displayName = "MongoDB to BigQuery",
        description =
            "The MongoDB to BigQuery template is a batch pipeline that reads documents from MongoDB and writes them to "
                + "BigQuery as specified by the <code>userOption</code> parameter.",
        optionsClass = Options.class,
        flexContainerName = "mongodb-to-bigquery",
        documentation =
            "https://cloud.google.com/dataflow/docs/guides/templates/provided/mongodb-to-bigquery",
        contactInformation = "https://cloud.google.com/support",
        preview = true,
        requirements = {
          "The target BigQuery dataset must exist.",
          "The source MongoDB instance must be accessible from the Dataflow worker machines."
        })
    public class MongoDbToBigQuery {
      /**
       * Options supported by {@link MongoDbToBigQuery}
       *
       * <p>Inherits standard configuration options.
       */
      public interface Options
          extends PipelineOptions,
              MongoDbOptions,
              BigQueryWriteOptions,
              BigQueryStorageApiBatchOptions,
              JavascriptDocumentTransformerOptions {}
    
      private static class ParseAsDocumentsFn extends DoFn<String, Document> {
        @ProcessElement
        public void processElement(ProcessContext context) {
          context.output(Document.parse(context.element()));
        }
      }
    
      public static void main(String[] args)
          throws ScriptException, IOException, NoSuchMethodException {
        UncaughtExceptionLogger.register();
    
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    
        BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);
    
        run(options);
      }
    
      public static boolean run(Options options)
          throws ScriptException, IOException, NoSuchMethodException {
        Pipeline pipeline = Pipeline.create(options);
        String userOption = options.getUserOption();
    
        TableSchema bigquerySchema;
    
        // Get MongoDbUri plain text or base64 encrypted with a specific KMS encryption key
        String mongoDbUri = maybeDecrypt(options.getMongoDbUri(), options.getKMSEncryptionKey()).get();
    
        if (options.getBigQuerySchemaPath() != null) {
          // initialize FileSystem to read from GCS
          FileSystems.setDefaultPipelineOptions(options);
          String jsonSchema = getGcsFileAsString(options.getBigQuerySchemaPath());
          GsonFactory gf = new GsonFactory();
          bigquerySchema = gf.fromString(jsonSchema, TableSchema.class);
        } else if (options.getJavascriptDocumentTransformFunctionName() != null
            && options.getJavascriptDocumentTransformGcsPath() != null) {
          bigquerySchema =
              MongoDbUtils.getTableFieldSchemaForUDF(
                  mongoDbUri,
                  options.getDatabase(),
                  options.getCollection(),
                  options.getJavascriptDocumentTransformGcsPath(),
                  options.getJavascriptDocumentTransformFunctionName(),
                  options.getUserOption());
        } else {
          bigquerySchema =
              MongoDbUtils.getTableFieldSchema(
                  mongoDbUri, options.getDatabase(), options.getCollection(), options.getUserOption());
        }
    
        MongoDbIO.Read readDocuments =
            MongoDbIO.read()
                .withUri(mongoDbUri)
                .withDatabase(options.getDatabase())
                .withCollection(options.getCollection());
    
        String filterJson = options.getFilter();
        BsonDocument filter;
        if (!Strings.isNullOrEmpty(filterJson)
            && !(filter = BsonDocument.parse(filterJson)).isEmpty()) {
          readDocuments = readDocuments.withQueryFn(FindQuery.create().withFilters(filter));
        }
    
        pipeline
            .apply("Read Documents", readDocuments)
            .apply(
                "UDF",
                TransformDocumentViaJavascript.newBuilder()
                    .setFileSystemPath(options.getJavascriptDocumentTransformGcsPath())
                    .setFunctionName(options.getJavascriptDocumentTransformFunctionName())
                    .build())
            .apply(
                "Transform to TableRow",
                ParDo.of(
                    new DoFn<Document, TableRow>() {
    
                      @ProcessElement
                      public void process(ProcessContext c) {
                        Document document = c.element();
                        TableRow row = MongoDbUtils.getTableSchema(document, userOption);
                        c.output(row);
                      }
                    }))
            .apply(
                "Write to Bigquery",
                BigQueryIO.writeTableRows()
                    .to(options.getOutputTableSpec())
                    .withSchema(bigquerySchema)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        pipeline.run();
        return true;
      }
    }
    

    后续步骤