Pub/Sub to MongoDB 模板

Pub/Sub to MongoDB 模板是一种流处理流水线,可从 Pub/Sub 订阅读取 JSON 编码的消息并将其以文档的形式写入 MongoDB。如果需要,此流水线还能支持额外的转换,只需通过 JavaScript 用户定义的函数 (UDF) 包含这些转换即可。因架构不匹配、JOSN 格式不正确或执行转换时发生的所有错误都会记录在未处理消息的 BigQuery 表中,并随附输入消息。如果在执行之前不存在任何未处理记录的表,则流水线会自动创建该表。

流水线要求

  • Pub/Sub 订阅必须存在,并且消息必须采用有效的 JSON 格式进行编码。
  • MongoDB 集群必须存在,并且应该可通过 Dataflow 工作器机器访问。

模板参数

参数 说明
inputSubscription Pub/Sub 订阅的名称。例如:projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri 以英文逗号分隔的 MongoDB 服务器列表。例如:192.285.234.12:27017,192.287.123.11:27017
database 存储集合的 MongoDB 数据库。例如:my-db
collection MongoDB 数据库中集合的名称。例如:my-collection
deadletterTable 存储失败消息(架构不匹配、JSON 格式错误等)的 BigQuery 表。例如:project-id:dataset-name.table-name
javascriptTextTransformGcsPath 可选:.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName 可选: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
javascriptTextTransformReloadIntervalMinutes 可选:指定重新加载 UDF 的频率(以分钟为单位)。如果值大于 0,则 Dataflow 会定期检查 Cloud Storage 中的 UDF 文件,并在文件遭到修改时重新加载 UDF。此参数可让您在流水线运行时更新 UDF,而无需重启作业。如果值为 0,则停用 UDF 重新加载。默认值为 0。
batchSize 可选:用于将文档批量插入 MongoDB 的批次大小。默认值:1000
batchSizeBytes 可选:批次大小(以字节为单位)。默认值:5242880
maxConnectionIdleTime 可选:在出现连接超时之前所允许的空闲时间上限(以秒为单位)。默认值:60000
sslEnabled 可选:用于指示与 MongoDB 的连接是否启用了 SSL 的布尔值。默认值:true
ignoreSSLCertificate 可选:用于指示是否应忽略 SSL 证书的布尔值。默认值:true
withOrdered 可选:允许依次批量插入 MongoDB 的布尔值。默认值:true
withSSLInvalidHostNameAllowed 可选:用于指示是否允许 SSL 连接使用无效主机名的布尔值。默认值:true

用户定义的函数

(可选)您可以通过编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。元素载荷会序列化为 JSON 字符串。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:输入 CSV 文件中的一行。
  • 输出:要插入到 MongoDB 中的字符串化 JSON 文档。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to MongoDB template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • INPUT_SUBSCRIPTION:Pub/Sub 订阅(例如 projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI:MongoDB 服务器地址(例如 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE:MongoDB 数据库名称(例如 users
  • COLLECTION:MongoDB 集合名称(例如 profiles
  • UNPROCESSED_TABLE:BigQuery 表名称(例如 your-project:your-dataset.your-table-name

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • INPUT_SUBSCRIPTION:Pub/Sub 订阅(例如 projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI:MongoDB 服务器地址(例如 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE:MongoDB 数据库名称(例如 users
  • COLLECTION:MongoDB 集合名称(例如 profiles
  • UNPROCESSED_TABLE:BigQuery 表名称(例如 your-project:your-dataset.your-table-name

后续步骤