Google Cloud IoT Core verrà ritirato il 16 agosto 2023. Per saperne di più, contatta il team dedicato al tuo account Google Cloud.

Pubblicazione sul bridge MQTT

Questa sezione spiega in che modo i dispositivi possono utilizzare il bridge MQTT per comunicare con Cloud IoT Core. Per informazioni generali su HTTP e MQTT, vedi Protocolli.

Per informazioni complete su ciascun metodo descritto in questa sezione, consulta la documentazione dell'API. Vedi anche gli esempi relativi a MQTT.

Per pubblicare tramite il bridge MQTT:

  1. Installa un client MQTT sul tuo dispositivo.

  2. Scarica un certificato server MQTT sul dispositivo.

  3. Configura il client MQTT per autenticare il dispositivo in Cloud IoT Core.

  4. Avvia un handshake TLS su mqtt.googleapis.com o su un dominio di assistenza a lungo termine.

  5. Pubblica eventi di telemetria o imposta lo stato del dispositivo.

Server MQTT

Cloud IoT Core supporta il protocollo MQTT eseguendo un intermediario gestito che rimane in ascolto sulla porta mqtt.googleapis.com:8883. La porta 8883 è la porta TCP standard prenotata con IANA per le connessioni MQTT sicure. Le connessioni a questa porta devono utilizzare il trasferimento TLS, che è supportato dai client open source come Eclipse Paho.

Se la porta 8883 è bloccata dal firewall, puoi anche utilizzare la porta 443: mqtt.googleapis.com:443.

Qualità del servizio (QoS)

La specifica MQTT descrive tre livelli di qualità del servizio (QoS):

  • QoS 0, pubblicazione al massimo una volta
  • QoS 1, pubblicato almeno una volta
  • QoS 2, pubblicato esattamente una volta

Cloud IoT Core non supporta QoS 2. La pubblicazione di messaggi QoS 2 chiude la connessione. L'abbonamento a un argomento predefinito con QoS 2 esegue il downgrade del livello QoS a QoS 1.

Le funzioni QoS 0 e 1 funzionano come segue in Cloud IoT Core:

  • Un messaggio PUBLISH con QoS 1 verrà confermato dal messaggio PUBACK dopo che è stato inviato correttamente a Cloud Pub/Sub.
  • PUBLISH messaggi con QoS 0 non richiedono risposte PUBACK e potrebbero essere eliminati se c'è un jitter lungo il percorso di recapito del messaggio (ad esempio, se Cloud Pub/Sub è temporaneamente non disponibile).
  • Il bridge MQTT di Cloud IoT Core mantiene un piccolo buffer di messaggi non recapitati per poterli riprovare. Se il buffer si esaurisce, il messaggio con QoS 1 potrebbe essere ignorato e un messaggio PUBACK non verrà inviato al client. Il client è tenuto a inviare nuovamente il messaggio.

Per le configurazioni di dispositivi, i livelli di QoS sono i seguenti:

  • Quando QoS è 0, una determinata versione di configurazione verrà pubblicata sul dispositivo solo una volta. Se il dispositivo non riceve la configurazione, deve iscriversi nuovamente. Una QoS pari a 0 è quindi utile quando una configurazione viene aggiornata di frequente (nell'ordine di secondi o minuti) e non è necessaria che il dispositivo riceva ogni aggiornamento.
  • Se il valore QoS è 1, verrà eseguito nuovamente l'ultimo aggiornamento della configurazione finché il dispositivo non lo confermerà con un PUBACK. Se viene eseguito il push di una configurazione più recente prima che quella precedente venga confermata, la precedente non verrà caricata nuovamente, ma la nuova verrà pubblicata (e ripubblicata). Questo livello è la modalità più sicura per le configurazioni dei dispositivi: garantisce che alla fine il dispositivo riceverà la configurazione più recente.

Download dei certificati server MQTT in corso...

Per utilizzare il trasporto TLS, i dispositivi devono verificare i certificati server di Cloud IoT Core per assicurarsi di comunicare con Cloud IoT Core anziché un furto d'identità. I seguenti pacchetti di certificati supportano la verifica:

  • Il pacchetto di certificazione CA radice di Google completo (128 kB) per mqtt.googleapis.com.

    • Questo pacchetto stabilisce la catena di fiducia per la comunicazione con i prodotti e i servizi Google, incluso Cloud IoT Core.
    • I dispositivi con il pacchetto di certificazione CA completo possono comunicare direttamente con il server MQTT.
    • Questo pacchetto viene aggiornato regolarmente.
  • Set minimo di CA radice di Google (< 1 KB) per mqtt.2030.ltsapis.goog. Il set di CA radice minimo include un certificato principale e un backup.

    • Questo set è per i dispositivi con vincoli di memoria, come i microcontroller, e stabilisce la catena di attendibilità per comunicare solo con Cloud IoT Core.
    • I dispositivi con un insieme di CA radice minimo comunicano con Cloud IoT Core tramite domini di assistenza a lungo termine.
    • Questo insieme viene corretto fino al 2030 (i certificati principale e di backup non cambieranno). Per maggiore sicurezza, i Servizi Google Trust possono passare dal certificato principale a quello di backup in qualsiasi momento e senza preavviso.

Dopo aver scaricato i certificati CA radice di Google sul tuo dispositivo, puoi configurare un client MQTT per l'autenticazione del dispositivo, connetterti al server MQTT e comunicare tramite il bridge MQTT.

Configurazione dei client MQTT

I client MQTT autenticano i dispositivi connettendosi al bridge MQTT. Per configurare un client MQTT per autenticare un dispositivo:

  1. Imposta l'ID client MQTT sul percorso completo del dispositivo:
    projects/PROJECT_ID/locations/REGION/registries/REGISTRY_ID/devices/DEVICE_ID
  2. Associare il client MQTT ai certificati server MQTT.
  3. Imposta il nome host MQTT su mqtt.googleapis.com o su un dominio di assistenza a lungo termine (se hai utilizzato il set di CA radice minimo).
  4. Specifica un nome utente. Il bridge MQTT ignora il campo del nome utente, ma alcune librerie client MQTT non inviano il campo della password a meno che non venga specificato il campo del nome utente. Per ottenere risultati ottimali, fornisci un nome utente arbitrario come unused o ignored.
  5. Imposta la password. Il campo della password deve contenere il codice JWT.

L'esempio seguente mostra come configurare il client MQTT per autenticare un dispositivo:

C++

Di seguito sono riportati i passaggi per configurare l'ID client e autenticare un dispositivo:
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java


// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
    String.format("ssl://%s:%s", mqttBridgeHostname, mqttBridgePort);

// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
    String.format(
        "projects/%s/locations/%s/registries/%s/devices/%s",
        projectId, cloudRegion, registryId, gatewayId);

MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explictly set this. If you don't set MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

Properties sslProps = new Properties();
sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
connectOptions.setSSLProperties(sslProps);

// With Google Cloud IoT Core, the username field is ignored, however it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");

if ("RS256".equals(algorithm)) {
  connectOptions.setPassword(createJwtRsa(projectId, privateKeyFile).toCharArray());
} else if ("ES256".equals(algorithm)) {
  connectOptions.setPassword(createJwtEs(projectId, privateKeyFile).toCharArray());
} else {
  throw new IllegalArgumentException(
      "Invalid algorithm " + algorithm + ". Should be one of 'RS256' or 'ES256'.");
}

System.out.println(String.format("%s", mqttClientId));

// Create a client, and connect to the Google MQTT bridge.
try (MqttClient client =
    new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence())) {
  // Both connect and publish operations may fail. If they do, allow retries but with an
  // exponential backoff time period.
  long initialConnectIntervalMillis = 500L;
  long maxConnectIntervalMillis = 6000L;
  long maxConnectRetryTimeElapsedMillis = 900000L;
  float intervalMultiplier = 1.5f;

  long retryIntervalMs = initialConnectIntervalMillis;
  long totalRetryTimeMs = 0;

  while (totalRetryTimeMs < maxConnectRetryTimeElapsedMillis && !client.isConnected()) {
    try {
      client.connect(connectOptions);
    } catch (MqttException e) {
      int reason = e.getReasonCode();

      // If the connection is lost or if the server cannot be connected, allow retries, but with
      // exponential backoff.
      System.out.println("An error occurred: " + e.getMessage());
      if (reason == MqttException.REASON_CODE_CONNECTION_LOST
          || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
        System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
        Thread.sleep(retryIntervalMs);
        totalRetryTimeMs += retryIntervalMs;
        retryIntervalMs *= intervalMultiplier;
        if (retryIntervalMs > maxConnectIntervalMillis) {
          retryIntervalMs = maxConnectIntervalMillis;
        }
      } else {
        throw e;
      }
    }
  }

  attachCallback(client, gatewayId);

  // The topic gateways receive error updates on. QoS must be 0.
  String errorTopic = String.format("/devices/%s/errors", gatewayId);
  System.out.println(String.format("Listening on %s", errorTopic));

  client.subscribe(errorTopic, 0);

  return client;

Node.js


// const deviceId = `myDevice`;
// const registryId = `myRegistry`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const messageType = `events`;
// const numMessages = 5;

// The mqttClientId is a unique string that identifies this device. For Google
// Cloud IoT Core, it must be in the format below.
const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${deviceId}`;

// With Google Cloud IoT Core, the username field is ignored, however it must be
// non-empty. The password field is used to transmit a JWT to authorize the
// device. The "mqtts" protocol causes the library to connect using SSL, which
// is required for Cloud IoT Core.
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
const iatTime = parseInt(Date.now() / 1000);
const client = mqtt.connect(connectionArgs);

// Subscribe to the /devices/{device-id}/config topic to receive config updates.
// Config updates are recommended to use QoS 1 (at least once delivery)
client.subscribe(`/devices/${deviceId}/config`, {qos: 1});

// Subscribe to the /devices/{device-id}/commands/# topic to receive all
// commands or to the /devices/{device-id}/commands/<subfolder> to just receive
// messages published to a specific commands folder; we recommend you use
// QoS 0 (at most once delivery)
client.subscribe(`/devices/${deviceId}/commands/#`, {qos: 0});

// The MQTT topic that this device will publish data to. The MQTT topic name is
// required to be in the format below. The topic name must end in 'state' to
// publish state and 'events' to publish telemetry. Note that this is not the
// same as the device registry's Cloud Pub/Sub topic.
const mqttTopic = `/devices/${deviceId}/${messageType}`;

client.on('connect', success => {
  console.log('connect');
  if (!success) {
    console.log('Client not connected...');
  } else if (!publishChainInProgress) {
    publishAsync(mqttTopic, client, iatTime, 1, numMessages, connectionArgs);
  }
});

client.on('close', () => {
  console.log('close');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  let messageStr = 'Message received: ';
  if (topic === `/devices/${deviceId}/config`) {
    messageStr = 'Config message received: ';
  } else if (topic.startsWith(`/devices/${deviceId}/commands`)) {
    messageStr = 'Command message received: ';
  }

  messageStr += Buffer.from(message, 'base64').toString('ascii');
  console.log(messageStr);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

// Once all of the messages have been published, the connection to Google Cloud
// IoT will be closed and the process will exit. See the publishAsync method.

Python

Questo esempio utilizza la libreria client delle API di Google per Python.
def error_str(rc):
    """Convert a Paho error to a human readable string."""
    return f"{rc}: {mqtt.error_string(rc)}"

def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    print("on_connect", mqtt.connack_string(rc))

    # After a successful connect, reset backoff time and stop backing off.
    global should_backoff
    global minimum_backoff_time
    should_backoff = False
    minimum_backoff_time = 1

def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback for when a device disconnects."""
    print("on_disconnect", error_str(rc))

    # Since a disconnect occurred, the next loop iteration will wait with
    # exponential backoff.
    global should_backoff
    should_backoff = True

def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback when a message is sent to the broker."""
    print("on_publish")

def on_message(unused_client, unused_userdata, message):
    """Callback when the device receives a message on a subscription."""
    payload = str(message.payload.decode("utf-8"))
    print(
        "Received message '{}' on topic '{}' with Qos {}".format(
            payload, message.topic, str(message.qos)
        )
    )

def get_client(
    project_id,
    cloud_region,
    registry_id,
    device_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
):
    """Create our MQTT client. The client_id is a unique string that identifies
    this device. For Google Cloud IoT Core, it must be in the format below."""
    client_id = "projects/{}/locations/{}/registries/{}/devices/{}".format(
        project_id, cloud_region, registry_id, device_id
    )
    print(f"Device client_id is '{client_id}'")

    client = mqtt.Client(client_id=client_id)

    # With Google Cloud IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    client.username_pw_set(
        username="unused", password=create_jwt(project_id, private_key_file, algorithm)
    )

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. In this example, the
    # callbacks just print to standard out.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the Google MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # This is the topic that the device will receive configuration updates on.
    mqtt_config_topic = f"/devices/{device_id}/config"

    # Subscribe to the config topic.
    client.subscribe(mqtt_config_topic, qos=1)

    # The topic that the device will receive commands on.
    mqtt_command_topic = f"/devices/{device_id}/commands/#"

    # Subscribe to the commands topic, QoS 1 enables message acknowledgement.
    print(f"Subscribing to {mqtt_command_topic}")
    client.subscribe(mqtt_command_topic, qos=0)

    return client

Utilizzo di un dominio MQTT a lungo termine

I domini di assistenza a lungo termine (LTS) consentono di utilizzare una configurazione TLS per un periodo di tempo prolungato. Puoi configurare un client MQTT una volta, configurare il client MQTT per pubblicare i messaggi tramite un dominio LTS e comunicare continuamente sul bridge MQTT durante il periodo di tempo supportato.

Attualmente, il dominio LTS attivo è mqtt.2030.ltsapis.goog. Questo dominio LTS è supportato fino al 2030.

Per utilizzare il dominio LTS:

  1. Configura un client MQTT per pubblicare i messaggi tramite un dominio LTS.

    1. Configura il client MQTT per autenticare il dispositivo in Cloud IoT Core.
    2. Quando configuri il dispositivo, associa i certificati principali e di backup del set di CA radice minimo al client MQTT.
  2. Avvia un handshake TLS sulla porta mqtt.2030.ltsapis.goog 8883 o 443. Utilizza almeno le seguenti funzionalità TLS.

    • TLS 1.2
    • P-256 con SHA-256 come chiave del certificato e algoritmo hash
    • TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 tramite P-256 e punti non compressi per il pacchetto di crittografia
    • Indicazione del nome del server (SNI)
    • DNS su TCP o UDP

Per ulteriori informazioni sulla protezione del traffico MQTT, inclusi i messaggi inviati a domini LTS, vedi Consigli per la sicurezza dei dispositivi.

Pubblicazione di eventi di telemetria

Una volta che il dispositivo è stato configurato con un client MQTT e si è connesso al bridge MQTT, può pubblicare un evento di telemetria inviando un messaggio PUBLISH a un argomento MQTT nel seguente formato:

/devices/DEVICE_ID/events

L'ID dispositivo è l'ID stringa del dispositivo specificato nell'ID client MQTT. L'ID dispositivo è sensibile alle maiuscole.

I messaggi pubblicati in questo argomento MQTT vengono inoltrati all'argomento di telemetria predefinito del registro corrispondente. L'argomento di telemetria predefinito è l'argomento Cloud Pub/Sub specificato nel campo eventNotificationConfigs[i].pubsubTopicName nella risorsa del Registro di sistema. Se non esiste un argomento Pub/Sub predefinito, i dati di telemetria pubblicati andranno persi. Per pubblicare messaggi in altri argomenti Cloud Pub/Sub, consulta Pubblicazione di eventi di telemetria in altri argomenti Cloud Pub/Sub.

Il campo dei dati dei messaggi inoltrati contiene una copia del messaggio pubblicato dal dispositivo e i seguenti attributi di messaggio vengono aggiunti a ogni messaggio nell'argomento Cloud Pub/Sub:

Attributo Descrizione
deviceId L'identificatore di stringa definito dall'utente per il dispositivo, ad esempio thing1. L'ID dispositivo deve essere univoco all'interno del registry.
deviceNumId L'ID numerico del dispositivo generato dal server. Quando crei un dispositivo, Cloud IoT Core genera automaticamente l'ID numerico del dispositivo, che è univoco a livello globale e non modificabile.
deviceRegistryLocation La regione del registry del dispositivo Google Cloud Platform, ad esempio us-central1.
deviceRegistryId L'identificatore di stringa definito dall'utente per il registro di dispositivi, ad esempio registry1.
projectId L'ID stringa del progetto cloud proprietario del registro e del dispositivo.
subFolder La sottocartella può essere utilizzata come categoria di evento o classificazione. Per i client MQTT, la sottocartella è l'argomento secondario dopo DEVICE_ID/events, che viene copiato direttamente. Ad esempio, se il client pubblica nell'argomento MQTT /devices/DEVICE_ID/events/alerts, la sottocartella è la stringa alerts.

L'esempio seguente mostra come inviare messaggi PUBLISH tramite la connessione MQTT:

C++

Questo esempio utilizza la libreria client delle API di Google per Python.
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while (totalRetryTimeMs < maxConnectRetryTimeElapsedMillis && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, options.deviceId);

// Publish to the events or state topic based on the flag.
String subTopic = "event".equals(options.messageType) ? "events" : options.messageType;

// The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
// required to be in the format below. Note that this is not the same as the device registry's
// Cloud Pub/Sub topic.
String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);

// Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
for (int i = 1; i <= options.numMessages; ++i) {
  String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
  System.out.format(
      "Publishing %s message %d/%d: '%s'%n",
      options.messageType, i, options.numMessages, payload);

  // Refresh the connection credentials before the JWT expires.
  long secsSinceRefresh = (Instant.now().toEpochMilli() - iat.toEpochMilli()) / 1000;
  if (secsSinceRefresh > (options.tokenExpMins * MINUTES_PER_HOUR)) {
    System.out.format("\tRefreshing token after: %d seconds%n", secsSinceRefresh);
    iat = Instant.now();
    if ("RS256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
    } else if ("ES256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
    } else {
      throw new IllegalArgumentException(
          "Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
    }
    client.disconnect();
    client.connect(connectOptions);
    attachCallback(client, options.deviceId);
  }

  // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
  // also supports qos=0 for at most once delivery.
  MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8.name()));
  message.setQos(1);
  client.publish(mqttTopic, message);

  if ("event".equals(options.messageType)) {
    // Send telemetry events every second
    Thread.sleep(1000);
  } else {
    // Note: Update Device state less frequently than with telemetry events
    Thread.sleep(5000);
  }
}

// Wait for commands to arrive for about two minutes.
for (int i = 1; i <= options.waitTime; ++i) {
  System.out.print('.');
  Thread.sleep(1000);
}
System.out.println("");

// Disconnect the client if still connected, and finish the run.
if (client.isConnected()) {
  client.disconnect();
}

System.out.println("Finished loop successfully. Goodbye!");
client.close();

Node.js

const publishAsync = (
  mqttTopic,
  client,
  iatTime,
  messagesSent,
  numMessages,
  connectionArgs
) => {
  // If we have published enough messages or backed off too many times, stop.
  if (messagesSent > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) {
    if (backoffTime >= MAXIMUM_BACKOFF_TIME) {
      console.log('Backoff time is too high. Giving up.');
    }
    console.log('Closing connection to MQTT. Goodbye!');
    client.end();
    publishChainInProgress = false;
    return;
  }

  // Publish and schedule the next publish.
  publishChainInProgress = true;
  let publishDelayMs = 0;
  if (shouldBackoff) {
    publishDelayMs = 1000 * (backoffTime + Math.random());
    backoffTime *= 2;
    console.log(`Backing off for ${publishDelayMs}ms before publishing.`);
  }

  setTimeout(() => {
    const payload = `${argv.registryId}/${argv.deviceId}-payload-${messagesSent}`;

    // Publish "payload" to the MQTT topic. qos=1 means at least once delivery.
    // Cloud IoT Core also supports qos=0 for at most once delivery.
    console.log('Publishing message:', payload);
    client.publish(mqttTopic, payload, {qos: 1}, err => {
      if (!err) {
        shouldBackoff = false;
        backoffTime = MINIMUM_BACKOFF_TIME;
      }
    });

    const schedulePublishDelayMs = argv.messageType === 'events' ? 1000 : 2000;
    setTimeout(() => {
      const secsFromIssue = parseInt(Date.now() / 1000) - iatTime;
      if (secsFromIssue > argv.tokenExpMins * 60) {
        iatTime = parseInt(Date.now() / 1000);
        console.log(`\tRefreshing token after ${secsFromIssue} seconds.`);

        client.end();
        connectionArgs.password = createJwt(
          argv.projectId,
          argv.privateKeyFile,
          argv.algorithm
        );
        connectionArgs.protocolId = 'MQTT';
        connectionArgs.protocolVersion = 4;
        connectionArgs.clean = true;
        client = mqtt.connect(connectionArgs);

        client.on('connect', success => {
          console.log('connect');
          if (!success) {
            console.log('Client not connected...');
          } else if (!publishChainInProgress) {
            publishAsync(
              mqttTopic,
              client,
              iatTime,
              messagesSent,
              numMessages,
              connectionArgs
            );
          }
        });

        client.on('close', () => {
          console.log('close');
          shouldBackoff = true;
        });

        client.on('error', err => {
          console.log('error', err);
        });

        client.on('message', (topic, message) => {
          console.log(
            'message received: ',
            Buffer.from(message, 'base64').toString('ascii')
          );
        });

        client.on('packetsend', () => {
          // Note: logging packet send is very verbose
        });
      }
      publishAsync(
        mqttTopic,
        client,
        iatTime,
        messagesSent + 1,
        numMessages,
        connectionArgs
      );
    }, schedulePublishDelayMs);
  }, publishDelayMs);
};

Python

Questo esempio utilizza la libreria client delle API di Google per Python.
global minimum_backoff_time
global MAXIMUM_BACKOFF_TIME

# Publish to the events or state topic based on the flag.
sub_topic = "events" if args.message_type == "event" else "state"

mqtt_topic = f"/devices/{args.device_id}/{sub_topic}"

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = args.jwt_expires_minutes
client = get_client(
    args.project_id,
    args.cloud_region,
    args.registry_id,
    args.device_id,
    args.private_key_file,
    args.algorithm,
    args.ca_certs,
    args.mqtt_bridge_hostname,
    args.mqtt_bridge_port,
)

# Publish num_messages messages to the MQTT bridge once per second.
for i in range(1, args.num_messages + 1):
    # Process network events.
    client.loop()

    # Wait if backoff is required.
    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        # Otherwise, wait and connect again.
        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        print(f"Waiting for {delay} before reconnecting.")
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

    payload = f"{args.registry_id}/{args.device_id}-payload-{i}"
    print(f"Publishing message {i}/{args.num_messages}: '{payload}'")
    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print(f"Refreshing token after {seconds_since_issue}s")
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            args.project_id,
            args.cloud_region,
            args.registry_id,
            args.device_id,
            args.private_key_file,
            args.algorithm,
            args.ca_certs,
            args.mqtt_bridge_hostname,
            args.mqtt_bridge_port,
        )
    # Publish "payload" to the MQTT topic. qos=1 means at least once
    # delivery. Cloud IoT Core also supports qos=0 for at most once
    # delivery.
    client.publish(mqtt_topic, payload, qos=1)

    # Send events every second. State should not be updated as often
    for i in range(0, 60):
        time.sleep(1)
        client.loop()

Pubblicazione di eventi di telemetria in argomenti aggiuntivi di Cloud Pub/Sub

I dispositivi possono pubblicare dati in altri argomenti Cloud Pub/Sub. Per impostazione predefinita, i messaggi MQTT pubblicati in /devices/DEVICE_ID/events vengono inoltrati all'argomento di telemetria predefinito del Registro di sistema corrispondente. Nell'argomento MQTT puoi specificare una sottocartella per inoltrare i dati ad altri argomenti Cloud Pub/Sub. La sottocartella è l'argomento secondario dopo /devices/DEVICE_ID/events.

I messaggi pubblicati in una sottocartella vengono inoltrati all'argomento Cloud Pub/Sub con lo stesso nome. Il registro corrispondente deve essere configurato con l'argomento Cloud Pub/Sub; in caso contrario, i messaggi vengono inoltrati all'argomento Cloud Pub/Sub predefinito.

I messaggi vengono inoltrati all'argomento Cloud Pub/Sub predefinito anziché all'argomento Cloud Pub/Sub aggiuntivo nei seguenti casi:

  • Nessuna sottocartella specificata nell'argomento MQTT
  • Nell'argomento MQTT è stata specificata una sottocartella, ma non è presente un argomento Pub/Sub corrispondente nel registro dispositivi

Ad esempio, se il dispositivo pubblica nell'argomento MQTT /devices/DEVICE_ID/events/alerts, la sottocartella è la stringa alerts. I messaggi vengono inoltrati all'argomento Cloud Pub/Sub aggiuntivo se i campi eventNotificationConfigs[i].subfolderMatches e eventNotificationConfigs[i].pubsubTopicName sono entrambi impostati su alerts. In caso contrario, i messaggi vengono inoltrati all'argomento predefinito Cloud Pub/Sub.

Impostazione stato dispositivo

I dispositivi connessi possono segnalare lo stato del dispositivo inviando un messaggio PUBLISH al seguente argomento MQTT:

/devices/DEVICE_ID/state

Per classificare e recuperare i messaggi di stato, configura il Registro di sistema con un argomento di stato del dispositivo. L'argomento dello stato del dispositivo è l'argomento Cloud Pub/Sub specificato nel campo StateNotificationConfig.pubsubTopicName. Se il registry è configurato con un argomento dello stato del dispositivo, questi messaggi vengono inoltrati all'argomento Cloud Pub/Sub corrispondente.

Per maggiori dettagli sul recupero dei messaggi di stato, vedi Recupero dello stato del dispositivo.

Limitazione del traffico MQTT

Cloud IoT Core limita i progetti che generano un carico eccessivo. Quando i dispositivi tentano di nuovo di eseguire operazioni non riuscite senza attendere, possono attivare limiti che interessano tutti i dispositivi dello stesso progetto Google Cloud.

Per nuovi tentativi, ti consigliamo vivamente di implementare un algoritmo di backoff esponenziale troncato con jitter introdotto.

Mantieni attiva

Quando invii il messaggio CONNECT MQTT iniziale da un client, puoi fornire un valore facoltativo "keep-alive". Questo valore è un intervallo di tempo, misurato in secondi, durante il quale il mediatore prevede che un client invii un messaggio, ad esempio un messaggio PUBLISH. Se il client non invia alcun messaggio al broker durante l'intervallo, la connessione viene chiusa automaticamente. Tieni presente che il valore keep-alive specificato viene moltiplicato per 1,5, quindi impostando un valore keep-alive di 10 minuti si ottiene un intervallo di 15 minuti.

Per ulteriori informazioni, consulta la specifica MQTT.

Impostazioni client

Cloud IoT Core non fornisce un proprio valore keep-alive predefinito; se scegli di specificare un intervallo keep-alive, devi impostarlo nel client.

Per ottenere risultati ottimali, imposta un intervallo minimo di conservazione di 60 secondi per il client. Molte librerie client open source, incluse le librerie MQTT di Paho per C, Python, Node.js e Java, utilizzano 60 secondi per impostazione predefinita.

Limite di tempo di inattività

Separato dall'intervallo keep-alive, Cloud IoT Core ha un proprio limite di tempo di inattività di 20 minuti. In base a questo limite, la connessione client viene terminata automaticamente se il client non invia messaggi per 20 minuti, anche se l'intervallo keep-alive è più lungo. Se non viene fornito un valore keep-alive, viene comunque applicato il timeout di inattività predefinito di 20 minuti.

Risolvere i problemi

In caso di problemi di connessione, consulta la sezione Risoluzione dei problemi.