MQTT to Pub/Sub 模板

MQTT to Pub/Sub 模板是一种流处理流水线,可从 MQTT 主题读取消息并将其写入 Pub/Sub。 它包含可选参数 usernamepassword,以防 MQTT 服务器要求进行身份验证。

如果流水线超过 90 分钟未收到来自 MQTT 主题的任何消息,则会发生 StackOverflowError。为解决此问题,您可以每 90 分钟更改一次工作器数量。如需详细了解如何在不停止作业的情况下更改工作器数量,请参阅运行中作业选项更新

流水线要求

  • Pub/Sub 输出主题名称必须存在。
  • MQTT 主机 IP 必须存在,并且工作器机器具有正确的网络配置,以访问 MQTT 主机。
  • 从中提取数据的 MQTT 主题必须具有名称。

模板参数

必需参数

  • inputTopic:从中读取数据的 MQTT 主题的名称。例如 topic
  • outputTopic:将数据写入其中的输出 Pub/Sub 主题的名称。例如 projects/your-project-id/topics/your-topic-name
  • username:用于在 MQTT 服务器上进行身份验证的用户名。例如 sampleusername
  • password:与提供的用户名关联的密码。例如 samplepassword

可选参数

  • brokerServer:MQTT 代理服务器 IP 或主机。例如 tcp://host:1883

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 MQTT to Pub/Sub template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Mqtt_to_PubSub \
    --parameters \
brokerServer=MQTT_SERVER,\
inputTopic=INPUT_TOPIC,\
outputTopic=OUTPUT_TOPIC,\
username=USERNAME,\
password=PASSWORD
  

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为项目 ID。
  • 替换为 Dataflow 地区名称。例如:us-central1
  • JOB_NAME 替换为您选择的作业名称。作业名称必须与正则表达式 [a-z]([-a-z0-9]{0,38}[a-z0-9])? 匹配才有效。
  • INPUT_TOPIC 替换为 MQTT 服务器输入主题的名称。例如:testtopic
  • MQTT_SERVER 替换为 MQTT 服务器地址。例如 tcp://10.128.0.62:1883
  • OUTPUT_TOPIC 替换为 Pub/Sub 输出主题的名称。例如:projects/myproject/topics/testoutput
  • USERNAME 替换为 MQTT 服务器的用户名。例如:testuser
  • PASSWORD 替换为与 MQTT 服务器使用的用户名对应的密码。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "brokerServer": "MQTT_SERVER",
          "inputTopic": "INPUT_TOPIC",
          "outputTopic": "OUTPUT_TOPIC",
          "username": "USERNAME",
          "password": "PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Mqtt_to_PubSub",
   }
}
  

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为项目 ID。
  • 替换为 Dataflow 地区名称。例如:us-central1
  • JOB_NAME 替换为您选择的作业名称。作业名称必须与正则表达式 [a-z]([-a-z0-9]{0,38}[a-z0-9])? 匹配才有效。
  • INPUT_TOPIC 替换为 MQTT 服务器输入主题的名称。例如:testtopic
  • MQTT_SERVER 替换为 MQTT 服务器地址。例如 tcp://10.128.0.62:1883
  • OUTPUT_TOPIC 替换为 Pub/Sub 输出主题的名称。例如:projects/myproject/topics/testoutput
  • USERNAME 替换为 MQTT 服务器的用户名。例如:testuser
  • PASSWORD 替换为与 MQTT 服务器使用的用户名对应的密码。

后续步骤