Google Cloud IoT Core ne sera plus disponible à compter du 16 août 2023. Pour en savoir plus, contactez l'équipe chargée de votre compte Google Cloud.

Utiliser des passerelles avec le pont MQTT

Restez organisé à l'aide des collections Enregistrez et classez les contenus selon vos préférences.

Cette page explique comment les passerelles peuvent utiliser le pont MQTT pour communiquer avec Cloud IoT Core et publier des événements de télémétrie au nom d'appareils liés. Avant de commencer, consultez Utiliser le pont MQTT pour obtenir des informations générales sur l'utilisation du pont MQTT avec Cloud IoT Core.

Démonstration de bout en bout

Utiliser des passerelles avec le pont MQTT

  1. Après avoir créé et configuré la passerelle, connectez-la à Cloud IoT Core via le pont MQTT.
  2. Si vous ne l'avez pas encore fait, créez des appareils.
  3. Facultatif : Associez les appareils à la passerelle.

    Lorsque vous utilisez le pont MQTT, vous ne devez lier les appareils que s'ils ne peuvent pas générer leurs propres JWT.

  4. Facultatif : Abonnez-vous au sujet d'erreurs système pour savoir si les opérations sur l'appareil ont réussi ou non.

  5. Connectez les appareils à la passerelle.

  6. Utiliser la passerelle pour relayer les données de télémétrie, l'état de l'appareil et la configuration au nom de ses appareils. Découvrez la procédure de bout en bout.

Messages des passerelles

Une fois que la passerelle se connecte à Cloud IoT Core via le pont MQTT, elle peut envoyer ou recevoir trois types de messages :

  • Messages de contrôle:permet d'associer ou de dissocier un appareil à la passerelle. Ces messages sont envoyés entre la passerelle et Cloud IoT Core. Cloud IoT Core n'accepte que les messages de contrôle provenant de passerelles. Si un autre type d'appareil tente d'envoyer un message de contrôle, Cloud IoT Core ferme la connexion.
  • Messages provenant des passerelles et des appareils : ils peuvent être transmis par la passerelle pour le compte d'un appareil ou envoyés directement depuis la passerelle elle-même.
  • Messages d'erreur système : lorsque la passerelle est abonnée au sujet d'erreur système MQTT au nom de l'appareil, Cloud IoT Core envoie des messages d'erreur à la passerelle chaque fois que l'appareil rencontre une erreur.

Associer des appareils à une passerelle

Pour permettre à la passerelle d'envoyer par poxy des communications d'appareils avec Cloud IoT Core, demandez à la passerelle de publier un message de contrôle QoS 1 /devices/{device_ID_to_attach}/attach sur le pont MQTT.

Si vous avez configuré la passerelle pour authentifier les appareils à l'aide des jetons JWT des appareils, la charge utile du message d'association doit inclure le jeton au format JSON : { "authorization" : "{JWT_token}" }. Sinon, Cloud IoT Core authentifie l'appareil en vérifiant son association à la passerelle.

Réponse de réussite

Une fois l'appareil autorisé, Cloud IoT Core envoie un message PUBACK à la passerelle en réponse au message d'association. Une fois que la passerelle a reçu le message PUBACK, elle peut publier et s'abonner aux sujets Cloud IoT Core au nom de l'appareil, tels que les messages de télémétrie ou de configuration.

Si un appareil est déjà associé lorsque la passerelle envoie le message en pièce jointe, Cloud IoT Core répond par un message PUBACK.

Dissocier des appareils de la passerelle

Pour dissocier un appareil de la passerelle, demandez à la passerelle de publier un message de contrôle QoS 1 /devices/{device_ID}/detach sur le pont MQTT. Si l'appareil n'est pas associé au moment de l'envoi du message, Cloud IoT Core ignore le message de contrôle de dissociation et envoie un message PUBACK.

Dépannage

Pour recevoir une notification lorsqu'un appareil rencontre une erreur, abonnez la passerelle au sujet MQTT /devices/{gateway_ID}/errors à l'aide du niveau de qualité de service 0 :

C#

L'étape à laquelle l'appareil s'abonne à tous les sujets de la passerelle, y compris le sujet errors, est mise en surbrillance ci-dessous :
public static object ListenForConfigMessages(string projectId, string cloudRegion,
    string registryId, string deviceId, string gatewayId, int numMessages,
    string privateKeyFile, string algorithm, string caCerts, string mqttBridgeHostname,
    int mqttBridgePort, int jwtExpiresMinutes, int duration)
{
    var clientId = $"projects/{projectId}/locations/{cloudRegion}/registries/{registryId}" +
        $"/devices/{gatewayId}";
    var jwtIatTime = SystemClock.Instance.GetCurrentInstant();
    // Create a duration
    Duration durationExp = Duration.FromMinutes(jwtExpiresMinutes);
    var jwtExpTime = SystemClock.Instance.GetCurrentInstant().Plus(durationExp);
    var pass = "";
    if (algorithm == "RS256")
    {
        pass = CloudIotMqttExample.CreateJwtRsa(projectId, privateKeyFile);
    }
    else if (algorithm == "ES256")
    {
        Console.WriteLine("Currently, we do not support this algorithm.");
        return 0;
    }

    // Use gateway to connect server
    var mqttClient = GetClient(
      projectId,
      cloudRegion,
      registryId,
      gatewayId,
      caCerts,
      mqttBridgeHostname,
      mqttBridgePort);

    double initialConnectIntervalMillis = 0.5;
    double maxConnectIntervalMillis = 6;
    double maxConnectRetryTimeElapsedMillis = 900;
    double intervalMultiplier = 1.5;

    double retryIntervalMs = initialConnectIntervalMillis;
    double totalRetryTimeMs = 0;

    // Both connect and publish operations may fail. If they do,
    // allow retries but with an exponential backoff time period.
    while (!mqttClient.IsConnected &&
        totalRetryTimeMs < maxConnectRetryTimeElapsedMillis)
    {
        try
        {
            // Connect to the Google MQTT bridge.
            mqttClient.Connect(clientId, "unused", pass);
        }
        catch (AggregateException aggExceps)
        {
            printExceptions(aggExceps);
            Console.WriteLine("Retrying in " + retryIntervalMs
                + " seconds.");

            System.Threading.Thread.Sleep((int)retryIntervalMs);
            totalRetryTimeMs += retryIntervalMs;
            retryIntervalMs *= intervalMultiplier;
            if (retryIntervalMs > maxConnectIntervalMillis)
            {
                retryIntervalMs = maxConnectIntervalMillis;
            }
        }
    }

    SetupMqttTopics(mqttClient, gatewayId);
    AttachDevice(mqttClient, deviceId, "{}");

    // Wait for about a minute for config messages.
    Console.WriteLine("Listening...");
    for (int i = 0; i < duration; ++i)
    {
        Console.Write(".");
        var secSinceIssue = SystemClock.Instance.GetCurrentInstant().Minus(jwtIatTime);
        if (secSinceIssue.TotalSeconds > (60 * jwtExpiresMinutes))
        {
            Console.WriteLine("Refreshing token after {0}s", secSinceIssue);
            jwtIatTime = SystemClock.Instance.GetCurrentInstant();
            // refresh token and reconnect.
            pass = CloudIotMqttExample.CreateJwtRsa(projectId, privateKeyFile);
            mqttClient = GetClient(
                        projectId,
                        cloudRegion,
                        registryId,
                        gatewayId,
                        caCerts,
                        mqttBridgeHostname,
                        mqttBridgePort);
        }
        System.Threading.Thread.Sleep(1000);
    }

    DetachDevice(mqttClient, deviceId);
    // wait for the device get detached.
    System.Threading.Thread.Sleep(2000);
    mqttClient.Disconnect();
    Console.WriteLine("Finished.");
    return 0;
}

Go

L'étape à laquelle l'appareil s'abonne au sujet errors est mise en surbrillance ci-dessous :

import (
	"fmt"
	"io"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// subscribeGatewayToDeviceTopic creates a gateway client that subscribes to a topic of a bound device.
// Currently supported topics include: "config", "state", "commands", "errors"
func subscribeGatewayToDeviceTopic(w io.Writer, projectID string, region string, registryID string, gatewayID string, deviceID string, privateKeyPath string, algorithm string, clientDuration int, topic string) error {

	const (
		mqttBrokerURL   = "tls://mqtt.googleapis.com:8883"
		protocolVersion = 4 // corresponds to MQTT 3.1.1
	)

	// onConnect defines the on connect handler which resets backoff variables.
	var onConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
		fmt.Fprintf(w, "Client connected: %t\n", client.IsConnected())
	}

	// onMessage defines the message handler for the mqtt client.
	var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
		fmt.Fprintf(w, "Topic: %s\n", msg.Topic())
		fmt.Fprintf(w, "Message: %s\n", msg.Payload())
	}

	// onDisconnect defines the connection lost handler for the mqtt client.
	var onDisconnect mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
		fmt.Println("Client disconnected")
	}

	jwt, _ := createJWT(projectID, privateKeyPath, algorithm, 60)
	clientID := fmt.Sprintf("projects/%s/locations/%s/registries/%s/devices/%s", projectID, region, registryID, gatewayID)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(mqttBrokerURL)
	opts.SetClientID(clientID)
	opts.SetUsername("unused")
	opts.SetPassword(jwt)
	opts.SetProtocolVersion(protocolVersion)
	opts.SetOnConnectHandler(onConnect)
	opts.SetDefaultPublishHandler(onMessage)
	opts.SetConnectionLostHandler(onDisconnect)

	// Create and connect a client using the above options.
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, "Failed to connect client")
		return token.Error()
	}

	if err := attachDevice(deviceID, client, ""); err != nil {
		fmt.Fprintf(w, "AttachDevice error: %v\n", err)
		return err
	}

	// Sleep for 5 seconds to allow attachDevice message to propagate.
	time.Sleep(5 * time.Second)

	// Subscribe to the config topic of the current gateway and a device bound to the gateway.
	gatewayTopic := fmt.Sprintf("/devices/%s/%s", gatewayID, topic)
	if token := client.Subscribe(gatewayTopic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	deviceTopic := fmt.Sprintf("/devices/%s/%s", deviceID, topic)
	if token := client.Subscribe(deviceTopic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	time.Sleep(time.Duration(clientDuration) * time.Second)

	if err := detachDevice(deviceID, client, ""); err != nil {
		fmt.Fprintf(w, "DetachDevice error: %v\n", err)
		return err
	}

	if token := client.Unsubscribe(gatewayTopic, deviceTopic); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	client.Disconnect(10)
	return nil
}

Java

L'étape à laquelle l'appareil s'abonne au sujet errors est mise en surbrillance ci-dessous :

// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
    String.format("ssl://%s:%s", mqttBridgeHostname, mqttBridgePort);

// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
    String.format(
        "projects/%s/locations/%s/registries/%s/devices/%s",
        projectId, cloudRegion, registryId, gatewayId);

MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explictly set this. If you don't set MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

Properties sslProps = new Properties();
sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
connectOptions.setSSLProperties(sslProps);

// With Google Cloud IoT Core, the username field is ignored, however it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");

if ("RS256".equals(algorithm)) {
  connectOptions.setPassword(createJwtRsa(projectId, privateKeyFile).toCharArray());
} else if ("ES256".equals(algorithm)) {
  connectOptions.setPassword(createJwtEs(projectId, privateKeyFile).toCharArray());
} else {
  throw new IllegalArgumentException(
      "Invalid algorithm " + algorithm + ". Should be one of 'RS256' or 'ES256'.");
}

System.out.println(String.format("%s", mqttClientId));

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while ((totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, gatewayId);

// The topic gateways receive error updates on. QoS must be 0.
String errorTopic = String.format("/devices/%s/errors", gatewayId);
System.out.println(String.format("Listening on %s", errorTopic));

client.subscribe(errorTopic, 0);

return client;

Node.js

L'étape à laquelle l'appareil s'abonne au sujet config est par exemple mise en évidence ci-dessous. Pour vous abonner au sujet errors, spécifiez /devices/${gateway_ID}/errors
// const deviceId = `myDevice`;
// const gatewayId = `mygateway`;
// const registryId = `myRegistry`;
// const projectId = `my-project-123`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const clientDuration = 60000;

const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${gatewayId}`;
console.log(mqttClientId);
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  qos: 1,
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
const client = mqtt.connect(connectionArgs);

client.on('connect', success => {
  if (!success) {
    console.log('Client not connected...');
  } else {
    setTimeout(() => {
      // Subscribe to gateway error topic.
      client.subscribe(`/devices/${gatewayId}/errors`, {qos: 0});

      attachDevice(deviceId, client);

      setTimeout(() => {
        console.log('Closing connection to MQTT. Goodbye!');
        client.end(true);
      }, clientDuration); // Safely detach device and close connection.
    }, 5000);
  }
});

client.on('close', () => {
  console.log('Connection closed');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  const decodedMessage = Buffer.from(message, 'base64').toString('ascii');

  console.log(`message received on error topic ${topic}: ${decodedMessage}`);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

Python

L'étape à laquelle l'appareil s'abonne au sujet errors est mise en surbrillance ci-dessous :
global minimum_backoff_time

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = "/devices/{}/config".format(device_id)
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = "/devices/{}/config".format(gateway_id)
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = "/devices/{}/errors".format(gateway_id)
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
    client.loop()
    if cb is not None:
        cb(client)

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(1)

detach_device(client, device_id)

print("Finished.")

Dans la mesure du possible, Cloud IoT Core envoie des erreurs de passerelle via QoS 0. Si la passerelle n'est pas abonnée à /devices/{gateway_ID}/errors, Cloud IoT Core consigne les événements d'échec, mais n'envoie pas de message PUBACK.

Les erreurs MQTT ont la structure suivante :

string error_type;  // A string description of the error type.
string device_id;   // The ID of the device that caused the error.
string description; // A description of the error.

Si le message d'erreur a été déclenché par un message MQTT, les informations suivantes seront également jointes :

string message_type;  // The string MQTT message type.
string topic;  // The MQTT topic if applicable, otherwise it is empty.
int packet_id;  // The packet ID of the MQTT message if applicable, otherwise it is zero.

Codes d'erreur et traitement des erreurs

Code d'erreur Description Action recommandée
GATEWAY_ATTACHMENT_ERROR Échec de la requête de rattachement de passerelle. Ne relancez pas la requête avant d'avoir résolu le problème.
GATEWAY_DEVICE_NOT_FOUND La passerelle n'a pas trouvé d'appareil associé pour gérer un message entrant. Ne relancez pas la requête avant d'avoir résolu le problème.
GATEWAY_INVALID_MQTT_TOPIC La passerelle n'a pas pu analyser le sujet MQTT spécifié, qui était mal formaté ou contenait un ID ou un nom d'appareil non valide. Ne relancez pas la requête avant d'avoir résolu le problème.
GATEWAY_UNEXPECTED_PACKET_ID La passerelle n'a pas pu traiter le message en raison de son ID de paquet. Par exemple, un PUBACK peut contenir un ID de paquet, mais rien n'attendait la réponse. Ne relancez pas la requête avant d'avoir résolu le problème.
GATEWAY_UNEXPECTED_MESSAGE_TYPE La passerelle a reçu des messages inattendus, par exemple PUBREL, PUBREC, etc. non compatibles. Ne relancez pas la requête avant d'avoir résolu le problème.
GATEWAY_DETACHMENT_DEVICE_ERROR La passerelle a dissocié un appareil en raison d'une erreur. Ne relancez pas la requête avant d'avoir résolu le problème.
INCONNU L'erreur est inconnue. Relancez la requête avec un intervalle exponentiel entre les tentatives.

Pour en savoir plus, consultez la documentation principale sur les messages d'erreur et la spécification MQTT version 3.1.1.