Google Cloud IoT Core 将于 2023 年 8 月 16 日停用。如需了解详情,请与您的 Google Cloud 客户支持团队联系。

通过 MQTT 网桥使用网关

本页面介绍如何使用 MQTT 网桥与 Cloud IoT Core 通信以及代表绑定的设备发布遥测事件。开始之前,请阅读使用 MQTT 网桥,了解有关将 MQTT 网桥与 Cloud IoT Core 搭配使用的一般信息。

试用端到端演示

通过 MQTT 网桥使用网关

  1. 创建和配置网关后,请通过 MQTT 网桥将其连接到 Cloud IoT Core
  2. 创建设备(如果尚未创建)。
  3. 可选:将设备绑定到网关

    使用 MQTT 网桥时,只有当设备无法生成自己的 JWT 时,您才需要绑定这些设备。

  4. 可选:订阅系统错误主题以获取关于设备操作是否成功的反馈。

  5. 将设备连接到网关

  6. 使用网关代表其设备中继遥测、设备状态和配置消息。试用端到端演示,了解如何操作

网关消息

网关通过 MQTT 网桥连接到 Cloud IoT Core 后,可以发送或接收三类消息:

  • 控制消息:将设备连接到网关,或将设备与网关分离。这些消息在网关和 Cloud IoT Core 之间发送。Cloud IoT Core 仅接受来自网关的控制消息;如果其他类型的设备尝试发送控制消息,Cloud IoT Core 就会关闭连接。
  • 来自网关和设备的消息:可以由网关代表设备进行中继,也可以直接从网关本身发送。
  • 系统错误消息:如果网关代表设备订阅 MQTT 系统错误主题,则设备每次遇到错误时 Cloud IoT Core 都会向网关发送错误消息。

将设备连接到网关

如需让网关能够与 Cloud IoT Core 代理设备通信,请让网关通过 MQTT 网桥发布 QoS 1 /devices/{device_ID_to_attach}/attach 控制消息

如果您通过设备 JWT 将网关配置为对设备进行身份验证,则附加消息的载荷必须包含 JSON 格式的令牌:{ "authorization" : "{JWT_token}" }。否则,Cloud IoT Core 会通过检查设备与网关的关联情况来对设备进行身份验证。

成功响应

设备获得授权后,Cloud IoT Core 会向网关发送 PUBACK 消息以响应连接消息。网关收到 PUBACK 消息后,即可代表设备发布和订阅 Cloud IoT Core 主题,例如遥测消息或配置消息。

如果设备在网关发送连接消息时已经连接,Cloud IoT Core 将返回 PUBACK 消息。

将设备与网关分离

要将设备与网关分离,请让网关通过 MQTT 网桥发布 QoS 1 /devices/{device_ID}/detach 控制消息。设备在发送消息时未连接设备,Cloud IoT Core 会忽略分离的控制消息并发送 PUBACK 消息。

问题排查

如需在设备遇到错误时收到通知,请使用 QoS 级别 0 让网关订阅 MQTT /devices/{gateway_ID}/errors 主题:

Go

设备订阅 errors 主题的步骤如下所示:

import (
	"fmt"
	"io"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// subscribeGatewayToDeviceTopic creates a gateway client that subscribes to a topic of a bound device.
// Currently supported topics include: "config", "state", "commands", "errors"
func subscribeGatewayToDeviceTopic(w io.Writer, projectID string, region string, registryID string, gatewayID string, deviceID string, privateKeyPath string, algorithm string, clientDuration int, topic string) error {

	const (
		mqttBrokerURL   = "tls://mqtt.googleapis.com:8883"
		protocolVersion = 4 // corresponds to MQTT 3.1.1
	)

	// onConnect defines the on connect handler which resets backoff variables.
	var onConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
		fmt.Fprintf(w, "Client connected: %t\n", client.IsConnected())
	}

	// onMessage defines the message handler for the mqtt client.
	var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
		fmt.Fprintf(w, "Topic: %s\n", msg.Topic())
		fmt.Fprintf(w, "Message: %s\n", msg.Payload())
	}

	// onDisconnect defines the connection lost handler for the mqtt client.
	var onDisconnect mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
		fmt.Println("Client disconnected")
	}

	jwt, _ := createJWT(projectID, privateKeyPath, algorithm, 60)
	clientID := fmt.Sprintf("projects/%s/locations/%s/registries/%s/devices/%s", projectID, region, registryID, gatewayID)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(mqttBrokerURL)
	opts.SetClientID(clientID)
	opts.SetUsername("unused")
	opts.SetPassword(jwt)
	opts.SetProtocolVersion(protocolVersion)
	opts.SetOnConnectHandler(onConnect)
	opts.SetDefaultPublishHandler(onMessage)
	opts.SetConnectionLostHandler(onDisconnect)

	// Create and connect a client using the above options.
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, "Failed to connect client")
		return token.Error()
	}

	if err := attachDevice(deviceID, client, ""); err != nil {
		fmt.Fprintf(w, "AttachDevice error: %v\n", err)
		return err
	}

	// Sleep for 5 seconds to allow attachDevice message to propagate.
	time.Sleep(5 * time.Second)

	// Subscribe to the config topic of the current gateway and a device bound to the gateway.
	gatewayTopic := fmt.Sprintf("/devices/%s/%s", gatewayID, topic)
	if token := client.Subscribe(gatewayTopic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	deviceTopic := fmt.Sprintf("/devices/%s/%s", deviceID, topic)
	if token := client.Subscribe(deviceTopic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	time.Sleep(time.Duration(clientDuration) * time.Second)

	if err := detachDevice(deviceID, client, ""); err != nil {
		fmt.Fprintf(w, "DetachDevice error: %v\n", err)
		return err
	}

	if token := client.Unsubscribe(gatewayTopic, deviceTopic); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	client.Disconnect(10)
	return nil
}

Java

设备订阅 errors 主题的步骤如下所示:

// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
    String.format("ssl://%s:%s", mqttBridgeHostname, mqttBridgePort);

// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
    String.format(
        "projects/%s/locations/%s/registries/%s/devices/%s",
        projectId, cloudRegion, registryId, gatewayId);

MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explictly set this. If you don't set MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

Properties sslProps = new Properties();
sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
connectOptions.setSSLProperties(sslProps);

// With Google Cloud IoT Core, the username field is ignored, however it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");

if ("RS256".equals(algorithm)) {
  connectOptions.setPassword(createJwtRsa(projectId, privateKeyFile).toCharArray());
} else if ("ES256".equals(algorithm)) {
  connectOptions.setPassword(createJwtEs(projectId, privateKeyFile).toCharArray());
} else {
  throw new IllegalArgumentException(
      "Invalid algorithm " + algorithm + ". Should be one of 'RS256' or 'ES256'.");
}

System.out.println(String.format("%s", mqttClientId));

// Create a client, and connect to the Google MQTT bridge.
try (MqttClient client =
    new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence())) {
  // Both connect and publish operations may fail. If they do, allow retries but with an
  // exponential backoff time period.
  long initialConnectIntervalMillis = 500L;
  long maxConnectIntervalMillis = 6000L;
  long maxConnectRetryTimeElapsedMillis = 900000L;
  float intervalMultiplier = 1.5f;

  long retryIntervalMs = initialConnectIntervalMillis;
  long totalRetryTimeMs = 0;

  while (totalRetryTimeMs < maxConnectRetryTimeElapsedMillis && !client.isConnected()) {
    try {
      client.connect(connectOptions);
    } catch (MqttException e) {
      int reason = e.getReasonCode();

      // If the connection is lost or if the server cannot be connected, allow retries, but with
      // exponential backoff.
      System.out.println("An error occurred: " + e.getMessage());
      if (reason == MqttException.REASON_CODE_CONNECTION_LOST
          || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
        System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
        Thread.sleep(retryIntervalMs);
        totalRetryTimeMs += retryIntervalMs;
        retryIntervalMs *= intervalMultiplier;
        if (retryIntervalMs > maxConnectIntervalMillis) {
          retryIntervalMs = maxConnectIntervalMillis;
        }
      } else {
        throw e;
      }
    }
  }

  attachCallback(client, gatewayId);

  // The topic gateways receive error updates on. QoS must be 0.
  String errorTopic = String.format("/devices/%s/errors", gatewayId);
  System.out.println(String.format("Listening on %s", errorTopic));

  client.subscribe(errorTopic, 0);

  return client;

Node.js

例如,下面这个设备订阅 config 主题的步骤会突出显示。如需订阅 errors 主题,请指定 /devices/${gateway_ID}/errors
// const deviceId = `myDevice`;
// const gatewayId = `mygateway`;
// const registryId = `myRegistry`;
// const projectId = `my-project-123`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const clientDuration = 60000;

const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${gatewayId}`;
console.log(mqttClientId);
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  qos: 1,
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
const client = mqtt.connect(connectionArgs);

client.on('connect', success => {
  if (!success) {
    console.log('Client not connected...');
  } else {
    setTimeout(() => {
      // Subscribe to gateway error topic.
      client.subscribe(`/devices/${gatewayId}/errors`, {qos: 0});

      attachDevice(deviceId, client);

      setTimeout(() => {
        console.log('Closing connection to MQTT. Goodbye!');
        client.end(true);
      }, clientDuration); // Safely detach device and close connection.
    }, 5000);
  }
});

client.on('close', () => {
  console.log('Connection closed');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  const decodedMessage = Buffer.from(message, 'base64').toString('ascii');

  console.log(`message received on error topic ${topic}: ${decodedMessage}`);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

Python

设备订阅 errors 主题的步骤如下所示:
global minimum_backoff_time

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = f"/devices/{device_id}/config"
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = f"/devices/{gateway_id}/config"
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = f"/devices/{gateway_id}/errors"
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
    client.loop()
    if cb is not None:
        cb(client)

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print(f"Refreshing token after {seconds_since_issue}s")
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(1)

detach_device(client, device_id)

print("Finished.")

Cloud IoT Core 会尽可能发送网关错误,并在 QoS 0 期间传送。如果网关未订阅 /devices/{gateway_ID}/errors,则 Cloud IoT Core 会记录失败事件,但不会发送 PUBACK 消息。

MQTT 错误的结构如下:

string error_type;  // A string description of the error type.
string device_id;   // The ID of the device that caused the error.
string description; // A description of the error.

如果错误消息是由 MQTT 消息触发的,则系统也会附加以下信息:

string message_type;  // The string MQTT message type.
string topic;  // The MQTT topic if applicable, otherwise it is empty.
int packet_id;  // The packet ID of the MQTT message if applicable, otherwise it is zero.

错误代码和错误处理

错误代码 说明 推荐执行的操作
GATEWAY_ATTACHMENT_ERROR 网关连接请求失败。 在解决问题之前,请勿重试。
GATEWAY_DEVICE_NOT_FOUND 网关找不到连接的设备来处理收到的消息。 在解决问题之前,请勿重试。
GATEWAY_INVALID_MQTT_TOPIC 网关无法解析指定的 MQTT 主题,要么是格式不正确,要么包含无效的设备 ID 或名称。 在解决问题之前,请勿重试。
GATEWAY_UNEXPECTED_PACKET_ID 网关无法根据数据包 ID 处理消息。例如,PUBACK 可能包含数据包 ID,但没有任何等待响应。 在解决问题之前,请勿重试。
GATEWAY_UNEXPECTED_MESSAGE_TYPE 网关收到了意外消息,例如不受支持的 PUBREL、PUBREC 等。 在解决问题之前,请勿重试。
GATEWAY_DETACHMENT_DEVICE_ERROR 由于设备错误,网关已分离设备。 在解决问题之前,请勿重试。
未知 错误未知。 使用指数退避算法重试。

如需了解详情,请参阅主要的错误消息文档和 MQTT 版本 3.1.1 规范