Publier via le pont MQTT

Cette section explique comment les appareils peuvent utiliser le pont MQTT pour communiquer avec Cloud IoT Core. Pour obtenir des informations générales sur HTTP et MQTT, consultez la page Protocoles.

Reportez-vous à la documentation de l'API pour obtenir des informations complètes sur chaque méthode décrite dans cette section. Consultez également les exemples liés à MQTT.

Pour publier sur le pont MQTT, procédez comme suit :

  1. Installez un client MQTT sur votre appareil.

  2. Téléchargez un certificat de serveur MQTT sur votre appareil.

  3. Configurez le client MQTT pour authentifier l'appareil auprès de Cloud IoT Core.

  4. Démarrez un handshake TLS sur mqtt.googleapis.com ou un domaine de support à long terme.

  5. Publiez les événements de télémétrie ou définissez l'état de l'appareil.

Serveur MQTT

Cloud IoT Core accepte le protocole MQTT en exécutant un agent géré qui écoute le port mqtt.googleapis.com:8883. Le port 8883 est le port TCP standard réservé avec l'IANA pour les connexions MQTT sécurisées. Les connexions à ce port doivent utiliser le transfert TLS, qui est compatible avec les clients Open Source tels qu'Eclipse Paho.

Si le port 8883 est bloqué par votre pare-feu, vous pouvez également utiliser le port 443 : mqtt.googleapis.com:443.

Qualité de service (QoS)

La spécification MQTT décrit trois niveaux de qualité de service (QoS) :

  • QoS 0, diffusé une fois au maximum
  • QoS 1, distribué une fois au minimum
  • QoS 2, distribué exactement une fois

Cloud IoT Core n'est pas compatible avec QoS 2. La publication des messages QoS 2 met fin à la connexion. Si vous êtes abonné à un sujet prédéfini avec QoS 2, la qualité de service QoS 1 sera rétrogradée.

QoS 0 et 1 fonctionnent comme suit dans Cloud IoT Core :

  • Un message PUBLISH avec QoS 1 sera confirmé par le message PUBACK une fois qu'il aura été envoyé à Cloud Pub/Sub.
  • Les messages PUBLISH avec QoS 0 ne nécessitent pas de réponses PUBACK et peuvent être supprimés en cas de gigue le long du chemin de distribution des messages (par exemple, si Cloud Pub/Sub est temporairement indisponible).
  • Le pont MQTT Cloud IoT Core conserve un petit tampon de messages non distribués pour les relancer. Si la mémoire tampon est pleine, le message présentant la qualité de service 1 peut être supprimé, et le message PUBACK ne sera pas envoyé au client. Le client doit renvoyer le message.

Pour les configurations d'appareils, les niveaux de qualité de service sont les suivants :

  • Lorsque la qualité de service est définie sur 0, une version de configuration donnée n'est publiée sur l'appareil qu'une seule fois. Si l'appareil ne reçoit pas la configuration, il doit se réabonner. La QoS de 0 est donc utile lorsqu'une configuration est fréquemment mise à jour (de l'ordre de quelques secondes ou minutes) et qu'il n'est pas nécessaire que l'appareil reçoive toutes les mises à jour.
  • Lorsque la qualité de service est égale à 1, la dernière mise à jour de la configuration est relancée jusqu'à ce que l'appareil en accuse réception avec un message PUBACK. Si une configuration plus récente est transmise avant que l'ancienne ne soit confirmée, la nouvelle ne sera pas redistribuée. À la place, la nouvelle sera distribuée (et redistribuée). Ce niveau constitue le mode le plus sûr pour les configurations d'appareil : il garantit que l'appareil obtiendra la dernière configuration.

Télécharger des certificats de serveur MQTT

Pour utiliser le transport TLS, les appareils doivent valider les certificats de serveur Cloud IoT Core afin de s'assurer qu'ils communiquent avec Cloud IoT Core plutôt qu'un emprunt d'identité. Les packages de certificats suivants sont compatibles avec la validation:

  • Package de certification CA racine Google complet (128 Ko) pour mqtt.googleapis.com.

    • Ce package établit une chaîne de confiance pour communiquer avec les produits et services Google, y compris Cloud IoT Core.
    • Les appareils dotés du package de certification CA racine complet communiquent directement avec le serveur MQTT.
    • Ce package est régulièrement mis à jour.
  • Ensemble de CA racine minimal de Google (<1 Ko) pour mqtt.2030.ltsapis.goog. L'ensemble de CA racine minimal inclut un certificat principal et un certificat de sauvegarde.

    • Cet ensemble est destiné aux appareils présentant des contraintes de mémoire, telles que les microcontrôleurs, et établit la chaîne de confiance pour communiquer avec Cloud IoT Core uniquement.
    • Les appareils avec l'ensemble CA racine minimal communiquent avec Cloud IoT Core via des domaines de support à long terme.
    • Ce paramètre sera corrigé jusqu'en 2030 (les certificats principal et de sauvegarde ne seront pas modifiés). Pour plus de sécurité, Google Trust Services peut basculer entre le certificat principal et le certificat de sauvegarde à tout moment sans préavis.

Après avoir téléchargé les certificats racines des autorités de certification Google sur votre appareil, vous pouvez configurer un client MQTT pour authentifier l'appareil, vous connecter au serveur MQTT et communiquer via le pont MQTT.

Configurer des clients MQTT

Les clients MQTT authentifient les appareils en se connectant au pont MQTT. Pour configurer un client MQTT pour authentifier un appareil, procédez comme suit:

  1. Définissez l'ID client MQTT sur le chemin complet de l'appareil :
    projects/PROJECT_ID/locations/REGION/registries/REGISTRY_ID/devices/DEVICE_ID
  2. Associez le client MQTT aux certificats du serveur MQTT.
  3. Définissez le nom d'hôte MQTT sur mqtt.googleapis.com ou sur un domaine de support à long terme (si vous avez utilisé l'ensemble CA racine minimal).
  4. Spécifiez un nom d'utilisateur. Le pont MQTT ignore le champ du nom d'utilisateur, mais certaines bibliothèques clientes MQTT n'enverront pas le champ de mot de passe, sauf si le champ de nom d'utilisateur est spécifié. Pour de meilleurs résultats, indiquez un nom d'utilisateur arbitraire tel que unused ou ignored.
  5. Définissez le mot de passe. Le champ de mot de passe doit contenir le jeton JWT.

L'exemple suivant montre comment configurer le client MQTT pour authentifier un appareil :

C++

Les étapes de configuration de l'ID client et d'authentification d'un appareil sont indiquées ci-dessous :
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

MqttExampleOptions options = MqttExampleOptions.fromFlags(args);
if (options == null) {
  // Could not parse.
  System.exit(1);
}

if ("listen-for-config-messages".equals(options.command)) {
  System.out.println(
      String.format("Listening for configuration messages for %s:", options.deviceId));
  listenForConfigMessages(
      options.mqttBridgeHostname,
      options.mqttBridgePort,
      options.projectId,
      options.cloudRegion,
      options.registryId,
      options.gatewayId,
      options.privateKeyFile,
      options.algorithm,
      options.deviceId);
} else if ("send-data-from-bound-device".equals(options.command)) {
  System.out.println("Sending data on behalf of device:");
  sendDataFromBoundDevice(
      options.mqttBridgeHostname,
      options.mqttBridgePort,
      options.projectId,
      options.cloudRegion,
      options.registryId,
      options.gatewayId,
      options.privateKeyFile,
      options.algorithm,
      options.deviceId,
      options.messageType,
      options.telemetryData);
} else {
  System.out.println("Starting mqtt demo:");
  mqttDeviceDemo(options);
}

Node.js


// const deviceId = `myDevice`;
// const registryId = `myRegistry`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const messageType = `events`;
// const numMessages = 5;

// The mqttClientId is a unique string that identifies this device. For Google
// Cloud IoT Core, it must be in the format below.
const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${deviceId}`;

// With Google Cloud IoT Core, the username field is ignored, however it must be
// non-empty. The password field is used to transmit a JWT to authorize the
// device. The "mqtts" protocol causes the library to connect using SSL, which
// is required for Cloud IoT Core.
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
const iatTime = parseInt(Date.now() / 1000);
const client = mqtt.connect(connectionArgs);

// Subscribe to the /devices/{device-id}/config topic to receive config updates.
// Config updates are recommended to use QoS 1 (at least once delivery)
client.subscribe(`/devices/${deviceId}/config`, {qos: 1});

// Subscribe to the /devices/{device-id}/commands/# topic to receive all
// commands or to the /devices/{device-id}/commands/<subfolder> to just receive
// messages published to a specific commands folder; we recommend you use
// QoS 0 (at most once delivery)
client.subscribe(`/devices/${deviceId}/commands/#`, {qos: 0});

// The MQTT topic that this device will publish data to. The MQTT topic name is
// required to be in the format below. The topic name must end in 'state' to
// publish state and 'events' to publish telemetry. Note that this is not the
// same as the device registry's Cloud Pub/Sub topic.
const mqttTopic = `/devices/${deviceId}/${messageType}`;

client.on('connect', success => {
  console.log('connect');
  if (!success) {
    console.log('Client not connected...');
  } else if (!publishChainInProgress) {
    publishAsync(mqttTopic, client, iatTime, 1, numMessages, connectionArgs);
  }
});

client.on('close', () => {
  console.log('close');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  let messageStr = 'Message received: ';
  if (topic === `/devices/${deviceId}/config`) {
    messageStr = 'Config message received: ';
  } else if (topic.startsWith(`/devices/${deviceId}/commands`)) {
    messageStr = 'Command message received: ';
  }

  messageStr += Buffer.from(message, 'base64').toString('ascii');
  console.log(messageStr);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

// Once all of the messages have been published, the connection to Google Cloud
// IoT will be closed and the process will exit. See the publishAsync method.

Python

Cet exemple utilise la bibliothèque cliente des API Google pour Python.
def error_str(rc):
    """Convert a Paho error to a human readable string."""
    return "{}: {}".format(rc, mqtt.error_string(rc))

def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    print("on_connect", mqtt.connack_string(rc))

    # After a successful connect, reset backoff time and stop backing off.
    global should_backoff
    global minimum_backoff_time
    should_backoff = False
    minimum_backoff_time = 1

def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback for when a device disconnects."""
    print("on_disconnect", error_str(rc))

    # Since a disconnect occurred, the next loop iteration will wait with
    # exponential backoff.
    global should_backoff
    should_backoff = True

def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback when a message is sent to the broker."""
    print("on_publish")

def on_message(unused_client, unused_userdata, message):
    """Callback when the device receives a message on a subscription."""
    payload = str(message.payload.decode("utf-8"))
    print(
        "Received message '{}' on topic '{}' with Qos {}".format(
            payload, message.topic, str(message.qos)
        )
    )

def get_client(
    project_id,
    cloud_region,
    registry_id,
    device_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
):
    """Create our MQTT client. The client_id is a unique string that identifies
    this device. For Google Cloud IoT Core, it must be in the format below."""
    client_id = "projects/{}/locations/{}/registries/{}/devices/{}".format(
        project_id, cloud_region, registry_id, device_id
    )
    print("Device client_id is '{}'".format(client_id))

    client = mqtt.Client(client_id=client_id)

    # With Google Cloud IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    client.username_pw_set(
        username="unused", password=create_jwt(project_id, private_key_file, algorithm)
    )

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. In this example, the
    # callbacks just print to standard out.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the Google MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # This is the topic that the device will receive configuration updates on.
    mqtt_config_topic = "/devices/{}/config".format(device_id)

    # Subscribe to the config topic.
    client.subscribe(mqtt_config_topic, qos=1)

    # The topic that the device will receive commands on.
    mqtt_command_topic = "/devices/{}/commands/#".format(device_id)

    # Subscribe to the commands topic, QoS 1 enables message acknowledgement.
    print("Subscribing to {}".format(mqtt_command_topic))
    client.subscribe(mqtt_command_topic, qos=0)

    return client

Utiliser un domaine MQTT à long terme

Les domaines compatibles LTS (Long Term Support) vous permettent d'utiliser une seule configuration TLS pendant une longue période. Vous pouvez configurer un client MQTT une fois, configurer celui-ci pour publier des messages via un domaine LTS, puis communiquer en continu via le pont MQTT au cours de la période couverte.

Le domaine LTS actif actuel est mqtt.2030.ltsapis.goog. Ce domaine LTS sera disponible jusqu'en 2030.

Pour utiliser le domaine LTS :

  1. Configurer un client MQTT pour publier des messages via un domaine LTS

    1. Configurez le client MQTT pour authentifier l'appareil auprès de Cloud IoT Core.
    2. Lors de la configuration de l'appareil, associez les certificats principal et de sauvegarde de l'ensemble CA racine minimal au client MQTT.
  2. Lancez un handshake TLS sur mqtt.2030.ltsapis.goog sur le port 8883 ou 443. Utilisez au minimum les fonctionnalités TLS suivantes.

    • TLS 1.2
    • P-256 avec SHA-256 comme clé de certificat et algorithme de hachage
    • TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 avec P-256 et points non compressés pour la suite de chiffrement
    • indication du nom du serveur (SNI)
    • DNS sur TCP ou UDP

Pour en savoir plus sur la sécurisation du trafic MQTT, y compris sur les messages envoyés à des domaines LTS, consultez Recommandations concernant la sécurité des appareils.

Publier des événements de télémétrie

Une fois l'appareil configuré avec un client MQTT et connecté au pont MQTT, il peut publier un événement de télémétrie en envoyant un message PUBLISH à un sujet MQTT au format suivant :

/devices/DEVICE_ID/events

L'ID d'appareil correspond à l'ID de chaîne de l'appareil spécifié dans l'ID client MQTT. L'ID de l'appareil est sensible à la casse.

Les messages publiés dans ce sujet MQTT sont transférés vers le sujet de télémétrie par défaut du registre correspondant. Le sujet de télémétrie par défaut est le sujet Cloud Pub/Sub spécifié dans le champ eventNotificationConfigs[i].pubsubTopicName de la ressource de registre. Si aucun sujet Pub/Sub par défaut n'existe, les données de télémétrie publiées seront perdues. Pour publier des messages dans d'autres sujets Cloud Pub/Sub, consultez la page Publier des événements de télémétrie sur des sujets Cloud Pub/Sub supplémentaires.

Le champ de données du message transféré contient une copie du message publié par l'appareil, et les attributs de message suivants sont ajoutés à chaque message du sujet Cloud Pub/Sub :

Attribut Description
deviceId Identifiant de chaîne définie par l'utilisateur pour l'appareil, par exemple, thing1. L'ID de l'appareil doit être unique dans le registre.
deviceNumId ID numérique généré par le serveur de l'appareil. Lorsque vous créez un appareil, Cloud IoT Core génère automatiquement l'ID numérique de l'appareil, qui est unique et non modifiable.
deviceRegistryLocation Région Google Cloud Platform du registre d'appareils, par exemple us-central1.
deviceRegistryId Identifiant de chaîne défini par l'utilisateur pour le registre d'appareils ; par exemple, registry1.
projectId ID de chaîne du projet sur le cloud qui contient le registre et l'appareil.
subFolder Le sous-dossier peut être utilisé comme catégorie d'événements ou classification. Pour les clients MQTT, le sous-dossier est le sous-thème suivant DEVICE_ID/events, qui est copié directement. Par exemple, si le client publie dans le sujet MQTT /devices/DEVICE_ID/events/alerts, le sous-dossier est la chaîne alerts.

L'exemple suivant montre comment envoyer des messages PUBLISH via la connexion MQTT :

C++

Cet exemple utilise la bibliothèque cliente des API Google pour Python.
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while ((totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, options.deviceId);

// Publish to the events or state topic based on the flag.
String subTopic = "event".equals(options.messageType) ? "events" : options.messageType;

// The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
// required to be in the format below. Note that this is not the same as the device registry's
// Cloud Pub/Sub topic.
String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);

// Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
for (int i = 1; i <= options.numMessages; ++i) {
  String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
  System.out.format(
      "Publishing %s message %d/%d: '%s'%n",
      options.messageType, i, options.numMessages, payload);

  // Refresh the connection credentials before the JWT expires.
  long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000;
  if (secsSinceRefresh > (options.tokenExpMins * MINUTES_PER_HOUR)) {
    System.out.format("\tRefreshing token after: %d seconds%n", secsSinceRefresh);
    iat = new DateTime();
    if ("RS256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
    } else if ("ES256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
    } else {
      throw new IllegalArgumentException(
          "Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
    }
    client.disconnect();
    client.connect(connectOptions);
    attachCallback(client, options.deviceId);
  }

  // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
  // also supports qos=0 for at most once delivery.
  MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8.name()));
  message.setQos(1);
  client.publish(mqttTopic, message);

  if ("event".equals(options.messageType)) {
    // Send telemetry events every second
    Thread.sleep(1000);
  } else {
    // Note: Update Device state less frequently than with telemetry events
    Thread.sleep(5000);
  }
}

// Wait for commands to arrive for about two minutes.
for (int i = 1; i <= options.waitTime; ++i) {
  System.out.print('.');
  Thread.sleep(1000);
}
System.out.println("");

// Disconnect the client if still connected, and finish the run.
if (client.isConnected()) {
  client.disconnect();
}

System.out.println("Finished loop successfully. Goodbye!");
client.close();

Node.js

const publishAsync = (
  mqttTopic,
  client,
  iatTime,
  messagesSent,
  numMessages,
  connectionArgs
) => {
  // If we have published enough messages or backed off too many times, stop.
  if (messagesSent > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) {
    if (backoffTime >= MAXIMUM_BACKOFF_TIME) {
      console.log('Backoff time is too high. Giving up.');
    }
    console.log('Closing connection to MQTT. Goodbye!');
    client.end();
    publishChainInProgress = false;
    return;
  }

  // Publish and schedule the next publish.
  publishChainInProgress = true;
  let publishDelayMs = 0;
  if (shouldBackoff) {
    publishDelayMs = 1000 * (backoffTime + Math.random());
    backoffTime *= 2;
    console.log(`Backing off for ${publishDelayMs}ms before publishing.`);
  }

  setTimeout(() => {
    const payload = `${argv.registryId}/${argv.deviceId}-payload-${messagesSent}`;

    // Publish "payload" to the MQTT topic. qos=1 means at least once delivery.
    // Cloud IoT Core also supports qos=0 for at most once delivery.
    console.log('Publishing message:', payload);
    client.publish(mqttTopic, payload, {qos: 1}, err => {
      if (!err) {
        shouldBackoff = false;
        backoffTime = MINIMUM_BACKOFF_TIME;
      }
    });

    const schedulePublishDelayMs = argv.messageType === 'events' ? 1000 : 2000;
    setTimeout(() => {
      const secsFromIssue = parseInt(Date.now() / 1000) - iatTime;
      if (secsFromIssue > argv.tokenExpMins * 60) {
        iatTime = parseInt(Date.now() / 1000);
        console.log(`\tRefreshing token after ${secsFromIssue} seconds.`);

        client.end();
        connectionArgs.password = createJwt(
          argv.projectId,
          argv.privateKeyFile,
          argv.algorithm
        );
        connectionArgs.protocolId = 'MQTT';
        connectionArgs.protocolVersion = 4;
        connectionArgs.clean = true;
        client = mqtt.connect(connectionArgs);

        client.on('connect', success => {
          console.log('connect');
          if (!success) {
            console.log('Client not connected...');
          } else if (!publishChainInProgress) {
            publishAsync(
              mqttTopic,
              client,
              iatTime,
              messagesSent,
              numMessages,
              connectionArgs
            );
          }
        });

        client.on('close', () => {
          console.log('close');
          shouldBackoff = true;
        });

        client.on('error', err => {
          console.log('error', err);
        });

        client.on('message', (topic, message) => {
          console.log(
            'message received: ',
            Buffer.from(message, 'base64').toString('ascii')
          );
        });

        client.on('packetsend', () => {
          // Note: logging packet send is very verbose
        });
      }
      publishAsync(
        mqttTopic,
        client,
        iatTime,
        messagesSent + 1,
        numMessages,
        connectionArgs
      );
    }, schedulePublishDelayMs);
  }, publishDelayMs);
};

Python

Cet exemple utilise la bibliothèque cliente des API Google pour Python.
global minimum_backoff_time
global MAXIMUM_BACKOFF_TIME

# Publish to the events or state topic based on the flag.
sub_topic = "events" if args.message_type == "event" else "state"

mqtt_topic = "/devices/{}/{}".format(args.device_id, sub_topic)

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = args.jwt_expires_minutes
client = get_client(
    args.project_id,
    args.cloud_region,
    args.registry_id,
    args.device_id,
    args.private_key_file,
    args.algorithm,
    args.ca_certs,
    args.mqtt_bridge_hostname,
    args.mqtt_bridge_port,
)

# Publish num_messages messages to the MQTT bridge once per second.
for i in range(1, args.num_messages + 1):
    # Process network events.
    client.loop()

    # Wait if backoff is required.
    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        # Otherwise, wait and connect again.
        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        print("Waiting for {} before reconnecting.".format(delay))
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

    payload = "{}/{}-payload-{}".format(args.registry_id, args.device_id, i)
    print("Publishing message {}/{}: '{}'".format(i, args.num_messages, payload))
    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            args.project_id,
            args.cloud_region,
            args.registry_id,
            args.device_id,
            args.private_key_file,
            args.algorithm,
            args.ca_certs,
            args.mqtt_bridge_hostname,
            args.mqtt_bridge_port,
        )
    # Publish "payload" to the MQTT topic. qos=1 means at least once
    # delivery. Cloud IoT Core also supports qos=0 for at most once
    # delivery.
    client.publish(mqtt_topic, payload, qos=1)

    # Send events every second. State should not be updated as often
    for i in range(0, 60):
        time.sleep(1)
        client.loop()

Publier des événements de télémétrie sur des sujets Cloud Pub/Sub supplémentaires

Les appareils peuvent publier des données dans d'autres sujets Cloud Pub/Sub. Par défaut, les messages MQTT publiés sur /devices/DEVICE_ID/events sont transférés vers le sujet de télémétrie par défaut du registre correspondant. Vous pouvez spécifier un sous-dossier dans le sujet MQTT pour transférer les données vers d'autres sujets Cloud Pub/Sub. Le sous-dossier est le thème secondaire après /devices/DEVICE_ID/events.

Les messages publiés dans un sous-dossier sont transférés vers le sujet Cloud Pub/Sub portant le même nom. Le registre correspondant doit être configuré avec le sujet Cloud Pub/Sub. Sinon, les messages sont transférés vers le sujet Cloud Pub/Sub par défaut.

Les messages sont transférés vers le sujet Cloud Pub/Sub par défaut à la place du sujet Cloud Pub/Sub supplémentaire dans les cas suivants :

  • Aucun sous-dossier n'est spécifié dans le sujet MQTT.
  • Un sous-dossier est spécifié dans le sujet MQTT, mais ne possède pas de sujet Pub/Sub correspondant dans le registre d'appareils.

Par exemple, si l'appareil publie dans le sujet MQTT /devices/DEVICE_ID/events/alerts, le sous-dossier correspond à la chaîne alerts. Les messages sont transférés vers le sujet Cloud Pub/Sub supplémentaire si les champs eventNotificationConfigs[i].subfolderMatches et eventNotificationConfigs[i].pubsubTopicName sont tous deux définis sur alerts Sinon, les messages sont transférés vers le sujet Cloud Pub/Sub par défaut.

Définir l'état de l'appareil

Les appareils connectés peuvent signaler l'état des appareils en envoyant un message PUBLISH au sujet MQTT suivant :

/devices/DEVICE_ID/state

Pour classer et récupérer les messages d'état, configurez le registre avec un sujet d'état d'appareil. Le sujet d'état d'appareil est le sujet Cloud Pub/Sub spécifié dans le champ StateNotificationConfig.pubsubTopicName. Si le registre est configuré avec un sujet d'état d'appareil, ces messages sont transférés au sujet Cloud Pub/Sub correspondant de la manière la plus optimale possible.

Pour en savoir plus sur la récupération des messages d'état, consultez la section Obtenir l'état de l'appareil.

Limiter le trafic MQTT

Cloud IoT Core limite les projets qui génèrent une charge excessive. Lorsque les appareils relancent des opérations ayant échoué sans attendre, ils peuvent déclencher des limites qui affectent tous les appareils du même projet Google Cloud.

Pour les nouvelles tentatives, nous vous encourageons vivement à mettre en œuvre un algorithme d'intervalle exponentiel entre les tentatives tronqué avec une gigue introduite.

Message keep-alive

Lorsque vous envoyez le message CONNECT MQTT initial d'un client, vous pouvez fournir une valeur "keep-alive" facultative. Cette valeur est un intervalle de temps, exprimé en secondes, pendant lequel l'agent s'attend à ce qu'un client envoie un message, tel qu'un message PUBLISH. Si aucun message n'est envoyé du client à l'agent pendant l'intervalle, celui-ci ferme automatiquement la connexion. Notez que la valeur keep-alive que vous spécifiez est multipliée par 1,5.Par conséquent, la définition d'un message keep-alive de 10 minutes génère un intervalle de 15 minutes.

Pour en savoir plus, consultez la page Spécification MQTT.

Paramètres client

Cloud IoT Core ne fournit pas sa propre valeur keep-alive par défaut ; Si vous choisissez de spécifier un intervalle keep-alive, vous devez le définir dans le client.

Pour de meilleurs résultats, définissez l'intervalle keep-alive du client sur un minimum de 60 secondes. De nombreuses bibliothèques clientes Open Source, y compris les bibliothèques MQTT de Paho pour C, Python, Node.js et Java, avec 60 secondes par défaut.

Durée d'inactivité

En plus de l'intervalle keep-alive, Cloud IoT Core possède sa propre durée d'inactivité de 20 minutes. En fonction de cette limite, une connexion client est automatiquement interrompue si le client n'envoie aucun message pendant 20 minutes, même si l'intervalle keep-alive est plus long. Si aucune valeur keep-alive n'est fournie, le délai d'inactivité par défaut de 20 minutes prend toujours effet.

Dépannage

Si vous rencontrez des difficultés pour vous connecter, consultez Dépannage.