本页面介绍如何使用 MQTT 网桥与 Cloud IoT Core 通信以及代表绑定的设备发布遥测事件。开始之前,请阅读使用 MQTT 网桥,了解有关将 MQTT 网桥与 Cloud IoT Core 搭配使用的一般信息。
试用端到端演示
通过 MQTT 网桥使用网关
- 创建和配置网关后,请通过 MQTT 网桥将其连接到 Cloud IoT Core。
- 创建设备(如果尚未创建)。
可选:将设备绑定到网关。
使用 MQTT 网桥时,只有当设备无法生成自己的 JWT 时,您才需要绑定这些设备。
可选:订阅系统错误主题以获取关于设备操作是否成功的反馈。
将设备连接到网关。
使用网关代表其设备中继遥测、设备状态和配置消息。试用端到端演示,了解如何操作。
网关消息
网关通过 MQTT 网桥连接到 Cloud IoT Core 后,可以发送或接收三类消息:
- 控制消息:将设备连接到网关,或将设备与网关分离。这些消息在网关和 Cloud IoT Core 之间发送。Cloud IoT Core 仅接受来自网关的控制消息;如果其他类型的设备尝试发送控制消息,Cloud IoT Core 就会关闭连接。
- 来自网关和设备的消息:可以由网关代表设备进行中继,也可以直接从网关本身发送。
- 系统错误消息:如果网关代表设备订阅 MQTT 系统错误主题,则设备每次遇到错误时 Cloud IoT Core 都会向网关发送错误消息。
将设备连接到网关
如需让网关能够与 Cloud IoT Core 代理设备通信,请让网关通过 MQTT 网桥发布 QoS 1 /devices/{device_ID_to_attach}/attach
控制消息。
如果您通过设备 JWT 将网关配置为对设备进行身份验证,则附加消息的载荷必须包含 JSON 格式的令牌:{ "authorization" : "{JWT_token}" }
。否则,Cloud IoT Core 会通过检查设备与网关的关联情况来对设备进行身份验证。
成功响应
设备获得授权后,Cloud IoT Core 会向网关发送 PUBACK 消息以响应连接消息。网关收到 PUBACK 消息后,即可代表设备发布和订阅 Cloud IoT Core 主题,例如遥测消息或配置消息。
如果设备在网关发送连接消息时已经连接,Cloud IoT Core 将返回 PUBACK 消息。
将设备与网关分离
要将设备与网关分离,请让网关通过 MQTT 网桥发布 QoS 1 /devices/{device_ID}/detach
控制消息。设备在发送消息时未连接设备,Cloud IoT Core 会忽略分离的控制消息并发送 PUBACK 消息。
问题排查
如需在设备遇到错误时收到通知,请使用 QoS 级别 0 让网关订阅 MQTT /devices/{gateway_ID}/errors
主题:
Go
设备订阅 errors
主题的步骤如下所示:
import (
"fmt"
"io"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// subscribeGatewayToDeviceTopic creates a gateway client that subscribes to a topic of a bound device.
// Currently supported topics include: "config", "state", "commands", "errors"
func subscribeGatewayToDeviceTopic(w io.Writer, projectID string, region string, registryID string, gatewayID string, deviceID string, privateKeyPath string, algorithm string, clientDuration int, topic string) error {
const (
mqttBrokerURL = "tls://mqtt.googleapis.com:8883"
protocolVersion = 4 // corresponds to MQTT 3.1.1
)
// onConnect defines the on connect handler which resets backoff variables.
var onConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Fprintf(w, "Client connected: %t\n", client.IsConnected())
}
// onMessage defines the message handler for the mqtt client.
var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Fprintf(w, "Topic: %s\n", msg.Topic())
fmt.Fprintf(w, "Message: %s\n", msg.Payload())
}
// onDisconnect defines the connection lost handler for the mqtt client.
var onDisconnect mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Println("Client disconnected")
}
jwt, _ := createJWT(projectID, privateKeyPath, algorithm, 60)
clientID := fmt.Sprintf("projects/%s/locations/%s/registries/%s/devices/%s", projectID, region, registryID, gatewayID)
opts := mqtt.NewClientOptions()
opts.AddBroker(mqttBrokerURL)
opts.SetClientID(clientID)
opts.SetUsername("unused")
opts.SetPassword(jwt)
opts.SetProtocolVersion(protocolVersion)
opts.SetOnConnectHandler(onConnect)
opts.SetDefaultPublishHandler(onMessage)
opts.SetConnectionLostHandler(onDisconnect)
// Create and connect a client using the above options.
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
fmt.Fprintln(w, "Failed to connect client")
return token.Error()
}
if err := attachDevice(deviceID, client, ""); err != nil {
fmt.Fprintf(w, "AttachDevice error: %v\n", err)
return err
}
// Sleep for 5 seconds to allow attachDevice message to propagate.
time.Sleep(5 * time.Second)
// Subscribe to the config topic of the current gateway and a device bound to the gateway.
gatewayTopic := fmt.Sprintf("/devices/%s/%s", gatewayID, topic)
if token := client.Subscribe(gatewayTopic, 0, nil); token.Wait() && token.Error() != nil {
fmt.Fprintln(w, token.Error())
return token.Error()
}
deviceTopic := fmt.Sprintf("/devices/%s/%s", deviceID, topic)
if token := client.Subscribe(deviceTopic, 0, nil); token.Wait() && token.Error() != nil {
fmt.Fprintln(w, token.Error())
return token.Error()
}
time.Sleep(time.Duration(clientDuration) * time.Second)
if err := detachDevice(deviceID, client, ""); err != nil {
fmt.Fprintf(w, "DetachDevice error: %v\n", err)
return err
}
if token := client.Unsubscribe(gatewayTopic, deviceTopic); token.Wait() && token.Error() != nil {
fmt.Fprintln(w, token.Error())
return token.Error()
}
client.Disconnect(10)
return nil
}
Java
设备订阅 errors
主题的步骤如下所示:
Node.js
例如,下面这个设备订阅 config
主题的步骤会突出显示。如需订阅 errors
主题,请指定 /devices/${gateway_ID}/errors
Python
设备订阅 errors
主题的步骤如下所示:
Cloud IoT Core 会尽可能发送网关错误,并在 QoS 0 期间传送。如果网关未订阅 /devices/{gateway_ID}/errors
,则 Cloud IoT Core 会记录失败事件,但不会发送 PUBACK 消息。
MQTT 错误的结构如下:
string error_type; // A string description of the error type.
string device_id; // The ID of the device that caused the error.
string description; // A description of the error.
如果错误消息是由 MQTT 消息触发的,则系统也会附加以下信息:
string message_type; // The string MQTT message type.
string topic; // The MQTT topic if applicable, otherwise it is empty.
int packet_id; // The packet ID of the MQTT message if applicable, otherwise it is zero.
错误代码和错误处理
错误代码 |
说明 |
推荐执行的操作 |
GATEWAY_ATTACHMENT_ERROR |
网关连接请求失败。 |
在解决问题之前,请勿重试。 |
GATEWAY_DEVICE_NOT_FOUND |
网关找不到连接的设备来处理收到的消息。 |
在解决问题之前,请勿重试。 |
GATEWAY_INVALID_MQTT_TOPIC |
网关无法解析指定的 MQTT 主题,要么是格式不正确,要么包含无效的设备 ID 或名称。 |
在解决问题之前,请勿重试。 |
GATEWAY_UNEXPECTED_PACKET_ID |
网关无法根据数据包 ID 处理消息。例如,PUBACK 可能包含数据包 ID,但没有任何等待响应。 |
在解决问题之前,请勿重试。 |
GATEWAY_UNEXPECTED_MESSAGE_TYPE |
网关收到了意外消息,例如不受支持的 PUBREL、PUBREC 等。 |
在解决问题之前,请勿重试。 |
GATEWAY_DETACHMENT_DEVICE_ERROR |
由于设备错误,网关已分离设备。 |
在解决问题之前,请勿重试。 |
未知 |
错误未知。 |
使用指数退避算法重试。 |
如需了解详情,请参阅主要的错误消息文档和 MQTT 版本 3.1.1 规范。