Pub/Sub to MongoDB 模板

Pub/Sub to MongoDB 模板是一种流处理流水线,可从 Pub/Sub 订阅读取 JSON 编码的消息并将其以文档的形式写入 MongoDB。如果需要,此流水线还能支持额外的转换,只需通过 JavaScript 用户定义的函数 (UDF) 包含这些转换即可。

如果在处理记录时发生错误,则模板会将它们与输入消息一起写入 BigQuery 表中。例如,错误可能会由于架构不匹配、JSON 格式错误而发生或是在执行转换期间发生。在 deadletterTable 参数中指定表名称。如果表不存在,流水线会自动创建它。

流水线要求

  • Pub/Sub 订阅必须存在,并且消息必须采用有效的 JSON 格式进行编码。
  • MongoDB 集群必须存在,并且应该可通过 Dataflow 工作器机器访问。

模板参数

必需参数

  • inputSubscription:Pub/Sub 订阅的名称。例如 projects/your-project-id/subscriptions/your-subscription-name
  • mongoDBUri:以英文逗号分隔的 MongoDB 服务器列表。例如 host1:port,host2:port,host3:port
  • database:存储集合的 MongoDB 数据库。例如 my-db
  • collection:MongoDB 数据库中集合的名称。例如 my-collection
  • deadletterTable:存储因失败(例如架构不匹配、JSON 格式错误等)而导致的消息的 BigQuery 表。例如 your-project-id:your-dataset.your-table-name

可选参数

  • batchSize:用于将文档批量插入 MongoDB 的批次大小。默认值为 1000。
  • batchSizeBytes:批次大小(以字节为单位)。默认值为 5242880。
  • maxConnectionIdleTime:在出现连接超时之前所允许的空闲时间上限(以秒为单位)。默认值为 60000。
  • sslEnabled:用于指示与 MongoDB 的连接是否启用了 SSL 的布尔值。默认值为:true。
  • ignoreSSLCertificate:用于指示是否忽略 SSL 证书的布尔值。默认值为:true。
  • withOrdered:允许依次批量插入 MongoDB 的布尔值。默认值为:true。
  • withSSLInvalidHostNameAllowed:一个布尔值,用于指示是否允许 SSL 连接使用无效主机名。默认值为:true。
  • javascriptTextTransformGcsPath:.js 文件的 Cloud Storage URI,用于定义要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName:要使用的 JavaScript 用户定义的函数 (UDF) 的名称。例如,如果 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。
  • javascriptTextTransformReloadIntervalMinutes:指定重新加载 UDF 的频率(以分钟为单位)。如果值大于 0,则 Dataflow 会定期检查 Cloud Storage 中的 UDF 文件,并在文件修改时重新加载 UDF。此参数可让您在流水线运行时更新 UDF,而无需重启作业。如果值为 0,则停用 UDF 重新加载。默认值为 0

用户定义的函数

(可选)您可以通过编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。元素载荷会序列化为 JSON 字符串。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:来自输入 CSV 文件的单行内容。
  • 输出:要插入到 MongoDB 中的字符串化 JSON 文档。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to MongoDB template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • INPUT_SUBSCRIPTION:Pub/Sub 订阅(例如 projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI:MongoDB 服务器地址(例如 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE:MongoDB 数据库名称(例如 users
  • COLLECTION:MongoDB 集合名称(例如 profiles
  • UNPROCESSED_TABLE:BigQuery 表名称(例如 your-project:your-dataset.your-table-name

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • INPUT_SUBSCRIPTION:Pub/Sub 订阅(例如 projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI:MongoDB 服务器地址(例如 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE:MongoDB 数据库名称(例如 users
  • COLLECTION:MongoDB 集合名称(例如 profiles
  • UNPROCESSED_TABLE:BigQuery 表名称(例如 your-project:your-dataset.your-table-name

后续步骤