Google Cloud IoT Core verrà ritirato il 16 agosto 2023. Per saperne di più, contatta il team dedicato al tuo account Google Cloud.

Pubblicazione sul bridge MQTT

Mantieni tutto organizzato con le raccolte Salva e classifica i contenuti in base alle tue preferenze.

Questa sezione spiega in che modo i dispositivi possono utilizzare il bridge MQTT per comunicare con Cloud IoT Core. Per informazioni generali su HTTP e MQTT, vedi Protocolli.

Per informazioni complete su ciascun metodo descritto in questa sezione, consulta la documentazione dell'API. Vedi anche gli esempi relativi a MQTT.

Per pubblicare tramite il bridge MQTT:

  1. Installa un client MQTT sul tuo dispositivo.

  2. Scarica un certificato server MQTT sul tuo dispositivo.

  3. Configura il client MQTT per autenticare il dispositivo in Cloud IoT Core.

  4. Avvia un handshake TLS su mqtt.googleapis.com o su un dominio di assistenza a lungo termine.

  5. Pubblica eventi di telemetria o imposta lo stato del dispositivo.

Server MQTT

Cloud IoT Core supporta il protocollo MQTT eseguendo un intermediario gestito in ascolto della porta mqtt.googleapis.com:8883. La porta 8883 è la porta TCP standard prenotata con IANA per le connessioni MQTT sicure. Le connessioni a questa porta devono utilizzare il trasporto TLS, supportato dai client open source come Eclipse Paho.

Se la porta 8883 è bloccata dal firewall, puoi anche utilizzare la porta 443: mqtt.googleapis.com:443.

Qualità del servizio (QoS)

La specifica MQTT descrive tre livelli di qualità del servizio (QoS):

  • QoS 0, pubblicato al massimo una volta
  • QoS 1, pubblicato almeno una volta
  • QoS 2, pubblicato esattamente una volta

Cloud IoT Core non supporta QoS 2. La pubblicazione di messaggi QoS 2 chiude la connessione. L'abbonamento a un argomento predefinito con QoS 2 esegue il downgrade del livello di QoS a QoS 1.

Le funzioni QoS 0 e 1 sono le seguenti in Cloud IoT Core:

  • Un messaggio PUBLISH con QoS 1 verrà confermato dal messaggio PUBACK dopo che sarà stato inviato correttamente a Cloud Pub/Sub.
  • I messaggi PUBLISH con QoS 0 non richiedono risposte PUBACK e potrebbero essere eliminati se c'è un jitter lungo il percorso di recapito del messaggio (ad esempio, se Cloud Pub/Sub non è temporaneamente disponibile).
  • Il bridge MQTT di Cloud IoT Core mantiene un piccolo buffer di messaggi non recapitati per riprovare. Se il buffer si riempie, il messaggio con QoS 1 potrebbe essere eliminato e un messaggio PUBACK non verrà inviato al client. Il client è tenuto a inviare nuovamente il messaggio.

Per le configurazioni di dispositivi, i livelli di QoS sono i seguenti:

  • Quando QoS è 0, una determinata versione di configurazione verrà pubblicata sul dispositivo una sola volta. Se il dispositivo non riceve la configurazione, deve riabbonarsi. Un QoS pari a 0 è quindi utile quando una configurazione viene aggiornata di frequente (in secondi o minuti) e non è necessario che il dispositivo riceva ogni aggiornamento.
  • Quando QoS è 1, verrà effettuato l'ultimo aggiornamento della configurazione finché il dispositivo non lo avrà confermato con un PUBACK. Se viene eseguito il push di una configurazione più recente prima che venga confermata quella precedente, quella precedente non verrà pubblicata di nuovo, ma la nuova verrà pubblicata (e ripubblicata). Questo livello è la modalità più sicura per le configurazioni dei dispositivi: garantisce che alla fine il dispositivo otterrà la configurazione più recente.

Download dei certificati server MQTT in corso...

Per utilizzare il trasporto TLS, i dispositivi devono verificare i certificati del server Cloud IoT Core per assicurarsi di comunicare con Cloud IoT Core anziché con un furto d'identità. I seguenti pacchetti di certificati supportano la verifica:

  • Il pacchetto di certificazione CA radice di Google completo (128 kB) per mqtt.googleapis.com.

    • Questo pacchetto definisce la catena di fiducia per comunicare con i prodotti e i servizi Google, tra cui Cloud IoT Core.
    • I dispositivi con il pacchetto di certificazione CA completo radice comunicano direttamente con il server MQTT.
    • Questo pacchetto viene aggiornato regolarmente.
  • Insieme di CA radice minimo di Google (<1 kB) per mqtt.2030.ltsapis.goog. Il set di CA radice minimo include un certificato principale e un certificato di backup.

    • Questo insieme è destinato a dispositivi con vincoli di memoria, come i microcontroller, e stabilisce la catena di affidabilità per comunicare solo con Cloud IoT Core.
    • I dispositivi con il CA radice minimo impostato comunicano con Cloud IoT Core tramite i domini di assistenza a lungo termine.
    • Questo insieme è fisso fino al 2030 (i certificati principale e di backup non cambieranno). Per maggiore sicurezza, Google Trust Services può passare dal certificato principale a quello di backup in qualsiasi momento e senza preavviso.

Dopo aver scaricato i certificati CA radice di Google sul tuo dispositivo, puoi configurare un client MQTT per l'autenticazione del dispositivo, connetterti al server MQTT e comunicare tramite il bridge MQTT.

Configurazione dei client MQTT

I client MQTT autenticano i dispositivi connettendosi al bridge MQTT. Per configurare un client MQTT per autenticare un dispositivo:

  1. Imposta l'ID client MQTT sul percorso completo del dispositivo:
    projects/PROJECT_ID/locations/REGION/registries/REGISTRY_ID/devices/DEVICE_ID
  2. Associa il client MQTT ai certificati server MQTT.
  3. Imposta il nome host MQTT su mqtt.googleapis.com o su un dominio di assistenza a lungo termine (se hai utilizzato l'insieme minimo di CA radice).
  4. Specifica un nome utente. Il bridge MQTT ignora il campo del nome utente, ma alcune librerie client MQTT non inviano il campo della password a meno che non venga specificato un campo del nome utente. Per ottenere risultati ottimali, fornisci un nome utente arbitrario come unused o ignored.
  5. Imposta la password. Il campo della password deve contenere il codice JWT.

L'esempio seguente mostra come configurare il client MQTT per autenticare un dispositivo:

C++

Di seguito sono riportati i passaggi per configurare l'ID client e l'autenticazione di un dispositivo:
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java


// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
    String.format("ssl://%s:%s", mqttBridgeHostname, mqttBridgePort);

// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
    String.format(
        "projects/%s/locations/%s/registries/%s/devices/%s",
        projectId, cloudRegion, registryId, gatewayId);

MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explictly set this. If you don't set MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

Properties sslProps = new Properties();
sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
connectOptions.setSSLProperties(sslProps);

// With Google Cloud IoT Core, the username field is ignored, however it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");

if ("RS256".equals(algorithm)) {
  connectOptions.setPassword(createJwtRsa(projectId, privateKeyFile).toCharArray());
} else if ("ES256".equals(algorithm)) {
  connectOptions.setPassword(createJwtEs(projectId, privateKeyFile).toCharArray());
} else {
  throw new IllegalArgumentException(
      "Invalid algorithm " + algorithm + ". Should be one of 'RS256' or 'ES256'.");
}

System.out.println(String.format("%s", mqttClientId));

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while ((totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, gatewayId);

// The topic gateways receive error updates on. QoS must be 0.
String errorTopic = String.format("/devices/%s/errors", gatewayId);
System.out.println(String.format("Listening on %s", errorTopic));

client.subscribe(errorTopic, 0);

return client;

Node.js


// const deviceId = `myDevice`;
// const registryId = `myRegistry`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const messageType = `events`;
// const numMessages = 5;

// The mqttClientId is a unique string that identifies this device. For Google
// Cloud IoT Core, it must be in the format below.
const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${deviceId}`;

// With Google Cloud IoT Core, the username field is ignored, however it must be
// non-empty. The password field is used to transmit a JWT to authorize the
// device. The "mqtts" protocol causes the library to connect using SSL, which
// is required for Cloud IoT Core.
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
const iatTime = parseInt(Date.now() / 1000);
const client = mqtt.connect(connectionArgs);

// Subscribe to the /devices/{device-id}/config topic to receive config updates.
// Config updates are recommended to use QoS 1 (at least once delivery)
client.subscribe(`/devices/${deviceId}/config`, {qos: 1});

// Subscribe to the /devices/{device-id}/commands/# topic to receive all
// commands or to the /devices/{device-id}/commands/<subfolder> to just receive
// messages published to a specific commands folder; we recommend you use
// QoS 0 (at most once delivery)
client.subscribe(`/devices/${deviceId}/commands/#`, {qos: 0});

// The MQTT topic that this device will publish data to. The MQTT topic name is
// required to be in the format below. The topic name must end in 'state' to
// publish state and 'events' to publish telemetry. Note that this is not the
// same as the device registry's Cloud Pub/Sub topic.
const mqttTopic = `/devices/${deviceId}/${messageType}`;

client.on('connect', success => {
  console.log('connect');
  if (!success) {
    console.log('Client not connected...');
  } else if (!publishChainInProgress) {
    publishAsync(mqttTopic, client, iatTime, 1, numMessages, connectionArgs);
  }
});

client.on('close', () => {
  console.log('close');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  let messageStr = 'Message received: ';
  if (topic === `/devices/${deviceId}/config`) {
    messageStr = 'Config message received: ';
  } else if (topic.startsWith(`/devices/${deviceId}/commands`)) {
    messageStr = 'Command message received: ';
  }

  messageStr += Buffer.from(message, 'base64').toString('ascii');
  console.log(messageStr);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

// Once all of the messages have been published, the connection to Google Cloud
// IoT will be closed and the process will exit. See the publishAsync method.

Python

Questo esempio utilizza la libreria client delle API di Google per Python.
def error_str(rc):
    """Convert a Paho error to a human readable string."""
    return "{}: {}".format(rc, mqtt.error_string(rc))

def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    print("on_connect", mqtt.connack_string(rc))

    # After a successful connect, reset backoff time and stop backing off.
    global should_backoff
    global minimum_backoff_time
    should_backoff = False
    minimum_backoff_time = 1

def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback for when a device disconnects."""
    print("on_disconnect", error_str(rc))

    # Since a disconnect occurred, the next loop iteration will wait with
    # exponential backoff.
    global should_backoff
    should_backoff = True

def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback when a message is sent to the broker."""
    print("on_publish")

def on_message(unused_client, unused_userdata, message):
    """Callback when the device receives a message on a subscription."""
    payload = str(message.payload.decode("utf-8"))
    print(
        "Received message '{}' on topic '{}' with Qos {}".format(
            payload, message.topic, str(message.qos)
        )
    )

def get_client(
    project_id,
    cloud_region,
    registry_id,
    device_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
):
    """Create our MQTT client. The client_id is a unique string that identifies
    this device. For Google Cloud IoT Core, it must be in the format below."""
    client_id = "projects/{}/locations/{}/registries/{}/devices/{}".format(
        project_id, cloud_region, registry_id, device_id
    )
    print("Device client_id is '{}'".format(client_id))

    client = mqtt.Client(client_id=client_id)

    # With Google Cloud IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    client.username_pw_set(
        username="unused", password=create_jwt(project_id, private_key_file, algorithm)
    )

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. In this example, the
    # callbacks just print to standard out.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the Google MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # This is the topic that the device will receive configuration updates on.
    mqtt_config_topic = "/devices/{}/config".format(device_id)

    # Subscribe to the config topic.
    client.subscribe(mqtt_config_topic, qos=1)

    # The topic that the device will receive commands on.
    mqtt_command_topic = "/devices/{}/commands/#".format(device_id)

    # Subscribe to the commands topic, QoS 1 enables message acknowledgement.
    print("Subscribing to {}".format(mqtt_command_topic))
    client.subscribe(mqtt_command_topic, qos=0)

    return client

Utilizzo di un dominio MQTT a lungo termine

I domini di assistenza a lungo termine (LTS) ti consentono di utilizzare una configurazione TLS per un periodo di tempo prolungato. Puoi configurare un client MQTT una volta, configurare il client MQTT in modo che pubblichi messaggi tramite un dominio LTS e quindi comunicare continuamente sul bridge MQTT in modo continuo durante il periodo di tempo supportato.

L'attuale dominio LTS attivo è mqtt.2030.ltsapis.goog. Questo dominio LTS è supportato fino alla fine del 2030.

Per utilizzare il dominio LTS:

  1. Configurare un client MQTT per pubblicare messaggi tramite un dominio LTS.

    1. Configura il client MQTT per autenticare il dispositivo in Cloud IoT Core.
    2. Quando configuri il dispositivo, associa i certificati principali e di backup del set principale di CA radice al client MQTT.
  2. Avvia un handshake TLS su mqtt.2030.ltsapis.goog sulla porta 8883 o 443. Utilizza almeno le seguenti funzionalità TLS.

    • TLS 1.2
    • P-256 con SHA-256 come chiave del certificato e algoritmo hash
    • TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 tramite P-256 e punti non compressi per la suite di crittografia
    • Indicazione del nome del server (SNI)
    • DNS su TCP o UDP

Per ulteriori informazioni sulla protezione del traffico MQTT, inclusi i messaggi inviati a domini LTS, vedi Consigli per la sicurezza dei dispositivi.

Pubblicazione di eventi di telemetria

Una volta che il dispositivo è stato configurato con un client MQTT e viene connesso al bridge MQTT, può pubblicare un evento di telemetria inviando un messaggio PUBLISH a un argomento MQTT nel seguente formato:

/devices/DEVICE_ID/events

L'ID dispositivo è l'ID stringa del dispositivo specificato nell'ID client MQTT. L'ID dispositivo è sensibile alle maiuscole.

I messaggi pubblicati in questo argomento MQTT vengono inoltrati all'argomento di telemetria predefinito del registro corrispondente. L'argomento di telemetria predefinito è l'argomento Cloud Pub/Sub specificato nel campo eventNotificationConfigs[i].pubsubTopicName nella risorsa del Registro di sistema. Se non esiste un argomento Pub/Sub predefinito, i dati di telemetria pubblicati andranno persi. Per pubblicare messaggi in altri argomenti Cloud Pub/Sub, consulta Pubblicazione di eventi di telemetria in altri argomenti Cloud Pub/Sub.

Il campo dei dati del messaggio inoltrato contiene una copia del messaggio pubblicato dal dispositivo e vengono aggiunti i seguenti attributi di messaggio a ciascun messaggio nell'argomento Cloud Pub/Sub:

Attributo Descrizione
deviceId L'identificatore della stringa definito dall'utente per il dispositivo, ad esempio thing1. L'ID dispositivo deve essere univoco all'interno del registro.
deviceNumId L'ID numerico del dispositivo generato dal server. Quando crei un dispositivo, Cloud IoT Core genera automaticamente l'ID numerico del dispositivo; è univoco a livello globale e non modificabile.
deviceRegistryLocation La regione Google Cloud Platform del registro dispositivi, ad esempio us-central1.
deviceRegistryId L'identificatore di stringa definito dall'utente per il registro dispositivi, ad esempio registry1.
projectId L'ID stringa del progetto cloud proprietario del registry e del dispositivo.
subFolder La sottocartella può essere utilizzata come categoria o classificazione di eventi. Per i client MQTT, la sottocartella è l'argomento secondario dopo DEVICE_ID/events, che viene copiato direttamente. Ad esempio, se il client pubblica nell'argomento MQTT /devices/DEVICE_ID/events/alerts, la sottocartella è la stringa alerts.

Nell'esempio seguente viene mostrato come inviare PUBLISH messaggi tramite la connessione MQTT:

C++

Questo esempio utilizza la libreria client delle API di Google per Python.
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while ((totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, options.deviceId);

// Publish to the events or state topic based on the flag.
String subTopic = "event".equals(options.messageType) ? "events" : options.messageType;

// The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
// required to be in the format below. Note that this is not the same as the device registry's
// Cloud Pub/Sub topic.
String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);

// Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
for (int i = 1; i <= options.numMessages; ++i) {
  String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
  System.out.format(
      "Publishing %s message %d/%d: '%s'%n",
      options.messageType, i, options.numMessages, payload);

  // Refresh the connection credentials before the JWT expires.
  long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000;
  if (secsSinceRefresh > (options.tokenExpMins * MINUTES_PER_HOUR)) {
    System.out.format("\tRefreshing token after: %d seconds%n", secsSinceRefresh);
    iat = new DateTime();
    if ("RS256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
    } else if ("ES256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
    } else {
      throw new IllegalArgumentException(
          "Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
    }
    client.disconnect();
    client.connect(connectOptions);
    attachCallback(client, options.deviceId);
  }

  // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
  // also supports qos=0 for at most once delivery.
  MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8.name()));
  message.setQos(1);
  client.publish(mqttTopic, message);

  if ("event".equals(options.messageType)) {
    // Send telemetry events every second
    Thread.sleep(1000);
  } else {
    // Note: Update Device state less frequently than with telemetry events
    Thread.sleep(5000);
  }
}

// Wait for commands to arrive for about two minutes.
for (int i = 1; i <= options.waitTime; ++i) {
  System.out.print('.');
  Thread.sleep(1000);
}
System.out.println("");

// Disconnect the client if still connected, and finish the run.
if (client.isConnected()) {
  client.disconnect();
}

System.out.println("Finished loop successfully. Goodbye!");
client.close();

Node.js

const publishAsync = (
  mqttTopic,
  client,
  iatTime,
  messagesSent,
  numMessages,
  connectionArgs
) => {
  // If we have published enough messages or backed off too many times, stop.
  if (messagesSent > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) {
    if (backoffTime >= MAXIMUM_BACKOFF_TIME) {
      console.log('Backoff time is too high. Giving up.');
    }
    console.log('Closing connection to MQTT. Goodbye!');
    client.end();
    publishChainInProgress = false;
    return;
  }

  // Publish and schedule the next publish.
  publishChainInProgress = true;
  let publishDelayMs = 0;
  if (shouldBackoff) {
    publishDelayMs = 1000 * (backoffTime + Math.random());
    backoffTime *= 2;
    console.log(`Backing off for ${publishDelayMs}ms before publishing.`);
  }

  setTimeout(() => {
    const payload = `${argv.registryId}/${argv.deviceId}-payload-${messagesSent}`;

    // Publish "payload" to the MQTT topic. qos=1 means at least once delivery.
    // Cloud IoT Core also supports qos=0 for at most once delivery.
    console.log('Publishing message:', payload);
    client.publish(mqttTopic, payload, {qos: 1}, err => {
      if (!err) {
        shouldBackoff = false;
        backoffTime = MINIMUM_BACKOFF_TIME;
      }
    });

    const schedulePublishDelayMs = argv.messageType === 'events' ? 1000 : 2000;
    setTimeout(() => {
      const secsFromIssue = parseInt(Date.now() / 1000) - iatTime;
      if (secsFromIssue > argv.tokenExpMins * 60) {
        iatTime = parseInt(Date.now() / 1000);
        console.log(`\tRefreshing token after ${secsFromIssue} seconds.`);

        client.end();
        connectionArgs.password = createJwt(
          argv.projectId,
          argv.privateKeyFile,
          argv.algorithm
        );
        connectionArgs.protocolId = 'MQTT';
        connectionArgs.protocolVersion = 4;
        connectionArgs.clean = true;
        client = mqtt.connect(connectionArgs);

        client.on('connect', success => {
          console.log('connect');
          if (!success) {
            console.log('Client not connected...');
          } else if (!publishChainInProgress) {
            publishAsync(
              mqttTopic,
              client,
              iatTime,
              messagesSent,
              numMessages,
              connectionArgs
            );
          }
        });

        client.on('close', () => {
          console.log('close');
          shouldBackoff = true;
        });

        client.on('error', err => {
          console.log('error', err);
        });

        client.on('message', (topic, message) => {
          console.log(
            'message received: ',
            Buffer.from(message, 'base64').toString('ascii')
          );
        });

        client.on('packetsend', () => {
          // Note: logging packet send is very verbose
        });
      }
      publishAsync(
        mqttTopic,
        client,
        iatTime,
        messagesSent + 1,
        numMessages,
        connectionArgs
      );
    }, schedulePublishDelayMs);
  }, publishDelayMs);
};

Python

Questo esempio utilizza la libreria client delle API di Google per Python.
global minimum_backoff_time
global MAXIMUM_BACKOFF_TIME

# Publish to the events or state topic based on the flag.
sub_topic = "events" if args.message_type == "event" else "state"

mqtt_topic = "/devices/{}/{}".format(args.device_id, sub_topic)

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = args.jwt_expires_minutes
client = get_client(
    args.project_id,
    args.cloud_region,
    args.registry_id,
    args.device_id,
    args.private_key_file,
    args.algorithm,
    args.ca_certs,
    args.mqtt_bridge_hostname,
    args.mqtt_bridge_port,
)

# Publish num_messages messages to the MQTT bridge once per second.
for i in range(1, args.num_messages + 1):
    # Process network events.
    client.loop()

    # Wait if backoff is required.
    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        # Otherwise, wait and connect again.
        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        print("Waiting for {} before reconnecting.".format(delay))
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

    payload = "{}/{}-payload-{}".format(args.registry_id, args.device_id, i)
    print("Publishing message {}/{}: '{}'".format(i, args.num_messages, payload))
    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            args.project_id,
            args.cloud_region,
            args.registry_id,
            args.device_id,
            args.private_key_file,
            args.algorithm,
            args.ca_certs,
            args.mqtt_bridge_hostname,
            args.mqtt_bridge_port,
        )
    # Publish "payload" to the MQTT topic. qos=1 means at least once
    # delivery. Cloud IoT Core also supports qos=0 for at most once
    # delivery.
    client.publish(mqtt_topic, payload, qos=1)

    # Send events every second. State should not be updated as often
    for i in range(0, 60):
        time.sleep(1)
        client.loop()

Pubblicazione di eventi di telemetria in altri argomenti Cloud Pub/Sub

I dispositivi possono pubblicare dati in altri argomenti Cloud Pub/Sub. Per impostazione predefinita, i messaggi MQTT pubblicati in /devices/DEVICE_ID/events vengono inoltrati all'argomento di telemetria predefinito del registro corrispondente. Nell'argomento MQTT puoi specificare una sottocartella per inoltrare i dati ad altri argomenti Cloud Pub/Sub. La sottocartella è l'argomento secondario dopo /devices/DEVICE_ID/events.

I messaggi pubblicati in una sottocartella vengono inoltrati all'argomento Cloud Pub/Sub con lo stesso nome. Il registro corrispondente deve essere configurato con l'argomento Cloud Pub/Sub; in caso contrario, i messaggi vengono inoltrati all'argomento Cloud Pub/Sub predefinito.

Nei seguenti casi, i messaggi vengono inoltrati all'argomento Cloud Pub/Sub predefinito anziché all'argomento Cloud Pub/Sub aggiuntivo:

  • Nessuna sottocartella specificata nell'argomento MQTT
  • Una sottocartella è specificata nell'argomento MQTT, ma non ha un argomento Pub/Sub corrispondente nel registro dispositivi

Ad esempio, se il dispositivo pubblica nell'argomento MQTT /devices/DEVICE_ID/events/alerts, la sottocartella è la stringa alerts. I messaggi vengono inoltrati all'argomento Cloud Pub/Sub aggiuntivo se i campi eventNotificationConfigs[i].subfolderMatches e eventNotificationConfigs[i].pubsubTopicName sono entrambi impostati su alerts. In caso contrario, i messaggi vengono inoltrati all'argomento Cloud Pub/Sub predefinito.

Impostazione dello stato del dispositivo

I dispositivi connessi possono segnalare lo stato del dispositivo inviando un messaggio PUBLISH al seguente argomento MQTT:

/devices/DEVICE_ID/state

Per classificare e recuperare i messaggi di stato, configura il Registro di sistema con un argomento di stato del dispositivo. L'argomento dello stato del dispositivo è l'argomento Cloud Pub/Sub specificato nel campo StateNotificationConfig.pubsubTopicName. Se il registry è configurato con un argomento dello stato del dispositivo, questi messaggi vengono inoltrati all'argomento Cloud Pub/Sub corrispondente in base al migliore tentativo.

Per maggiori dettagli sul recupero dei messaggi di stato, vedi Recupero dello stato del dispositivo.

Limitazione del traffico MQTT

Cloud IoT Core limita i progetti che generano un carico eccessivo. Quando i dispositivi eseguono nuovamente operazioni non riuscite senza attendere, possono attivare limiti che interessano tutti i dispositivi nello stesso progetto Google Cloud.

Per i nuovi tentativi, ti consigliamo vivamente di implementare un algoritmo di backoff esponenziale troncato con jitter introdotto.

Conserva

Quando invii il messaggio MQTT CONNECT iniziale da un client, puoi fornire un valore facoltativo "keep-alive". Questo valore è un intervallo di tempo, misurato in secondi, durante il quale il broker prevede che un client invii un messaggio, ad esempio un messaggio PUBLISH. Se durante l'intervallo non viene inviato alcun messaggio dal client al broker, la connessione viene chiusa automaticamente. Tieni presente che il valore keep-alive che specifichi viene moltiplicato per 1,5, pertanto l'impostazione di un valore keep-alive di 10 minuti ha come risultato un intervallo di 15 minuti.

Per ulteriori informazioni, consulta la specifica MQTT.

Impostazioni client

Cloud IoT Core non fornisce un proprio valore keep-alive predefinito; se scegli di specificare un intervallo keep-alive, devi impostarlo nel client.

Per ottenere risultati ottimali, imposta un intervallo di keep-alive del client su un minimo di 60 secondi. Molte librerie client open source, incluse le librerie Paho MQTT per C, Python, Node.js e Java, utilizzano 60 secondi per impostazione predefinita.

Limite di tempo di inattività

Separato dall'intervallo keep-alive, Cloud IoT Core ha il proprio limite di tempo di inattività di 20 minuti. In base a questo limite, la connessione client viene terminata automaticamente se il client non invia messaggi per 20 minuti, anche se l'intervallo keep-alive è più lungo. Se non viene fornito un valore keep-alive, viene comunque applicato il timeout per inattività predefinito di 20 minuti.

Risolvere i problemi

Se hai difficoltà a collegarti, vedi Risoluzione dei problemi.