Pubblicazione tramite bridge MQTT

Questa sezione spiega in che modo i dispositivi possono utilizzare il bridge MQTT per comunicare con Cloud IoT Core. Per informazioni generali su HTTP e MQTT, vedi Protocolli.

Per informazioni dettagliate su ogni metodo descritto in questa sezione, consulta la documentazione dell'API. Vedi anche gli esempi relativi a MQTT.

Per pubblicare tramite il bridge MQTT:

  1. Installa un client MQTT sul tuo dispositivo.

  2. Scarica un certificato del server MQTT sul dispositivo.

  3. Configura il client MQTT per autenticare il dispositivo in Cloud IoT Core.

  4. Avvia un handshake TLS su mqtt.googleapis.com o un dominio di assistenza a lungo termine.

  5. Pubblica eventi di telemetria o imposta lo stato del dispositivo.

Server MQTT

Cloud IoT Core supporta il protocollo MQTT eseguendo un intermediario gestito che rimane in ascolto della porta mqtt.googleapis.com:8883. La porta 8883 è la porta TCP standard riservata con IANA per le connessioni MQTT sicure. Le connessioni a questa porta devono utilizzare il trasporto TLS, che è supportato dai client open source come Eclipse Paho.

Se la porta 8883 è bloccata dal firewall, puoi anche utilizzare la porta 443: mqtt.googleapis.com:443.

Qualità del servizio (QoS)

La specifica MQTT descrive tre livelli di qualità del servizio (QoS):

  • QoS 0, consegnato al massimo una volta
  • QoS 1, consegnato almeno una volta
  • QoS 2, consegnato esattamente una volta

Cloud IoT Core non supporta la funzionalità QoS 2. La pubblicazione di messaggi QoS 2 comporta la chiusura della connessione. L'abbonamento a un argomento predefinito con QoS 2 comporta il downgrade del livello QoS a QoS 1.

QoS 0 e 1 funzionano come segue in Cloud IoT Core:

  • Un messaggio PUBLISH con QoS 1 verrà confermato dal messaggio PUBACK dopo che sarà stato inviato correttamente a Cloud Pub/Sub.
  • PUBLISH messaggi con QoS 0 non richiedono risposte PUBACK e potrebbero essere eliminati se è presente un tremolio lungo il percorso di consegna dei messaggi (ad esempio, se Cloud Pub/Sub non è temporaneamente disponibile).
  • Il bridge MQTT Cloud IoT Core mantiene un piccolo buffer di messaggi non recapitati per riprovare. Se il buffer viene esaurito, il messaggio con QoS 1 può essere eliminato e il messaggio PUBACK non viene inviato al client. Il client dovrebbe inviare di nuovo il messaggio.

Per le configurazioni dei dispositivi, i livelli di QoS sono i seguenti:

  • Quando QoS è 0, una determinata versione di configurazione verrà pubblicata sul dispositivo una sola volta. Se il dispositivo non riceve la configurazione, deve riabbonarsi. Un QoS pari a 0 è quindi utile quando una configurazione viene aggiornata di frequente (in secondi o minuti) e non è necessario che il dispositivo riceva ogni aggiornamento.
  • Quando QoS è 1, l'ultimo aggiornamento della configurazione verrà ripetuto fino a quando il dispositivo non lo confermerà con un PUBACK. Se viene eseguito il push di una configurazione più recente prima di accettare quella precedente, quella meno recente non verrà pubblicata nuovamente e verrà invece pubblicata (e ripubblicata) quella nuova. Questo livello è la modalità più sicura per le configurazioni dei dispositivi: garantisce che alla fine il dispositivo riceva la configurazione più recente.

Download dei certificati server MQTT

Per utilizzare il trasporto TLS, i dispositivi devono verificare i certificati server di Cloud IoT Core per assicurarsi di comunicare con Cloud IoT Core anziché con un furto d'identità. I seguenti pacchetti di certificati supportano la verifica:

  • Il pacchetto di certificazione per la CA radice di Google completa (128 kB) per mqtt.googleapis.com.

    • Questo pacchetto definisce la catena di fiducia per comunicare con i prodotti e i servizi Google, incluso Cloud IoT Core.
    • I dispositivi con il pacchetto completo di certificazione CA radice comunicano direttamente con il server MQTT.
    • Questo pacchetto viene aggiornato regolarmente.
  • Set CA radice minimo di Google (<1 kB) per mqtt.2030.ltsapis.goog. Il set minimo di CA radice include un certificato principale e un certificato di backup.

    • Questo set è destinato ai dispositivi con limiti di memoria, come i microcontroller, e stabilisce la catena di affidabilità per comunicare solo con Cloud IoT Core.
    • I dispositivi con il set minimo di CA radice comunicano con Cloud IoT Core tramite domini di assistenza a lungo termine.
    • Questo insieme è stato risolto nel 2030 (i certificati principale e di backup non cambiano). Per maggiore sicurezza, Google Trust Services può passare dai certificati principali a quelli di backup in qualsiasi momento senza preavviso.

Dopo aver scaricato i certificati CA radice di Google sul tuo dispositivo, puoi configurare un client MQTT per autenticare il dispositivo, connetterti al server MQTT e comunicare tramite il bridge MQTT.

Configurazione dei client MQTT

I client MQTT autenticano i dispositivi connettendosi al bridge MQTT. Per configurare un client MQTT per l'autenticazione di un dispositivo:

  1. Imposta l'ID client MQTT sul percorso completo del dispositivo:
    projects/PROJECT_ID/locations/REGION/registries/REGISTRY_ID/devices/DEVICE_ID
  2. Associa il client MQTT ai certificati per server MQTT.
  3. Imposta il nome host MQTT su mqtt.googleapis.com o su un dominio di assistenza a lungo termine (se hai utilizzato il set minimo di CA radice).
  4. Specifica un nome utente. Il bridge MQTT ignora il campo del nome utente, ma alcune librerie client MQTT non inviano il campo della password a meno che non venga specificato il campo del nome utente. Per ottenere risultati ottimali, fornisci un nome utente arbitrario come unused o ignored.
  5. Imposta la password. Il campo della password deve contenere il JWT.

L'esempio seguente mostra come configurare il client MQTT per autenticare un dispositivo:

C++

I passaggi per configurare l'ID client e l'autenticazione di un dispositivo sono evidenziati di seguito:
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

MqttExampleOptions options = MqttExampleOptions.fromFlags(args);
if (options == null) {
  // Could not parse.
  System.exit(1);
}

if ("listen-for-config-messages".equals(options.command)) {
  System.out.println(
      String.format("Listening for configuration messages for %s:", options.deviceId));
  listenForConfigMessages(
      options.mqttBridgeHostname,
      options.mqttBridgePort,
      options.projectId,
      options.cloudRegion,
      options.registryId,
      options.gatewayId,
      options.privateKeyFile,
      options.algorithm,
      options.deviceId);
} else if ("send-data-from-bound-device".equals(options.command)) {
  System.out.println("Sending data on behalf of device:");
  sendDataFromBoundDevice(
      options.mqttBridgeHostname,
      options.mqttBridgePort,
      options.projectId,
      options.cloudRegion,
      options.registryId,
      options.gatewayId,
      options.privateKeyFile,
      options.algorithm,
      options.deviceId,
      options.messageType,
      options.telemetryData);
} else {
  System.out.println("Starting mqtt demo:");
  mqttDeviceDemo(options);
}

Node.js


// const deviceId = `myDevice`;
// const registryId = `myRegistry`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const messageType = `events`;
// const numMessages = 5;

// The mqttClientId is a unique string that identifies this device. For Google
// Cloud IoT Core, it must be in the format below.
const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${deviceId}`;

// With Google Cloud IoT Core, the username field is ignored, however it must be
// non-empty. The password field is used to transmit a JWT to authorize the
// device. The "mqtts" protocol causes the library to connect using SSL, which
// is required for Cloud IoT Core.
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
const iatTime = parseInt(Date.now() / 1000);
const client = mqtt.connect(connectionArgs);

// Subscribe to the /devices/{device-id}/config topic to receive config updates.
// Config updates are recommended to use QoS 1 (at least once delivery)
client.subscribe(`/devices/${deviceId}/config`, {qos: 1});

// Subscribe to the /devices/{device-id}/commands/# topic to receive all
// commands or to the /devices/{device-id}/commands/<subfolder> to just receive
// messages published to a specific commands folder; we recommend you use
// QoS 0 (at most once delivery)
client.subscribe(`/devices/${deviceId}/commands/#`, {qos: 0});

// The MQTT topic that this device will publish data to. The MQTT topic name is
// required to be in the format below. The topic name must end in 'state' to
// publish state and 'events' to publish telemetry. Note that this is not the
// same as the device registry's Cloud Pub/Sub topic.
const mqttTopic = `/devices/${deviceId}/${messageType}`;

client.on('connect', success => {
  console.log('connect');
  if (!success) {
    console.log('Client not connected...');
  } else if (!publishChainInProgress) {
    publishAsync(mqttTopic, client, iatTime, 1, numMessages, connectionArgs);
  }
});

client.on('close', () => {
  console.log('close');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  let messageStr = 'Message received: ';
  if (topic === `/devices/${deviceId}/config`) {
    messageStr = 'Config message received: ';
  } else if (topic.startsWith(`/devices/${deviceId}/commands`)) {
    messageStr = 'Command message received: ';
  }

  messageStr += Buffer.from(message, 'base64').toString('ascii');
  console.log(messageStr);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

// Once all of the messages have been published, the connection to Google Cloud
// IoT will be closed and the process will exit. See the publishAsync method.

Python

Questo esempio utilizza la libreria client delle API di Google per Python.
def error_str(rc):
    """Convert a Paho error to a human readable string."""
    return "{}: {}".format(rc, mqtt.error_string(rc))

def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    print("on_connect", mqtt.connack_string(rc))

    # After a successful connect, reset backoff time and stop backing off.
    global should_backoff
    global minimum_backoff_time
    should_backoff = False
    minimum_backoff_time = 1

def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback for when a device disconnects."""
    print("on_disconnect", error_str(rc))

    # Since a disconnect occurred, the next loop iteration will wait with
    # exponential backoff.
    global should_backoff
    should_backoff = True

def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback when a message is sent to the broker."""
    print("on_publish")

def on_message(unused_client, unused_userdata, message):
    """Callback when the device receives a message on a subscription."""
    payload = str(message.payload.decode("utf-8"))
    print(
        "Received message '{}' on topic '{}' with Qos {}".format(
            payload, message.topic, str(message.qos)
        )
    )

def get_client(
    project_id,
    cloud_region,
    registry_id,
    device_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
):
    """Create our MQTT client. The client_id is a unique string that identifies
    this device. For Google Cloud IoT Core, it must be in the format below."""
    client_id = "projects/{}/locations/{}/registries/{}/devices/{}".format(
        project_id, cloud_region, registry_id, device_id
    )
    print("Device client_id is '{}'".format(client_id))

    client = mqtt.Client(client_id=client_id)

    # With Google Cloud IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    client.username_pw_set(
        username="unused", password=create_jwt(project_id, private_key_file, algorithm)
    )

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. In this example, the
    # callbacks just print to standard out.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the Google MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # This is the topic that the device will receive configuration updates on.
    mqtt_config_topic = "/devices/{}/config".format(device_id)

    # Subscribe to the config topic.
    client.subscribe(mqtt_config_topic, qos=1)

    # The topic that the device will receive commands on.
    mqtt_command_topic = "/devices/{}/commands/#".format(device_id)

    # Subscribe to the commands topic, QoS 1 enables message acknowledgement.
    print("Subscribing to {}".format(mqtt_command_topic))
    client.subscribe(mqtt_command_topic, qos=0)

    return client

Utilizzo di un dominio MQTT a lungo termine

I domini di assistenza a lungo termine (LTS) ti consentono di utilizzare una configurazione TLS per un periodo di tempo prolungato. Puoi impostare un client MQTT una volta, configurare il client MQTT per pubblicare messaggi tramite un dominio LTS e quindi comunicare continuamente sul bridge MQTT durante il periodo di tempo supportato.

L'attuale dominio LTS attivo è mqtt.2030.ltsapis.goog. Questo dominio LTS è supportato fino al 2030.

Per utilizzare il dominio LTS:

  1. Configurare un client MQTT per pubblicare messaggi tramite un dominio LTS.

    1. Configura il client MQTT per autenticare il dispositivo in Cloud IoT Core.
    2. Durante la configurazione del dispositivo, associa i certificati radice minimi principali e il backup CA radice impostato con il client MQTT.
  2. Avvia un handshake TLS su mqtt.2030.ltsapis.goog sulla porta 8883 o 443. Utilizza almeno le seguenti funzionalità TLS.

    • TLS 1.2
    • P-256 con SHA-256 come chiave del certificato e algoritmo hash
    • TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 tramite P-256 e punti non compressi per la suite di crittografia
    • Indicazione del nome del server (SNI)
    • DNS tramite TCP o UDP

Per ulteriori informazioni sulla protezione del traffico MQTT, inclusi i messaggi inviati ai domini LTS, vedi Consigli per la sicurezza dei dispositivi.

Pubblicazione di eventi di telemetria

Una volta che il dispositivo è stato configurato con un client MQTT e connesso al bridge MQTT, può pubblicare un evento di telemetria emettendo un messaggio PUBLISH in un argomento MQTT nel seguente formato:

/devices/DEVICE_ID/events

L'ID dispositivo è l'ID stringa del dispositivo specificato nell'ID client MQTT. L'ID dispositivo è sensibile alle maiuscole.

I messaggi pubblicati in questo argomento MQTT vengono inoltrati all'argomento di telemetria predefinito del Registro di sistema corrispondente. L'argomento predefinito di telemetria è l'argomento Cloud Pub/Sub specificato nel campo eventNotificationConfigs[i].pubsubTopicName nella risorsa del Registro di sistema. Se non esiste un argomento Pub/Sub predefinito, i dati di telemetria pubblicati andranno persi. Per pubblicare messaggi su altri argomenti Cloud Pub/Sub, consulta Pubblicazione di eventi di telemetria in altri argomenti Cloud Pub/Sub.

Il campo dei dati del messaggio inoltrato contiene una copia del messaggio pubblicato dal dispositivo e i seguenti attributi vengono aggiunti a ogni messaggio nell'argomento Cloud Pub/Sub:

Attributo Descrizione
deviceId L'identificatore di stringa definito dall'utente per il dispositivo, ad esempio thing1. L'ID dispositivo deve essere univoco all'interno del registro.
deviceNumId L'ID numerico del dispositivo generato dal server. Quando crei un dispositivo, Cloud IoT Core genera automaticamente l'ID numerico del dispositivo, che è univoco a livello globale e non può essere modificato.
deviceRegistryLocation L'area geografica di Google Cloud Platform del Registro di sistema del dispositivo, ad esempio us-central1.
deviceRegistryId L'identificatore di stringa definito dall'utente per il registro dispositivi, ad esempio registry1.
projectId L'ID stringa del progetto cloud proprietario del registry e del dispositivo.
subFolder La sottocartella può essere utilizzata come categoria di evento o classificazione. Per i client MQTT, la sottocartella è l'argomento secondario dopo DEVICE_ID/events, che viene copiato direttamente. Ad esempio, se il client pubblica l'argomento MQTT /devices/DEVICE_ID/events/alerts, la sottocartella è la stringa alerts.

L'esempio seguente mostra come inviare messaggi PUBLISH tramite la connessione MQTT:

C++

Questo esempio utilizza la libreria client delle API di Google per Python.
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while ((totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, options.deviceId);

// Publish to the events or state topic based on the flag.
String subTopic = "event".equals(options.messageType) ? "events" : options.messageType;

// The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
// required to be in the format below. Note that this is not the same as the device registry's
// Cloud Pub/Sub topic.
String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);

// Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
for (int i = 1; i <= options.numMessages; ++i) {
  String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
  System.out.format(
      "Publishing %s message %d/%d: '%s'%n",
      options.messageType, i, options.numMessages, payload);

  // Refresh the connection credentials before the JWT expires.
  long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000;
  if (secsSinceRefresh > (options.tokenExpMins * MINUTES_PER_HOUR)) {
    System.out.format("\tRefreshing token after: %d seconds%n", secsSinceRefresh);
    iat = new DateTime();
    if ("RS256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
    } else if ("ES256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
    } else {
      throw new IllegalArgumentException(
          "Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
    }
    client.disconnect();
    client.connect(connectOptions);
    attachCallback(client, options.deviceId);
  }

  // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
  // also supports qos=0 for at most once delivery.
  MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8.name()));
  message.setQos(1);
  client.publish(mqttTopic, message);

  if ("event".equals(options.messageType)) {
    // Send telemetry events every second
    Thread.sleep(1000);
  } else {
    // Note: Update Device state less frequently than with telemetry events
    Thread.sleep(5000);
  }
}

// Wait for commands to arrive for about two minutes.
for (int i = 1; i <= options.waitTime; ++i) {
  System.out.print('.');
  Thread.sleep(1000);
}
System.out.println("");

// Disconnect the client if still connected, and finish the run.
if (client.isConnected()) {
  client.disconnect();
}

System.out.println("Finished loop successfully. Goodbye!");
client.close();

Node.js

const publishAsync = (
  mqttTopic,
  client,
  iatTime,
  messagesSent,
  numMessages,
  connectionArgs
) => {
  // If we have published enough messages or backed off too many times, stop.
  if (messagesSent > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) {
    if (backoffTime >= MAXIMUM_BACKOFF_TIME) {
      console.log('Backoff time is too high. Giving up.');
    }
    console.log('Closing connection to MQTT. Goodbye!');
    client.end();
    publishChainInProgress = false;
    return;
  }

  // Publish and schedule the next publish.
  publishChainInProgress = true;
  let publishDelayMs = 0;
  if (shouldBackoff) {
    publishDelayMs = 1000 * (backoffTime + Math.random());
    backoffTime *= 2;
    console.log(`Backing off for ${publishDelayMs}ms before publishing.`);
  }

  setTimeout(() => {
    const payload = `${argv.registryId}/${argv.deviceId}-payload-${messagesSent}`;

    // Publish "payload" to the MQTT topic. qos=1 means at least once delivery.
    // Cloud IoT Core also supports qos=0 for at most once delivery.
    console.log('Publishing message:', payload);
    client.publish(mqttTopic, payload, {qos: 1}, err => {
      if (!err) {
        shouldBackoff = false;
        backoffTime = MINIMUM_BACKOFF_TIME;
      }
    });

    const schedulePublishDelayMs = argv.messageType === 'events' ? 1000 : 2000;
    setTimeout(() => {
      const secsFromIssue = parseInt(Date.now() / 1000) - iatTime;
      if (secsFromIssue > argv.tokenExpMins * 60) {
        iatTime = parseInt(Date.now() / 1000);
        console.log(`\tRefreshing token after ${secsFromIssue} seconds.`);

        client.end();
        connectionArgs.password = createJwt(
          argv.projectId,
          argv.privateKeyFile,
          argv.algorithm
        );
        connectionArgs.protocolId = 'MQTT';
        connectionArgs.protocolVersion = 4;
        connectionArgs.clean = true;
        client = mqtt.connect(connectionArgs);

        client.on('connect', success => {
          console.log('connect');
          if (!success) {
            console.log('Client not connected...');
          } else if (!publishChainInProgress) {
            publishAsync(
              mqttTopic,
              client,
              iatTime,
              messagesSent,
              numMessages,
              connectionArgs
            );
          }
        });

        client.on('close', () => {
          console.log('close');
          shouldBackoff = true;
        });

        client.on('error', err => {
          console.log('error', err);
        });

        client.on('message', (topic, message) => {
          console.log(
            'message received: ',
            Buffer.from(message, 'base64').toString('ascii')
          );
        });

        client.on('packetsend', () => {
          // Note: logging packet send is very verbose
        });
      }
      publishAsync(
        mqttTopic,
        client,
        iatTime,
        messagesSent + 1,
        numMessages,
        connectionArgs
      );
    }, schedulePublishDelayMs);
  }, publishDelayMs);
};

Python

Questo esempio utilizza la libreria client delle API di Google per Python.
global minimum_backoff_time
global MAXIMUM_BACKOFF_TIME

# Publish to the events or state topic based on the flag.
sub_topic = "events" if args.message_type == "event" else "state"

mqtt_topic = "/devices/{}/{}".format(args.device_id, sub_topic)

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = args.jwt_expires_minutes
client = get_client(
    args.project_id,
    args.cloud_region,
    args.registry_id,
    args.device_id,
    args.private_key_file,
    args.algorithm,
    args.ca_certs,
    args.mqtt_bridge_hostname,
    args.mqtt_bridge_port,
)

# Publish num_messages messages to the MQTT bridge once per second.
for i in range(1, args.num_messages + 1):
    # Process network events.
    client.loop()

    # Wait if backoff is required.
    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        # Otherwise, wait and connect again.
        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        print("Waiting for {} before reconnecting.".format(delay))
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

    payload = "{}/{}-payload-{}".format(args.registry_id, args.device_id, i)
    print("Publishing message {}/{}: '{}'".format(i, args.num_messages, payload))
    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            args.project_id,
            args.cloud_region,
            args.registry_id,
            args.device_id,
            args.private_key_file,
            args.algorithm,
            args.ca_certs,
            args.mqtt_bridge_hostname,
            args.mqtt_bridge_port,
        )
    # Publish "payload" to the MQTT topic. qos=1 means at least once
    # delivery. Cloud IoT Core also supports qos=0 for at most once
    # delivery.
    client.publish(mqtt_topic, payload, qos=1)

    # Send events every second. State should not be updated as often
    for i in range(0, 60):
        time.sleep(1)
        client.loop()

Pubblicazione di eventi di telemetria in altri argomenti Cloud Pub/Sub

I dispositivi possono pubblicare dati in altri argomenti Cloud Pub/Sub. Per impostazione predefinita, i messaggi MQTT pubblicati in /devices/DEVICE_ID/events vengono inoltrati all'argomento di telemetria predefinito del Registro di sistema corrispondente. Nell'argomento MQTT puoi specificare una sottocartella per inoltrare i dati ad ulteriori argomenti Cloud Pub/Sub. La sottocartella è l'argomento secondario dopo /devices/DEVICE_ID/events.

I messaggi pubblicati in una sottocartella vengono inoltrati all'argomento Cloud Pub/Sub con lo stesso nome. Il registry corrispondente deve essere configurato con l'argomento Cloud Pub/Sub; in caso contrario, i messaggi vengono inoltrati all'argomento Cloud Pub/Sub predefinito.

I messaggi vengono inoltrati all'argomento Cloud Pub/Sub predefinito anziché all'argomento Cloud Pub/Sub aggiuntivo nei seguenti casi:

  • Nessuna sottocartella specificata nell'argomento MQTT
  • Nell'argomento MQTT è specificata una sottocartella, ma non esiste un argomento Pub/Sub corrispondente nel registro dispositivi.

Ad esempio, se il dispositivo pubblica nell'argomento MQTT /devices/DEVICE_ID/events/alerts, la sottocartella è la stringa alerts. I messaggi vengono inoltrati all'argomento Cloud Pub/Sub aggiuntivo se i campi eventNotificationConfigs[i].subfolderMatches e eventNotificationConfigs[i].pubsubTopicName sono entrambi impostati su alerts. In caso contrario, i messaggi vengono inoltrati all'argomento predefinito Cloud Pub/Sub.

Impostazione dello stato del dispositivo

I dispositivi connessi possono segnalare lo stato del dispositivo inviando un messaggio PUBLISH al seguente argomento MQTT:

/devices/DEVICE_ID/state

Per classificare e recuperare i messaggi di stato, configura il Registro di sistema con un argomento relativo allo stato del dispositivo. L'argomento relativo allo stato del dispositivo è l'argomento Cloud Pub/Sub specificato nel campo StateNotificationConfig.pubsubTopicName. Se il registry è configurato con un argomento relativo allo stato del dispositivo, questi messaggi vengono inoltrati all'argomento Cloud Pub/Sub corrispondente.

Per maggiori dettagli sul recupero dei messaggi di stato, vedi Recuperare lo stato del dispositivo.

Limitazione del traffico MQTT

Cloud IoT Core limita i progetti che generano un carico eccessivo. Quando i dispositivi ritengono le operazioni non riuscite senza attendere, possono attivare limiti che interessano tutti i dispositivi nello stesso progetto Google Cloud.

Per i nuovi tentativi, ti consigliamo vivamente di implementare un algoritmo di backoff esponenziale troncato con tremolio introdotto.

Keep-Alive

Quando invii il messaggio CONNECT MQTT iniziale da un client, puoi fornire un valore facoltativo "keep-alive" Questo valore è un intervallo di tempo, espresso in secondi, durante il quale l'intermediario si aspetta che un client invii un messaggio, ad esempio un messaggio PUBLISH. Se durante il periodo di validità dell'intervallo non viene inviato alcun messaggio dal client, quest'ultimo chiude automaticamente la connessione. Tieni presente che il valore keep-alive specificato viene moltiplicato per 1,5, pertanto l'impostazione di un keep-alive di 10 minuti dà origine a un intervallo di 15 minuti.

Per ulteriori informazioni, consulta la specifica MQTT.

Impostazioni client

Cloud IoT Core non fornisce un proprio valore keep-alive predefinito; se scegli di specificare un intervallo keep-alive, devi impostarlo nel client.

Per ottenere risultati ottimali, imposta l'intervallo keep-alive del client su un minimo di 60 secondi. Molte librerie client open source, tra cui le librerie Paho MQTT per C, Python, Node.js e Java, utilizzano 60 secondi per impostazione predefinita.

Limite di tempo di inattività

Separato dall'intervallo keep-alive, Cloud IoT Core ha il proprio limite di tempo di inattività di 20 minuti. In base a questo limite, la connessione client viene terminata automaticamente se il client non invia messaggi per 20 minuti, anche se l'intervallo keep-alive è più lungo. Se non viene fornito un valore keep-alive, viene applicato il timeout inattivo predefinito di 20 minuti.

Risolvere i problemi

Se hai difficoltà a connetterti, consulta la sezione Risoluzione dei problemi.