MongoDB to BigQuery (CDC) 模板

此模板可以创建一种适用于 MongoDB 变更数据流的流处理流水线。如需使用此模板,请将变更数据流数据发布到 Pub/Sub。该流水线从 Pub/Sub 读取 JSON 记录并将其写入 BigQuery。写入 BigQuery 的记录与 MongoDB to BigQuery 批处理模板的格式相同。

流水线要求

  • 目标 BigQuery 数据集必须已存在。
  • 必须可从 Dataflow 工作器机器访问 MongoDB 源实例。
  • 您必须创建 Pub/Sub 主题才能读取变更数据流。 在流水线运行时,监听 MongoDB 变更数据流中的变更数据捕获 (CDC) 事件,并将其作为 JSON 记录发布到 Pub/Sub。如需详细了解如何将消息发布到 Pub/Sub,请参阅将消息发布到主题

模板参数

参数 说明
mongoDbUri MongoDB 连接 URI,格式为 mongodb+srv://:@
database 从中读取集合的 MongoDB 数据库。例如:my-db
collection MongoDB 数据库中集合的名称。例如:my-collection
outputTableSpec 要写入的 BigQuery 表。例如 bigquery-project:dataset.output_table
userOption FLATTENNONEFLATTEN 将文档展平至第一级。NONE 将整个文档存储为 JSON 字符串。
inputTopic 要读取的 Cloud Pub/Sub 输入主题,格式为 projects/<project>/topics/<topic>
javascriptDocumentTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptDocumentTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
useStorageWriteApi (可选)如果为 true,则流水线使用 BigQuery Storage Write API。默认值为 false。如需了解详情,请参阅使用 Storage Write API
useStorageWriteApiAtLeastOnce (可选)使用 Storage Write API 时,请指定写入语义。如需使用“至少一次”语义,请将此参数设置为 true。如需使用“正好一次”语义,请将参数设置为 false。仅当 useStorageWriteApitrue 时,此参数才适用。默认值为 false
numStorageWriteApiStreams (可选)使用 Storage Write API 时,指定写入流的数量。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。
storageWriteApiTriggeringFrequencySec (可选)使用 Storage Write API 时,指定触发频率(以秒为单位)。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。

用户定义的函数

(可选)您可以通过在 JavaScript 中编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。 元素载荷会序列化为 JSON 字符串。

如需使用 UDF,请将 JavaScript 文件上传到 Cloud Storage 并设置以下模板参数:

参数说明
javascriptDocumentTransformGcsPath JavaScript 文件的 Cloud Storage 位置。
javascriptDocumentTransformFunctionName JavaScript 函数的名称。

如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:MongoDB 文档。
  • 输出:序列化为 JSON 字符串的对象。
  • 运行模板

    控制台

    1. 转到 Dataflow 基于模板创建作业页面。
    2. 转到“基于模板创建作业”
    3. 作业名称字段中,输入唯一的作业名称。
    4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

      如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

    5. Dataflow 模板下拉菜单中,选择 the MongoDB to BigQuery (CDC) template。
    6. 在提供的参数字段中,输入您的参数值。
    7. 点击运行作业

    gcloud

    在 shell 或终端中,运行模板:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION,\
    inputTopic=INPUT_TOPIC
    

    请替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
    • JOB_NAME:您选择的唯一性作业名称
    • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION:您要使用的模板的版本

      您可使用以下值:

    • OUTPUT_TABLE_SPEC:您的 BigQuery 目标表的名称。
    • MONGO_DB_URI:您的 MongoDB URI。
    • DATABASE:您的 MongoDB 数据库。
    • COLLECTION:您的 MongoDB 集合。
    • USER_OPTION:FLATTEN 或 NONE。
    • INPUT_TOPIC:您的 Pub/Sub 输入主题。

    API

    如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION",
              "inputTopic": "INPUT_TOPIC"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC",
       }
    }

    请替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
    • JOB_NAME:您选择的唯一性作业名称
    • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION:您要使用的模板的版本

      您可使用以下值:

    • OUTPUT_TABLE_SPEC:您的 BigQuery 目标表的名称。
    • MONGO_DB_URI:您的 MongoDB URI。
    • DATABASE:您的 MongoDB 数据库。
    • COLLECTION:您的 MongoDB 集合。
    • USER_OPTION:FLATTEN 或 NONE。
    • INPUT_TOPIC:您的 Pub/Sub 输入主题。

    后续步骤