MongoDB to BigQuery 模板

此模板会创建一种批处理流水线,该流水线可从 MongoDB 读取文档并将其写入 BigQuery。

如果要捕获 MongoDB 变更数据流数据,您可以使用 MongoDB to BigQuery (CDC) 模板

流水线要求

  • 目标 BigQuery 数据集必须已存在。
  • 必须可从 Dataflow 工作器机器访问 MongoDB 源实例。

输出格式

输出记录的格式取决于 userOption 参数的值。如果 userOptionNONE,则输出具有以下架构。source_data 字段包含 JSON 格式的文档。

  [
    {"name":"id","type":"STRING"},
    {"name":"source_data","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

如果 userOptionFLATTEN,则流水线会展平文档并将顶级字段写入表列。例如,假设 MongoDB 集合中的文档包含以下字段:

  • "_id" (string)
  • "title" (string)
  • "genre" (string)

使用 FLATTEN 时,输出具有以下架构。timestamp 字段由模板添加。

  [
    {"name":"_id","type":"STRING"},
    {"name":"title","type":"STRING"},
    {"name":"genre","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

模板参数

参数 说明
mongoDbUri MongoDB 连接 URI,格式为 mongodb+srv://:@
database 从中读取集合的 MongoDB 数据库。例如:my-db
collection MongoDB 数据库中集合的名称。例如:my-collection
outputTableSpec 要写入的 BigQuery 表。例如 bigquery-project:dataset.output_table
userOption FLATTENNONEFLATTEN 将文档展平至第一级。NONE 将整个文档存储为 JSON 字符串。
javascriptDocumentTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptDocumentTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
useStorageWriteApi (可选)如果为 true,则流水线使用 BigQuery Storage Write API。默认值为 false。如需了解详情,请参阅使用 Storage Write API
useStorageWriteApiAtLeastOnce (可选)使用 Storage Write API 时,请指定写入语义。如需使用“至少一次”语义,请将此参数设置为 true。如需使用“正好一次”语义,请将参数设置为 false。仅当 useStorageWriteApitrue 时,此参数才适用。默认值为 false

用户定义的函数

(可选)您可以通过在 JavaScript 中编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。 元素载荷会序列化为 JSON 字符串。

如需使用 UDF,请将 JavaScript 文件上传到 Cloud Storage 并设置以下模板参数:

参数说明
javascriptDocumentTransformGcsPath JavaScript 文件的 Cloud Storage 位置。
javascriptDocumentTransformFunctionName JavaScript 函数的名称。

如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:MongoDB 文档。
  • 输出:序列化为 JSON 字符串的对象。如果 userOptionNONE,则 JSON 对象必须包含一个名为 _id 的属性,其中包含文档 ID。
  • 运行模板

    控制台

    1. 转到 Dataflow 基于模板创建作业页面。
    2. 转到“基于模板创建作业”
    3. 作业名称字段中,输入唯一的作业名称。
    4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

      如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

    5. Dataflow 模板下拉菜单中,选择 the MongoDB to BigQuery template。
    6. 在提供的参数字段中,输入您的参数值。
    7. 点击运行作业

    gcloud

    在 shell 或终端中,运行模板:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION

    请替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
    • JOB_NAME:您选择的唯一性作业名称
    • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION:您要使用的模板的版本

      您可使用以下值:

    • OUTPUT_TABLE_SPEC:您的 BigQuery 目标表的名称。
    • MONGO_DB_URI:您的 MongoDB URI。
    • DATABASE:您的 MongoDB 数据库。
    • COLLECTION:您的 MongoDB 集合。
    • USER_OPTION:FLATTEN 或 NONE。

    API

    如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery",
       }
    }

    请替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
    • JOB_NAME:您选择的唯一性作业名称
    • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION:您要使用的模板的版本

      您可使用以下值:

    • OUTPUT_TABLE_SPEC:您的 BigQuery 目标表的名称。
    • MONGO_DB_URI:您的 MongoDB URI。
    • DATABASE:您的 MongoDB 数据库。
    • COLLECTION:您的 MongoDB 集合。
    • USER_OPTION:FLATTEN 或 NONE。

    后续步骤