Google Cloud IoT Core 将于 2023 年 8 月 16 日停用。如需了解详情,请与您的 Google Cloud 客户支持团队联系。

通过 MQTT 网桥发布

本部分介绍设备如何使用 MQTT 网桥与 Cloud IoT Core 通信。如需大致了解 HTTP 和 MQTT,请参阅协议

请务必参阅 API 文档,详细了解本部分中介绍的每种方法。 另请参阅与 MQTT 相关的示例

如需通过 MQTT 网桥发布,请执行以下操作:

  1. 在您的设备上安装 MQTT 客户端。

  2. 在您的设备上下载 MQTT 服务器证书

  3. 配置 MQTT 客户端以向 Cloud IoT Core 验证设备的身份。

  4. 通过 mqtt.googleapis.com长期支持网域发起 TLS 握手。

  5. 发布遥测事件设置设备状态

MQTT 服务器

Cloud IoT Core 通过运行侦听端口 mqtt.googleapis.com:8883 的代管式代理来支持 MQTT 协议。端口 8883 是 IANA 为安全 MQTT 连接保留的标准 TCP 端口。与此端口的连接必须使用 TLS 传输Eclipse Paho 等开源客户端支持该传输。

如果端口 8883 被防火墙阻止,您还可以使用端口 443:mqtt.googleapis.com:443

服务质量 (QoS)

MQTT 规范描述了三个服务质量 (QoS) 级别:

  • QoS 0,最多传送一次
  • QoS 1,至少传送一次
  • QoS 2,仅传送一次

Cloud IoT Core 不支持 QoS 2。发布 QoS 2 消息会关闭连接。使用 QoS 2 订阅预定义主题会将 QoS 级别降级为 QoS 1。

QoS 0 和 1 在 Cloud IoT Core 中的功能如下所示:

  • 成功发送到 Cloud Pub/Sub 后,QoS 为 1 的 PUBLISH 消息将由 PUBACK 消息确认。
  • QoS 为 0 的 PUBLISH 消息不需要 PUBACK 响应,如果消息传送路径存在任何抖动(例如,如果 Cloud Pub/Sub 暂时不可用),则可能会被丢弃。
  • Cloud IoT Core MQTT 网桥会维护一个未传送的消息小缓冲区,以便于重试。如果缓冲区已满,QoS 为 1 的消息可能会被丢弃,并且不会向客户端发送 PUBACK 消息。客户端应重新发送该消息。

对于设备配置,QoS 级别如下所示:

  • 当 QoS 为 0 时,给定的配置版本只会发布到设备一次。如果设备未收到配置,则必须重新订阅。因此,当配置频繁更新(大约几秒或几分钟)并且设备没有必要接收每个更新时,值为 0 的 QoS 非常有用。
  • 当 QoS 为 1 时,将重试最新的配置更新,直到设备使用 PUBACK 确认它。如果在确认旧配置之前推送了新配置,则系统不会重新传送旧配置;而是会递送新配置(并重新传送)。此级别是设备配置的最安全模式:它保证设备最终将获得最新配置。

下载 MQTT 服务器证书

如需使用 TLS 传输,设备必须验证 Cloud IoT Core 服务器证书,以确保它们与 Cloud IoT Core 而不是冒名者进行通信。以下证书包支持验证:

  • mqtt.googleapis.com完整 Google 根 CA 认证软件包 (128 KB)。

    • 此软件包会建立与 Google 产品和服务(包括 Cloud IoT Core)通信的信任链。
    • 具有完整根 CA 认证软件包的设备直接与 MQTT 服务器通信。
    • 此软件包会定期更新。
  • mqtt.2030.ltsapis.goog 的 Google 最小根 CA 集(小于 1 KB)。最小根 CA 集包括主证书备份证书

    • 此集适用于具有内存限制的设备(如微控制器),并仅建立与 Cloud IoT Core 通信的信任链。
    • 具有最小根 CA 集的设备通过长期支持网域与 Cloud IoT Core 进行通信。
    • 一直到 2030 年,此集都将保持不变(主证书和备用证书不会更改)。为了提高安全性,Google 信任服务可能会随时在主证书和备用证书之间切换,恕不另行通知。

将 Google 根 CA 证书下载到您的设备后,您可以配置 MQTT 客户端以对设备进行身份验证、连接到 MQTT 服务器,以及通过 MQTT 网桥进行通信。

配置 MQTT 客户端

MQTT 客户端通过连接到 MQTT 网桥来对设备进行身份验证。如需配置 MQTT 客户端以对设备进行身份验证,请执行以下操作:

  1. 将 MQTT 客户端 ID 设置为完整的设备路径:
    projects/PROJECT_ID/locations/REGION/registries/REGISTRY_ID/devices/DEVICE_ID
  2. 将 MQTT 客户端与 MQTT 服务器证书关联。
  3. 将 MQTT 主机名设置为 mqtt.googleapis.com长期支持网域(如果您使用的是最小根 CA 集)。
  4. 指定用户名。MQTT 网桥会忽略用户名字段,但一些 MQTT 客户端库不会发送密码字段,除非指定用户名字段。为获得最佳效果,请提供 unusedignored 等任意用户名。
  5. 设置密码。密码字段必须包含 JWT

以下示例展示了如何配置 MQTT 客户端以对设备进行身份验证:

C++

下面重点介绍配置客户端 ID 并对设备进行身份验证的步骤:
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java


// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
    String.format("ssl://%s:%s", mqttBridgeHostname, mqttBridgePort);

// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
    String.format(
        "projects/%s/locations/%s/registries/%s/devices/%s",
        projectId, cloudRegion, registryId, gatewayId);

MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explictly set this. If you don't set MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

Properties sslProps = new Properties();
sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
connectOptions.setSSLProperties(sslProps);

// With Google Cloud IoT Core, the username field is ignored, however it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");

if ("RS256".equals(algorithm)) {
  connectOptions.setPassword(createJwtRsa(projectId, privateKeyFile).toCharArray());
} else if ("ES256".equals(algorithm)) {
  connectOptions.setPassword(createJwtEs(projectId, privateKeyFile).toCharArray());
} else {
  throw new IllegalArgumentException(
      "Invalid algorithm " + algorithm + ". Should be one of 'RS256' or 'ES256'.");
}

System.out.println(String.format("%s", mqttClientId));

// Create a client, and connect to the Google MQTT bridge.
try (MqttClient client =
    new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence())) {
  // Both connect and publish operations may fail. If they do, allow retries but with an
  // exponential backoff time period.
  long initialConnectIntervalMillis = 500L;
  long maxConnectIntervalMillis = 6000L;
  long maxConnectRetryTimeElapsedMillis = 900000L;
  float intervalMultiplier = 1.5f;

  long retryIntervalMs = initialConnectIntervalMillis;
  long totalRetryTimeMs = 0;

  while (totalRetryTimeMs < maxConnectRetryTimeElapsedMillis && !client.isConnected()) {
    try {
      client.connect(connectOptions);
    } catch (MqttException e) {
      int reason = e.getReasonCode();

      // If the connection is lost or if the server cannot be connected, allow retries, but with
      // exponential backoff.
      System.out.println("An error occurred: " + e.getMessage());
      if (reason == MqttException.REASON_CODE_CONNECTION_LOST
          || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
        System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
        Thread.sleep(retryIntervalMs);
        totalRetryTimeMs += retryIntervalMs;
        retryIntervalMs *= intervalMultiplier;
        if (retryIntervalMs > maxConnectIntervalMillis) {
          retryIntervalMs = maxConnectIntervalMillis;
        }
      } else {
        throw e;
      }
    }
  }

  attachCallback(client, gatewayId);

  // The topic gateways receive error updates on. QoS must be 0.
  String errorTopic = String.format("/devices/%s/errors", gatewayId);
  System.out.println(String.format("Listening on %s", errorTopic));

  client.subscribe(errorTopic, 0);

  return client;

Node.js


// const deviceId = `myDevice`;
// const registryId = `myRegistry`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const messageType = `events`;
// const numMessages = 5;

// The mqttClientId is a unique string that identifies this device. For Google
// Cloud IoT Core, it must be in the format below.
const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${deviceId}`;

// With Google Cloud IoT Core, the username field is ignored, however it must be
// non-empty. The password field is used to transmit a JWT to authorize the
// device. The "mqtts" protocol causes the library to connect using SSL, which
// is required for Cloud IoT Core.
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
const iatTime = parseInt(Date.now() / 1000);
const client = mqtt.connect(connectionArgs);

// Subscribe to the /devices/{device-id}/config topic to receive config updates.
// Config updates are recommended to use QoS 1 (at least once delivery)
client.subscribe(`/devices/${deviceId}/config`, {qos: 1});

// Subscribe to the /devices/{device-id}/commands/# topic to receive all
// commands or to the /devices/{device-id}/commands/<subfolder> to just receive
// messages published to a specific commands folder; we recommend you use
// QoS 0 (at most once delivery)
client.subscribe(`/devices/${deviceId}/commands/#`, {qos: 0});

// The MQTT topic that this device will publish data to. The MQTT topic name is
// required to be in the format below. The topic name must end in 'state' to
// publish state and 'events' to publish telemetry. Note that this is not the
// same as the device registry's Cloud Pub/Sub topic.
const mqttTopic = `/devices/${deviceId}/${messageType}`;

client.on('connect', success => {
  console.log('connect');
  if (!success) {
    console.log('Client not connected...');
  } else if (!publishChainInProgress) {
    publishAsync(mqttTopic, client, iatTime, 1, numMessages, connectionArgs);
  }
});

client.on('close', () => {
  console.log('close');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  let messageStr = 'Message received: ';
  if (topic === `/devices/${deviceId}/config`) {
    messageStr = 'Config message received: ';
  } else if (topic.startsWith(`/devices/${deviceId}/commands`)) {
    messageStr = 'Command message received: ';
  }

  messageStr += Buffer.from(message, 'base64').toString('ascii');
  console.log(messageStr);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

// Once all of the messages have been published, the connection to Google Cloud
// IoT will be closed and the process will exit. See the publishAsync method.

Python

此示例使用 Python 版 Google API 客户端库
def error_str(rc):
    """Convert a Paho error to a human readable string."""
    return f"{rc}: {mqtt.error_string(rc)}"

def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    print("on_connect", mqtt.connack_string(rc))

    # After a successful connect, reset backoff time and stop backing off.
    global should_backoff
    global minimum_backoff_time
    should_backoff = False
    minimum_backoff_time = 1

def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback for when a device disconnects."""
    print("on_disconnect", error_str(rc))

    # Since a disconnect occurred, the next loop iteration will wait with
    # exponential backoff.
    global should_backoff
    should_backoff = True

def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback when a message is sent to the broker."""
    print("on_publish")

def on_message(unused_client, unused_userdata, message):
    """Callback when the device receives a message on a subscription."""
    payload = str(message.payload.decode("utf-8"))
    print(
        "Received message '{}' on topic '{}' with Qos {}".format(
            payload, message.topic, str(message.qos)
        )
    )

def get_client(
    project_id,
    cloud_region,
    registry_id,
    device_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
):
    """Create our MQTT client. The client_id is a unique string that identifies
    this device. For Google Cloud IoT Core, it must be in the format below."""
    client_id = "projects/{}/locations/{}/registries/{}/devices/{}".format(
        project_id, cloud_region, registry_id, device_id
    )
    print(f"Device client_id is '{client_id}'")

    client = mqtt.Client(client_id=client_id)

    # With Google Cloud IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    client.username_pw_set(
        username="unused", password=create_jwt(project_id, private_key_file, algorithm)
    )

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. In this example, the
    # callbacks just print to standard out.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the Google MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # This is the topic that the device will receive configuration updates on.
    mqtt_config_topic = f"/devices/{device_id}/config"

    # Subscribe to the config topic.
    client.subscribe(mqtt_config_topic, qos=1)

    # The topic that the device will receive commands on.
    mqtt_command_topic = f"/devices/{device_id}/commands/#"

    # Subscribe to the commands topic, QoS 1 enables message acknowledgement.
    print(f"Subscribing to {mqtt_command_topic}")
    client.subscribe(mqtt_command_topic, qos=0)

    return client

使用长期 MQTT 网域

长期支持 (LTS) 网域可让您长时间使用一项 TLS 配置。您可以设置 MQTT 客户端一次,配置 MQTT 客户端以通过 LTS 网域发布消息,然后在支持的时间范围内通过 MQTT 网桥持续通信。

当前的活跃 LTS 网域是 mqtt.2030.ltsapis.goog。支持此 LTS 网域,直到 2030 年。

如需使用 LTS 网域,请执行以下操作:

  1. 配置 MQTT 客户端以通过 LTS 网域发布消息。

    1. 配置 MQTT 客户端以向 Cloud IoT Core 验证设备的身份。
    2. 配置设备时,将最小根 CA 集的主证书备用证书与 MQTT 客户端关联。
  2. 在端口 8883 或 443 上通过 mqtt.2030.ltsapis.goog 发起 TLS 握手。请至少使用以下 TLS 功能。

    • TLS 1.2
    • P-256,使用 SHA-256 作为证书密钥和哈希算法
    • TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,为加密套件使用 P-256 和未压缩点
    • 服务器名称指示 (SNI)
    • 基于 TCP 或 UDP 的 DNS

如需详细了解如何保护 MQTT 流量(包括发送到 LTS 网域的消息),请参阅设备安全建议

发布遥测事件

设备配置 MQTT 客户端并连接到 MQTT 网桥后,可以通过以下格式向 MQTT 主题发布 PUBLISH 消息来发布遥测事件:

/devices/DEVICE_ID/events

设备 ID 是在 MQTT 客户端 ID 中指定的设备的字符串 ID。设备 ID 区分大小写。

发布到此 MQTT 主题的消息将转发到相应注册表的默认遥测主题。默认的遥测主题是在注册表资源eventNotificationConfigs[i].pubsubTopicName 字段中指定的 Cloud Pub/Sub 主题。如果不存在默认的 Pub/Sub 主题,则已发布的遥测数据将丢失。如需向其他 Cloud Pub/Sub 主题发布消息,请参阅将遥测事件发布到其他 Cloud Pub/Sub 主题

转发的消息数据字段包含设备发布的消息的副本,并且系统会将以下消息特性添加到 Cloud Pub/Sub 主题中的每条消息:

属性 说明
deviceId 设备的用户定义字符串标识符,例如 thing1。设备 ID 在注册表中必须是唯一的。
deviceNumId 服务器生成的设备数字 ID。创建设备时,Cloud IoT Core 会自动生成设备数字 ID。具有全局唯一性,且不可修改。
deviceRegistryLocation 设备注册表的 Google Cloud Platform 区域,例如 us-central1
deviceRegistryId 设备注册表的用户定义字符串标识符,例如 registry1
projectId 注册表和设备所属的云项目的字符串 ID。
subFolder 该子文件夹可用作事件类别或分类。对于 MQTT 客户端,子文件夹是 DEVICE_ID/events 后面可直接复制的子主题。例如,如果客户端发布到 MQTT 主题 /devices/DEVICE_ID/events/alerts,则子文件夹为字符串 alerts

以下示例展示了如何通过 MQTT 连接发送 PUBLISH 消息:

C++

此示例使用 Python 版 Google API 客户端库
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while (totalRetryTimeMs < maxConnectRetryTimeElapsedMillis && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, options.deviceId);

// Publish to the events or state topic based on the flag.
String subTopic = "event".equals(options.messageType) ? "events" : options.messageType;

// The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
// required to be in the format below. Note that this is not the same as the device registry's
// Cloud Pub/Sub topic.
String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);

// Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
for (int i = 1; i <= options.numMessages; ++i) {
  String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
  System.out.format(
      "Publishing %s message %d/%d: '%s'%n",
      options.messageType, i, options.numMessages, payload);

  // Refresh the connection credentials before the JWT expires.
  long secsSinceRefresh = (Instant.now().toEpochMilli() - iat.toEpochMilli()) / 1000;
  if (secsSinceRefresh > (options.tokenExpMins * MINUTES_PER_HOUR)) {
    System.out.format("\tRefreshing token after: %d seconds%n", secsSinceRefresh);
    iat = Instant.now();
    if ("RS256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
    } else if ("ES256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
    } else {
      throw new IllegalArgumentException(
          "Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
    }
    client.disconnect();
    client.connect(connectOptions);
    attachCallback(client, options.deviceId);
  }

  // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
  // also supports qos=0 for at most once delivery.
  MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8.name()));
  message.setQos(1);
  client.publish(mqttTopic, message);

  if ("event".equals(options.messageType)) {
    // Send telemetry events every second
    Thread.sleep(1000);
  } else {
    // Note: Update Device state less frequently than with telemetry events
    Thread.sleep(5000);
  }
}

// Wait for commands to arrive for about two minutes.
for (int i = 1; i <= options.waitTime; ++i) {
  System.out.print('.');
  Thread.sleep(1000);
}
System.out.println("");

// Disconnect the client if still connected, and finish the run.
if (client.isConnected()) {
  client.disconnect();
}

System.out.println("Finished loop successfully. Goodbye!");
client.close();

Node.js

const publishAsync = (
  mqttTopic,
  client,
  iatTime,
  messagesSent,
  numMessages,
  connectionArgs
) => {
  // If we have published enough messages or backed off too many times, stop.
  if (messagesSent > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) {
    if (backoffTime >= MAXIMUM_BACKOFF_TIME) {
      console.log('Backoff time is too high. Giving up.');
    }
    console.log('Closing connection to MQTT. Goodbye!');
    client.end();
    publishChainInProgress = false;
    return;
  }

  // Publish and schedule the next publish.
  publishChainInProgress = true;
  let publishDelayMs = 0;
  if (shouldBackoff) {
    publishDelayMs = 1000 * (backoffTime + Math.random());
    backoffTime *= 2;
    console.log(`Backing off for ${publishDelayMs}ms before publishing.`);
  }

  setTimeout(() => {
    const payload = `${argv.registryId}/${argv.deviceId}-payload-${messagesSent}`;

    // Publish "payload" to the MQTT topic. qos=1 means at least once delivery.
    // Cloud IoT Core also supports qos=0 for at most once delivery.
    console.log('Publishing message:', payload);
    client.publish(mqttTopic, payload, {qos: 1}, err => {
      if (!err) {
        shouldBackoff = false;
        backoffTime = MINIMUM_BACKOFF_TIME;
      }
    });

    const schedulePublishDelayMs = argv.messageType === 'events' ? 1000 : 2000;
    setTimeout(() => {
      const secsFromIssue = parseInt(Date.now() / 1000) - iatTime;
      if (secsFromIssue > argv.tokenExpMins * 60) {
        iatTime = parseInt(Date.now() / 1000);
        console.log(`\tRefreshing token after ${secsFromIssue} seconds.`);

        client.end();
        connectionArgs.password = createJwt(
          argv.projectId,
          argv.privateKeyFile,
          argv.algorithm
        );
        connectionArgs.protocolId = 'MQTT';
        connectionArgs.protocolVersion = 4;
        connectionArgs.clean = true;
        client = mqtt.connect(connectionArgs);

        client.on('connect', success => {
          console.log('connect');
          if (!success) {
            console.log('Client not connected...');
          } else if (!publishChainInProgress) {
            publishAsync(
              mqttTopic,
              client,
              iatTime,
              messagesSent,
              numMessages,
              connectionArgs
            );
          }
        });

        client.on('close', () => {
          console.log('close');
          shouldBackoff = true;
        });

        client.on('error', err => {
          console.log('error', err);
        });

        client.on('message', (topic, message) => {
          console.log(
            'message received: ',
            Buffer.from(message, 'base64').toString('ascii')
          );
        });

        client.on('packetsend', () => {
          // Note: logging packet send is very verbose
        });
      }
      publishAsync(
        mqttTopic,
        client,
        iatTime,
        messagesSent + 1,
        numMessages,
        connectionArgs
      );
    }, schedulePublishDelayMs);
  }, publishDelayMs);
};

Python

此示例使用 Python 版 Google API 客户端库
global minimum_backoff_time
global MAXIMUM_BACKOFF_TIME

# Publish to the events or state topic based on the flag.
sub_topic = "events" if args.message_type == "event" else "state"

mqtt_topic = f"/devices/{args.device_id}/{sub_topic}"

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = args.jwt_expires_minutes
client = get_client(
    args.project_id,
    args.cloud_region,
    args.registry_id,
    args.device_id,
    args.private_key_file,
    args.algorithm,
    args.ca_certs,
    args.mqtt_bridge_hostname,
    args.mqtt_bridge_port,
)

# Publish num_messages messages to the MQTT bridge once per second.
for i in range(1, args.num_messages + 1):
    # Process network events.
    client.loop()

    # Wait if backoff is required.
    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        # Otherwise, wait and connect again.
        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        print(f"Waiting for {delay} before reconnecting.")
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

    payload = f"{args.registry_id}/{args.device_id}-payload-{i}"
    print(f"Publishing message {i}/{args.num_messages}: '{payload}'")
    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print(f"Refreshing token after {seconds_since_issue}s")
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            args.project_id,
            args.cloud_region,
            args.registry_id,
            args.device_id,
            args.private_key_file,
            args.algorithm,
            args.ca_certs,
            args.mqtt_bridge_hostname,
            args.mqtt_bridge_port,
        )
    # Publish "payload" to the MQTT topic. qos=1 means at least once
    # delivery. Cloud IoT Core also supports qos=0 for at most once
    # delivery.
    client.publish(mqtt_topic, payload, qos=1)

    # Send events every second. State should not be updated as often
    for i in range(0, 60):
        time.sleep(1)
        client.loop()

将遥测事件发布到其他 Cloud Pub/Sub 主题

设备可以将数据发布到其他 Cloud Pub/Sub 主题。默认情况下,发布到 /devices/DEVICE_ID/events 的 MQTT 消息会转发到相应注册表的默认遥测主题。您可以在 MQTT 主题中指定子文件夹,以将数据转发到其他 Cloud Pub/Sub 主题。该子文件夹是 /devices/DEVICE_ID/events 后面的子主题。

发布到子文件夹的消息会转发到具有相同名称的 Cloud Pub/Sub 主题。相应的注册表必须使用 Cloud Pub/Sub 主题进行配置;否则,消息会转发到默认 Cloud Pub/Sub 主题。

在以下情况下,消息会转发到默认 Cloud Pub/Sub 主题,而不是其他 Cloud Pub/Sub 主题:

  • 未在 MQTT 主题中指定子文件夹
  • 在 MQTT 主题中指定了子文件夹,但它在设备注册表中没有匹配的 Pub/Sub 主题

例如,如果设备发布到 MQTT 主题 /devices/DEVICE_ID/events/alerts,则子文件夹为字符串 alerts。如果 eventNotificationConfigs[i].subfolderMatcheseventNotificationConfigs[i].pubsubTopicName 字段都设置为 alerts,则消息将转发到其他 Cloud Pub/Sub 主题。否则,消息会转发到默认的 Cloud Pub/Sub 主题。

设置设备状态

已连接的设备可以通过向以下 MQTT 主题发出 PUBLISH 消息来报告设备状态:

/devices/DEVICE_ID/state

如需对状态消息进行分类和检索,请使用设备状态主题配置注册表。设备状态主题是在 StateNotificationConfig.pubsubTopicName 字段中指定的 Cloud Pub/Sub 主题。如果注册表配置了设备状态主题,则这些消息将尽力转发到匹配的 Cloud Pub/Sub 主题。

如需详细了解如何检索状态消息,请参阅获取设备状态

限制 MQTT 流量

Cloud IoT Core 会限制生成过多负载的项目。如果设备在未等待的情况下重试失败的操作,则可能会触发限制,影响同一 Google Cloud 项目中的所有设备。

对于重试,我们强烈建议您使用引入的抖动实现截断指数退避算法

Keep-alive

从客户端发送初始 MQTT CONNECT 消息时,您可以提供可选的“keep-alive”值。此值是一个时间间隔(以秒为单位),在此期间,代理期望客户端发送消息(如 PUBLISH 消息)。如果在时间段内未从客户端向 Broker 发送任何消息,则 Broker 会自动关闭连接。请注意,您指定的 keep-alive 值乘以 1.5,因此,设置 10 分钟的 keep-alive 实际上会导致 15 分钟的间隔。

如需了解详情,请参阅 MQTT 规范

客户端设置

Cloud IoT Core 不提供自己的默认 keep-alive 值;如果您选择指定 keep-alive 间隔,则必须在客户端中进行设置。

为获得最佳结果,请将客户端的 keep-alive 间隔设置为最小值(60 秒)。许多开源客户端库(包括 CPythonNode.jsJava 的 Paho MQTT 库默认使用 60 秒。

空闲时间限制

与 keep-alive 间隔不同,Cloud IoT Core 有自己的空闲时间限制(20 分钟)。根据此限制,如果客户端在 20 分钟内未发送任何消息,则客户端连接将自动终止,即使 keep-alive 间隔更长也是如此。如果未提供 keep-alive 值,则默认的空闲超时值 20 分钟仍然有效。

问题排查

如果在连接时遇到问题,请参阅问题排查