Google Cloud IoT Core ne sera plus disponible à compter du 16 août 2023. Pour en savoir plus, contactez l'équipe chargée de votre compte Google Cloud.

Publier via le pont MQTT

Restez organisé à l'aide des collections Enregistrez et classez les contenus selon vos préférences.

Cette section explique comment les appareils peuvent utiliser le pont MQTT pour communiquer avec Cloud IoT Core. Pour obtenir des informations générales sur HTTP et MQTT, consultez la section Protocoles.

Pour en savoir plus sur chaque méthode décrite dans cette section, consultez la documentation de l'API. Consultez également les exemples associés à MQTT.

Pour publier sur le pont MQTT, procédez comme suit :

  1. Installez un client MQTT sur votre appareil.

  2. Téléchargez un certificat de serveur MQTT sur votre appareil.

  3. Configurez le client MQTT pour authentifier l'appareil auprès de Cloud IoT Core.

  4. Démarrez un handshake TLS sur mqtt.googleapis.com ou un domaine de support à long terme.

  5. Publiez des événements de télémétrie ou définissez l'état de l'appareil.

Serveur MQTT

Cloud IoT Core est compatible avec le protocole MQTT en exécutant un courtier géré qui écoute le port mqtt.googleapis.com:8883. Le port 8883 est le port TCP standard réservé à l'IANA pour les connexions MQTT sécurisées. Les connexions à ce port doivent utiliser le transfert TLS, qui est compatible avec les clients Open Source comme Eclipse Paho.

Si le port 8883 est bloqué par votre pare-feu, vous pouvez également utiliser le port 443 : mqtt.googleapis.com:443.

Qualité de service (QoS)

La spécification MQTT décrit trois niveaux de qualité de service (QoS) :

  • QoS 0, une fois au maximum
  • QoS 1, distribué une fois au minimum
  • QoS 2, distribué exactement une fois

Cloud IoT Core n'est pas compatible avec QoS 2. La publication des messages QoS 2 ferme la connexion. Si vous vous abonnez à un sujet prédéfini avec QoS 2, le niveau de qualité diminue.

QoS 0 et 1 fonctionnent comme suit dans Cloud IoT Core :

  • Un message PUBLISH avec QoS 1 sera confirmé par le message PUBACK une fois qu'il aura été envoyé à Cloud Pub/Sub.
  • Les messages PUBLISH avec QoS 0 ne nécessitent pas de réponse PUBACK et peuvent être supprimés en cas de gigue le long du chemin de distribution des messages (par exemple, si Cloud Pub/Sub est temporairement indisponible).
  • Le pont MQTT Cloud IoT Core conserve un petit tampon de messages non distribués afin de les relancer. Si la mémoire tampon est saturée, le message avec QoS 1 peut être supprimé et aucun message PUBACK ne sera envoyé au client. Le client doit renvoyer le message.

Pour les configurations d'appareils, les niveaux de qualité de service sont les suivants :

  • Lorsque la qualité de service est égale à 0, une version de configuration donnée n'est publiée sur l'appareil qu'une seule fois. Si l'appareil ne reçoit pas la configuration, il doit se réabonner. Un niveau de qualité de 0 est donc utile lorsqu'une configuration est fréquemment mise à jour (par ordre de secondes ou de minutes) et que l'appareil n'a pas besoin de recevoir chaque mise à jour.
  • Lorsque la qualité de service est égale à 1, la dernière mise à jour de la configuration est relancée jusqu'à ce que l'appareil en accuse réception avec un message PUBACK. Si une configuration plus récente est transmise avant que l'ancienne ne soit confirmée, la nouvelle ne sera pas redistribuée. À la place, la nouvelle sera distribuée (et redistribuée). Ce niveau constitue le mode le plus sûr pour les configurations d'appareil : il garantit que l'appareil obtiendra la dernière configuration.

Télécharger des certificats de serveur MQTT

Pour utiliser le transport TLS, les appareils doivent vérifier les certificats de serveur Cloud IoT Core pour s'assurer qu'ils communiquent avec Cloud IoT Core plutôt qu'un emprunt d'identité. Les packages de certificats suivants sont compatibles avec la validation:

  • Package de certification CA racine Google complet (128 Ko) pour mqtt.googleapis.com.

    • Ce package établit la chaîne de confiance permettant de communiquer avec les produits et services Google, y compris Cloud IoT Core.
    • Les appareils dotés du package de certification CA racine complet communiquent directement avec le serveur MQTT.
    • Ce package est régulièrement mis à jour.
  • Ensemble de CA racine minimal de Google (<1 Ko) pour mqtt.2030.ltsapis.goog. L'ensemble de CA racine minimal inclut un certificat principal et un certificat de sauvegarde.

    • Cet ensemble est destiné aux appareils ayant des contraintes de mémoire, comme les microcontrôleurs. Il établit la chaîne de confiance pour communiquer uniquement avec Cloud IoT Core.
    • Les appareils avec l'ensemble CA racine minimal communiquent avec Cloud IoT Core via des domaines de support à long terme.
    • Cet ensemble est corrigé jusqu'en 2030 (les certificats principaux et de secours ne seront pas modifiés). Pour plus de sécurité, Google Trust Services peut basculer entre le certificat principal et le certificat de secours à tout moment et sans préavis.

Après avoir téléchargé les certificats CA racine de Google sur votre appareil, vous pouvez configurer un client MQTT pour authentifier l'appareil, vous connecter au serveur MQTT et communiquer via le pont MQTT.

Configurer des clients MQTT

Les clients MQTT authentifient les appareils en se connectant au pont MQTT. Pour configurer un client MQTT afin d'authentifier un appareil:

  1. Définissez l'ID client MQTT sur le chemin complet de l'appareil :
    projects/PROJECT_ID/locations/REGION/registries/REGISTRY_ID/devices/DEVICE_ID
  2. Associez le client MQTT à des certificats de serveur MQTT.
  3. Définissez le nom d'hôte MQTT sur mqtt.googleapis.com ou sur un domaine de support à long terme (si vous avez utilisé l'ensemble CA racine minimal).
  4. Spécifiez un nom d'utilisateur. Le pont MQTT ignore le champ du nom d'utilisateur, mais certaines bibliothèques clientes MQTT n'enverront pas le champ du mot de passe, sauf si celui-ci est spécifié. Pour des résultats optimaux, indiquez un nom d'utilisateur arbitraire tel que unused ou ignored.
  5. Définissez le mot de passe. Le champ du mot de passe doit contenir le JWT.

L'exemple suivant montre comment configurer le client MQTT pour authentifier un appareil :

C++

Les étapes de configuration de l'ID client et d'authentification d'un appareil sont indiquées ci-dessous :
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java


// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
    String.format("ssl://%s:%s", mqttBridgeHostname, mqttBridgePort);

// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
    String.format(
        "projects/%s/locations/%s/registries/%s/devices/%s",
        projectId, cloudRegion, registryId, gatewayId);

MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explictly set this. If you don't set MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

Properties sslProps = new Properties();
sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
connectOptions.setSSLProperties(sslProps);

// With Google Cloud IoT Core, the username field is ignored, however it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");

if ("RS256".equals(algorithm)) {
  connectOptions.setPassword(createJwtRsa(projectId, privateKeyFile).toCharArray());
} else if ("ES256".equals(algorithm)) {
  connectOptions.setPassword(createJwtEs(projectId, privateKeyFile).toCharArray());
} else {
  throw new IllegalArgumentException(
      "Invalid algorithm " + algorithm + ". Should be one of 'RS256' or 'ES256'.");
}

System.out.println(String.format("%s", mqttClientId));

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while ((totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, gatewayId);

// The topic gateways receive error updates on. QoS must be 0.
String errorTopic = String.format("/devices/%s/errors", gatewayId);
System.out.println(String.format("Listening on %s", errorTopic));

client.subscribe(errorTopic, 0);

return client;

Node.js


// const deviceId = `myDevice`;
// const registryId = `myRegistry`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const messageType = `events`;
// const numMessages = 5;

// The mqttClientId is a unique string that identifies this device. For Google
// Cloud IoT Core, it must be in the format below.
const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${deviceId}`;

// With Google Cloud IoT Core, the username field is ignored, however it must be
// non-empty. The password field is used to transmit a JWT to authorize the
// device. The "mqtts" protocol causes the library to connect using SSL, which
// is required for Cloud IoT Core.
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
const iatTime = parseInt(Date.now() / 1000);
const client = mqtt.connect(connectionArgs);

// Subscribe to the /devices/{device-id}/config topic to receive config updates.
// Config updates are recommended to use QoS 1 (at least once delivery)
client.subscribe(`/devices/${deviceId}/config`, {qos: 1});

// Subscribe to the /devices/{device-id}/commands/# topic to receive all
// commands or to the /devices/{device-id}/commands/<subfolder> to just receive
// messages published to a specific commands folder; we recommend you use
// QoS 0 (at most once delivery)
client.subscribe(`/devices/${deviceId}/commands/#`, {qos: 0});

// The MQTT topic that this device will publish data to. The MQTT topic name is
// required to be in the format below. The topic name must end in 'state' to
// publish state and 'events' to publish telemetry. Note that this is not the
// same as the device registry's Cloud Pub/Sub topic.
const mqttTopic = `/devices/${deviceId}/${messageType}`;

client.on('connect', success => {
  console.log('connect');
  if (!success) {
    console.log('Client not connected...');
  } else if (!publishChainInProgress) {
    publishAsync(mqttTopic, client, iatTime, 1, numMessages, connectionArgs);
  }
});

client.on('close', () => {
  console.log('close');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  let messageStr = 'Message received: ';
  if (topic === `/devices/${deviceId}/config`) {
    messageStr = 'Config message received: ';
  } else if (topic.startsWith(`/devices/${deviceId}/commands`)) {
    messageStr = 'Command message received: ';
  }

  messageStr += Buffer.from(message, 'base64').toString('ascii');
  console.log(messageStr);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

// Once all of the messages have been published, the connection to Google Cloud
// IoT will be closed and the process will exit. See the publishAsync method.

Python

Cet exemple utilise la bibliothèque cliente des API Google pour Python.
def error_str(rc):
    """Convert a Paho error to a human readable string."""
    return "{}: {}".format(rc, mqtt.error_string(rc))

def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    print("on_connect", mqtt.connack_string(rc))

    # After a successful connect, reset backoff time and stop backing off.
    global should_backoff
    global minimum_backoff_time
    should_backoff = False
    minimum_backoff_time = 1

def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback for when a device disconnects."""
    print("on_disconnect", error_str(rc))

    # Since a disconnect occurred, the next loop iteration will wait with
    # exponential backoff.
    global should_backoff
    should_backoff = True

def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback when a message is sent to the broker."""
    print("on_publish")

def on_message(unused_client, unused_userdata, message):
    """Callback when the device receives a message on a subscription."""
    payload = str(message.payload.decode("utf-8"))
    print(
        "Received message '{}' on topic '{}' with Qos {}".format(
            payload, message.topic, str(message.qos)
        )
    )

def get_client(
    project_id,
    cloud_region,
    registry_id,
    device_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
):
    """Create our MQTT client. The client_id is a unique string that identifies
    this device. For Google Cloud IoT Core, it must be in the format below."""
    client_id = "projects/{}/locations/{}/registries/{}/devices/{}".format(
        project_id, cloud_region, registry_id, device_id
    )
    print("Device client_id is '{}'".format(client_id))

    client = mqtt.Client(client_id=client_id)

    # With Google Cloud IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    client.username_pw_set(
        username="unused", password=create_jwt(project_id, private_key_file, algorithm)
    )

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. In this example, the
    # callbacks just print to standard out.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the Google MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # This is the topic that the device will receive configuration updates on.
    mqtt_config_topic = "/devices/{}/config".format(device_id)

    # Subscribe to the config topic.
    client.subscribe(mqtt_config_topic, qos=1)

    # The topic that the device will receive commands on.
    mqtt_command_topic = "/devices/{}/commands/#".format(device_id)

    # Subscribe to the commands topic, QoS 1 enables message acknowledgement.
    print("Subscribing to {}".format(mqtt_command_topic))
    client.subscribe(mqtt_command_topic, qos=0)

    return client

Utiliser un domaine MQTT à long terme

Les domaines utilisant le support à long terme (LTS) vous permettent d'utiliser une configuration TLS pendant une période prolongée. Vous pouvez configurer un client MQTT une fois, le configurer pour publier des messages via un domaine LTS, puis communiquer via le pont MQTT pendant la période prise en charge.

Le domaine LTS actuel est actif : mqtt.2030.ltsapis.goog. Ce domaine LTS sera pris en charge jusqu'en 2030.

Pour utiliser le domaine LTS :

  1. Configurer un client MQTT pour publier des messages via un domaine LTS

    1. Configurez le client MQTT pour authentifier l'appareil auprès de Cloud IoT Core.
    2. Lors de la configuration de l'appareil, associez les certificats principal et de sauvegarde de l'ensemble CA racine minimal au client MQTT.
  2. Instaurez un handshake TLS sur mqtt.2030.ltsapis.goog sur le port 8883 ou 443. Utilisez au moins les fonctionnalités TLS suivantes.

    • TLS 1.2
    • P-256 avec SHA-256 comme clé de certificat et algorithme de hachage
    • TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 utilisant P-256 et les points non compressés pour la suite de chiffrement
    • Indication du nom du serveur (SNI)
    • DNS sur TCP ou UDP

Pour en savoir plus sur la sécurisation du trafic MQTT, y compris sur les messages envoyés aux domaines LTS, consultez Recommandations de sécurité de l'appareil.

Publier des événements de télémétrie

Une fois l'appareil configuré avec un client MQTT et connecté au pont MQTT, il peut publier un événement de télémétrie en envoyant un message PUBLISH à un sujet MQTT au format suivant :

/devices/DEVICE_ID/events

L'ID d'appareil correspond à l'ID de chaîne de l'appareil spécifié dans l'ID client MQTT. L'ID de l'appareil est sensible à la casse.

Les messages publiés dans ce sujet MQTT sont transférés vers le sujet de télémétrie par défaut du registre correspondant. Le sujet de télémétrie par défaut est le sujet Cloud Pub/Sub spécifié dans le champ eventNotificationConfigs[i].pubsubTopicName de la ressource de registre. Si aucun sujet Pub/Sub par défaut n'existe, les données de télémétrie publiées seront perdues. Pour publier des messages dans d'autres sujets Cloud Pub/Sub, consultez Publier des événements de télémétrie dans d'autres sujets Cloud Pub/Sub.

Le champ de données du message transféré contient une copie du message publié par l'appareil, et les attributs de message suivants sont ajoutés à chaque message du sujet Cloud Pub/Sub :

Attribut Description
deviceId Identifiant de chaîne défini par l'utilisateur pour l'appareil, par exemple thing1. L'ID d'appareil doit être unique dans le registre.
deviceNumId ID numérique généré par le serveur de l'appareil. Lorsque vous créez un appareil, Cloud IoT Core génère automatiquement l'ID numérique de l'appareil, qui est unique et non modifiable.
deviceRegistryLocation Région Google Cloud Platform du registre d'appareils, par exemple us-central1.
deviceRegistryId Identifiant de chaîne défini par l'utilisateur pour le registre d'appareils ; par exemple, registry1.
projectId ID de chaîne du projet cloud propriétaire du registre et de l'appareil.
subFolder Vous pouvez utiliser le sous-dossier en tant que catégorie ou catégorie d'événement. Pour les clients MQTT, le sous-dossier est le sous-thème après DEVICE_ID/events, qui est copié directement. Par exemple, si le client publie sur le sujet MQTT /devices/DEVICE_ID/events/alerts, le sous-dossier est la chaîne alerts.

L'exemple suivant montre comment envoyer des messages PUBLISH via la connexion MQTT :

C++

Cet exemple utilise la bibliothèque cliente des API Google pour Python.
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while ((totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, options.deviceId);

// Publish to the events or state topic based on the flag.
String subTopic = "event".equals(options.messageType) ? "events" : options.messageType;

// The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
// required to be in the format below. Note that this is not the same as the device registry's
// Cloud Pub/Sub topic.
String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);

// Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
for (int i = 1; i <= options.numMessages; ++i) {
  String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
  System.out.format(
      "Publishing %s message %d/%d: '%s'%n",
      options.messageType, i, options.numMessages, payload);

  // Refresh the connection credentials before the JWT expires.
  long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000;
  if (secsSinceRefresh > (options.tokenExpMins * MINUTES_PER_HOUR)) {
    System.out.format("\tRefreshing token after: %d seconds%n", secsSinceRefresh);
    iat = new DateTime();
    if ("RS256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
    } else if ("ES256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
    } else {
      throw new IllegalArgumentException(
          "Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
    }
    client.disconnect();
    client.connect(connectOptions);
    attachCallback(client, options.deviceId);
  }

  // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
  // also supports qos=0 for at most once delivery.
  MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8.name()));
  message.setQos(1);
  client.publish(mqttTopic, message);

  if ("event".equals(options.messageType)) {
    // Send telemetry events every second
    Thread.sleep(1000);
  } else {
    // Note: Update Device state less frequently than with telemetry events
    Thread.sleep(5000);
  }
}

// Wait for commands to arrive for about two minutes.
for (int i = 1; i <= options.waitTime; ++i) {
  System.out.print('.');
  Thread.sleep(1000);
}
System.out.println("");

// Disconnect the client if still connected, and finish the run.
if (client.isConnected()) {
  client.disconnect();
}

System.out.println("Finished loop successfully. Goodbye!");
client.close();

Node.js

const publishAsync = (
  mqttTopic,
  client,
  iatTime,
  messagesSent,
  numMessages,
  connectionArgs
) => {
  // If we have published enough messages or backed off too many times, stop.
  if (messagesSent > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) {
    if (backoffTime >= MAXIMUM_BACKOFF_TIME) {
      console.log('Backoff time is too high. Giving up.');
    }
    console.log('Closing connection to MQTT. Goodbye!');
    client.end();
    publishChainInProgress = false;
    return;
  }

  // Publish and schedule the next publish.
  publishChainInProgress = true;
  let publishDelayMs = 0;
  if (shouldBackoff) {
    publishDelayMs = 1000 * (backoffTime + Math.random());
    backoffTime *= 2;
    console.log(`Backing off for ${publishDelayMs}ms before publishing.`);
  }

  setTimeout(() => {
    const payload = `${argv.registryId}/${argv.deviceId}-payload-${messagesSent}`;

    // Publish "payload" to the MQTT topic. qos=1 means at least once delivery.
    // Cloud IoT Core also supports qos=0 for at most once delivery.
    console.log('Publishing message:', payload);
    client.publish(mqttTopic, payload, {qos: 1}, err => {
      if (!err) {
        shouldBackoff = false;
        backoffTime = MINIMUM_BACKOFF_TIME;
      }
    });

    const schedulePublishDelayMs = argv.messageType === 'events' ? 1000 : 2000;
    setTimeout(() => {
      const secsFromIssue = parseInt(Date.now() / 1000) - iatTime;
      if (secsFromIssue > argv.tokenExpMins * 60) {
        iatTime = parseInt(Date.now() / 1000);
        console.log(`\tRefreshing token after ${secsFromIssue} seconds.`);

        client.end();
        connectionArgs.password = createJwt(
          argv.projectId,
          argv.privateKeyFile,
          argv.algorithm
        );
        connectionArgs.protocolId = 'MQTT';
        connectionArgs.protocolVersion = 4;
        connectionArgs.clean = true;
        client = mqtt.connect(connectionArgs);

        client.on('connect', success => {
          console.log('connect');
          if (!success) {
            console.log('Client not connected...');
          } else if (!publishChainInProgress) {
            publishAsync(
              mqttTopic,
              client,
              iatTime,
              messagesSent,
              numMessages,
              connectionArgs
            );
          }
        });

        client.on('close', () => {
          console.log('close');
          shouldBackoff = true;
        });

        client.on('error', err => {
          console.log('error', err);
        });

        client.on('message', (topic, message) => {
          console.log(
            'message received: ',
            Buffer.from(message, 'base64').toString('ascii')
          );
        });

        client.on('packetsend', () => {
          // Note: logging packet send is very verbose
        });
      }
      publishAsync(
        mqttTopic,
        client,
        iatTime,
        messagesSent + 1,
        numMessages,
        connectionArgs
      );
    }, schedulePublishDelayMs);
  }, publishDelayMs);
};

Python

Cet exemple utilise la bibliothèque cliente des API Google pour Python.
global minimum_backoff_time
global MAXIMUM_BACKOFF_TIME

# Publish to the events or state topic based on the flag.
sub_topic = "events" if args.message_type == "event" else "state"

mqtt_topic = "/devices/{}/{}".format(args.device_id, sub_topic)

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = args.jwt_expires_minutes
client = get_client(
    args.project_id,
    args.cloud_region,
    args.registry_id,
    args.device_id,
    args.private_key_file,
    args.algorithm,
    args.ca_certs,
    args.mqtt_bridge_hostname,
    args.mqtt_bridge_port,
)

# Publish num_messages messages to the MQTT bridge once per second.
for i in range(1, args.num_messages + 1):
    # Process network events.
    client.loop()

    # Wait if backoff is required.
    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        # Otherwise, wait and connect again.
        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        print("Waiting for {} before reconnecting.".format(delay))
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

    payload = "{}/{}-payload-{}".format(args.registry_id, args.device_id, i)
    print("Publishing message {}/{}: '{}'".format(i, args.num_messages, payload))
    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            args.project_id,
            args.cloud_region,
            args.registry_id,
            args.device_id,
            args.private_key_file,
            args.algorithm,
            args.ca_certs,
            args.mqtt_bridge_hostname,
            args.mqtt_bridge_port,
        )
    # Publish "payload" to the MQTT topic. qos=1 means at least once
    # delivery. Cloud IoT Core also supports qos=0 for at most once
    # delivery.
    client.publish(mqtt_topic, payload, qos=1)

    # Send events every second. State should not be updated as often
    for i in range(0, 60):
        time.sleep(1)
        client.loop()

Publier des événements de télémétrie dans d'autres sujets Cloud Pub/Sub

Les appareils peuvent publier des données dans d'autres sujets Cloud Pub/Sub. Par défaut, les messages MQTT publiés sur /devices/DEVICE_ID/events sont transférés vers le sujet de télémétrie par défaut du registre correspondant. Vous pouvez spécifier un sous-dossier dans le sujet MQTT pour transférer des données vers d'autres sujets Cloud Pub/Sub. Le sous-dossier est le sous-thème après /devices/DEVICE_ID/events.

Les messages publiés dans un sous-dossier sont transférés vers le sujet Cloud Pub/Sub portant le même nom. Le registre correspondant doit être configuré avec le sujet Cloud Pub/Sub. Sinon, les messages sont transférés vers le sujet Cloud Pub/Sub par défaut.

Les messages sont transférés vers le sujet Cloud Pub/Sub par défaut à la place du sujet Cloud Pub/Sub supplémentaire dans les cas suivants :

  • Aucun sous-dossier n'est spécifié dans le sujet MQTT.
  • Un sous-dossier est spécifié dans le sujet MQTT, mais il n'existe pas de sujet Pub/Sub correspondant dans le registre d'appareils

Par exemple, si l'appareil publie sur le sujet MQTT /devices/DEVICE_ID/events/alerts, le sous-dossier est la chaîne alerts. Les messages sont transférés vers le sujet Cloud Pub/Sub supplémentaire si les champs eventNotificationConfigs[i].subfolderMatches et eventNotificationConfigs[i].pubsubTopicName sont tous deux définis sur alerts. Sinon, les messages sont transférés au sujet Cloud Pub/Sub par défaut.

Définir l'état de l'appareil

Les appareils connectés peuvent signaler l'état des appareils en envoyant un message PUBLISH au sujet MQTT suivant :

/devices/DEVICE_ID/state

Pour classer et récupérer des messages d'état, configurez le registre avec un sujet indiquant l'état de l'appareil. Le sujet de l'état de l'appareil est le sujet Cloud Pub/Sub spécifié dans le champ StateNotificationConfig.pubsubTopicName. Si le registre est configuré avec un sujet sur l'état de l'appareil, ces messages sont transférés au sujet Cloud Pub/Sub correspondant de la façon la plus optimale possible.

Pour en savoir plus sur la récupération des messages d'état, consultez Obtenir l'état de l'appareil.

Limiter le trafic MQTT

Cloud IoT Core limite les projets qui génèrent une charge excessive. Lorsque les appareils tentent de relancer les opérations ayant échoué sans attendre, ils peuvent déclencher des limites qui affectent tous les appareils d'un même projet Google Cloud.

Pour les nouvelles tentatives, il est fortement recommandé de mettre en œuvre un algorithme d'intervalle exponentiel entre les tentatives tronqué avec la gigue introduite.

Message keep-alive

Lorsque vous envoyez le message MQTT CONNECT initial à partir d'un client, vous pouvez fournir une valeur "keep-alive" facultative. Cette valeur est un intervalle de temps, mesuré en secondes, au cours duquel l'agent s'attend à ce qu'un client envoie un message, tel qu'un message PUBLISH. Si aucun message n'est envoyé par le client à l'agent pendant l'intervalle, celui-ci ferme automatiquement la connexion. Notez que la valeur keep-alive que vous spécifiez est multipliée par 1,5.Par conséquent, définir un message keep-alive de 10 minutes équivaut à un intervalle de 15 minutes.

Pour en savoir plus, consultez la spécification MQTT.

Paramètres client

Cloud IoT Core ne fournit pas sa propre valeur keep-alive par défaut. Si vous choisissez de spécifier un intervalle keep-alive, vous devez le définir dans le client.

Pour de meilleurs résultats, définissez l'intervalle keep-alive du client sur une durée minimale de 60 secondes. De nombreuses bibliothèques clientes Open Source, y compris les bibliothèques Paho MQTT pour C, Python, Node.js et Java, utilisent par défaut 60 secondes.

Délai d'inactivité

En dehors de l'intervalle keep-alive, Cloud IoT Core a sa propre limite de temps d'inactivité de 20 minutes. Sur la base de cette limite, une connexion client est automatiquement interrompue si le client n'envoie aucun message pendant 20 minutes, même si l'intervalle keep-alive est plus long. Si aucune valeur keep-alive n'est fournie, le délai d'inactivité par défaut de 20 minutes est pris en compte.

Dépannage

Si vous rencontrez des problèmes de connexion, consultez la section Dépannage.