Veröffentlichung über die MQTT-Bridge

In diesem Abschnitt wird erläutert, wie Geräte über die MQTT-Bridge mit Cloud IoT Core kommunizieren können. Allgemeine Informationen zu HTTP und MQTT finden Sie unter Protokolle.

Ausführliche Informationen zu den in diesem Abschnitt beschriebenen Methoden finden Sie in der API-Dokumentation. Weitere Informationen finden Sie auch in den MQTT-spezifischen Beispielen.

So veröffentlichen Sie über die MQTT-Bridge:

  1. Installieren Sie auf Ihrem Gerät einen MQTT-Client.

  2. Laden Sie ein MQTT-Serverzertifikat auf Ihr Gerät herunter.

  3. Konfigurieren Sie den MQTT-Client, um das Gerät bei Cloud IoT Core zu authentifizieren.

  4. Starten Sie einen TLS-Handshake über mqtt.googleapis.com oder eine langfristige Supportdomain.

  5. Veröffentlichen Sie Telemetrieereignisse oder legen Sie den Gerätestatus fest.

MQTT-Server

Cloud IoT Core unterstützt das MQTT-Protokoll durch das Ausführen eines verwalteten Brokers, der den Port mqtt.googleapis.com:8883 überwacht. Port 8883 ist der Standard-TCP-Port, der mit IANA für sichere MQTT-Verbindungen reserviert ist. Verbindungen zu diesem Port müssen TLS-Transport verwenden, das von Open-Source-Clients wie Eclipse Paho unterstützt wird.

Wenn Port 8883 von Ihrer Firewall blockiert wird, können Sie auch Port 443 verwenden: mqtt.googleapis.com:443.

Dienstqualität (QoS)

Die MQTT-Spezifikation beschreibt drei Dienstqualitätsebenen:

  • QoS 0, höchstens einmal bereitgestellt.
  • QoS 1, mindestens einmal bereitgestellt.
  • QoS 2, genau einmal bereitgestellt.

Cloud IoT Core unterstützt QoS 2 nicht. Wenn Sie QoS 2-Nachrichten veröffentlichen, wird die Verbindung geschlossen. Das Abonnieren eines vordefinierten Themas mit QoS 2 führt zu einem Downgrade der QoS-Ebene auf QoS 1.

QoS 0 und 1 funktionieren in Cloud IoT Core so:

  • Eine PUBLISH-Nachricht mit QoS 1 wird von der Nachricht PUBACK bestätigt, nachdem sie erfolgreich an Cloud Pub/Sub gesendet wurde.
  • PUBLISH-Nachrichten mit QoS 0 erfordern keine PUBACK-Antworten und können gelöscht werden, wenn der Nachrichtenzustellpfad Jitter aufweist (z. B. wenn Cloud Pub/Sub vorübergehend nicht verfügbar ist).
  • Die Cloud IoT Core MQTT-Bridge verwaltet einen kleinen Puffer nicht zugestellter Nachrichten, um noch einmal eine Zustellung zu versuchen. Wenn der Puffer voll ist, wird die Nachricht mit QoS 1 gelöscht und es wird keine PUBACK-Nachricht an den Client gesendet. Der Client muss die Nachricht noch einmal senden.

Für Gerätekonfigurationen lauten die Dienstqualitätsstufen so:

  • Wenn „QoS“ 0 ist, wird eine bestimmte Konfigurationsversion nur einmal auf dem Gerät veröffentlicht. Wenn das Gerät die Konfiguration nicht erhält, muss es noch einmal abonniert werden. Ein Dienstqualitätswert von 0 ist daher nützlich, wenn eine Konfiguration häufig in der Größenordnung von Sekunden oder Minuten aktualisiert werden soll und das Gerät nicht jedes Update erhalten muss.
  • Wenn „QoS“ 1 ist, wird das letzte Konfigurationsupdate wiederholt, bis das Gerät es mit einem PUBACK bestätigt. Wenn eine neuere Konfiguration übertragen wird, bevor die ältere bestätigt wurde, wird die ältere nicht noch einmal zugestellt. Stattdessen wird die neue Nachricht zugestellt und noch einmal zugestellt. Diese Ebene ist der sicherste Modus für Gerätekonfigurationen, denn er garantiert, dass das Gerät letztendlich die neueste Konfiguration erhält.

MQTT-Serverzertifikate herunterladen

Für die Verwendung von TLS-Transport müssen Geräte die Cloud IoT Core-Serverzertifikate verifizieren, um sicherzustellen, dass sie mit Cloud IoT Core und nicht mit einer Identitätsübertragung kommunizieren. Die folgenden Zertifikatpakete unterstützen die Verifizierung:

  • Das vollständige Google-Stamm-CA-Zertifizierungspaket (128 KB) für mqtt.googleapis.com.

    • Dieses Paket baut die Vertrauenskette für die Kommunikation mit Produkten und Diensten von Google auf, einschließlich Cloud IoT Core.
    • Geräte mit dem vollständigen Stamm-CA-Zertifizierungspaket kommunizieren direkt mit dem MQTT-Server.
    • Dieses Paket wird regelmäßig aktualisiert.
  • Das minimale Stamm-CA-Set (<1 KB) für mqtt.2030.ltsapis.goog. Das minimale Stamm-CA-Set enthält ein primäres Zertifikat und ein Sicherungszertifikat.

    • Dieses Set ist für Geräte mit Speichereinschränkungen wie Mikrocontroller und stellt eine Vertrauenskette für die Kommunikation mit Cloud IoT Core her.
    • Geräte mit dem minimalen Stamm-CA-Set kommunizieren über langfristige Support-Domains mit Cloud IoT Core.
    • Dieses Set wird bis 2030 korrigiert (das primäre und das Sicherungszertifikat ändern sich nicht). Für zusätzliche Sicherheit können Sie mit Google Trust Services jederzeit ohne Weiteres zwischen dem primären und dem Sicherungszertifikat wechseln.

Nachdem Sie die Google-Stamm-CA-Zertifikate auf Ihr Gerät heruntergeladen haben, können Sie einen MQTT-Client zur Authentifizierung des Geräts konfigurieren, eine Verbindung zum MQTT-Server herstellen und die Kommunikation über die MQTT-Bridge ausführen.

MQTT-Clients konfigurieren

MQTT-Clients authentifizieren Geräte durch Herstellen einer Verbindung zur MQTT-Bridge. So konfigurieren Sie einen MQTT-Client zur Authentifizierung eines Geräts:

  1. Legen Sie als MQTT-Client-ID den vollständigen Gerätepfad fest:
    projects/PROJECT_ID/locations/REGION/registries/REGISTRY_ID/devices/DEVICE_ID
  2. Verknüpfen Sie den MQTT-Client mit MQTT-Serverzertifikaten.
  3. Legen Sie den MQTT-Hostnamen auf mqtt.googleapis.com oder eine langfristige Supportdomain fest, wenn Sie das minimale Stamm-CA-Set verwendet haben.
  4. Geben Sie einen Nutzernamen an. Die MQTT-Bridge ignoriert das Feld für den Nutzernamen, aber einige MQTT-Clientbibliotheken senden das Passwortfeld nur, wenn das Feld für den Nutzernamen angegeben ist. Die besten Ergebnisse erzielen Sie, wenn Sie einen beliebigen Nutzernamen wie unused oder ignored angeben.
  5. Legen Sie das Passwort fest. Das Passwortfeld muss das JWT enthalten.

Im folgenden Beispiel wird gezeigt, wie der MQTT-Client für die Authentifizierung eines Geräts konfiguriert wird:

C++

Die Schritte zum Konfigurieren der Client-ID und zum Authentifizieren eines Geräts sind unten aufgeführt:
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

MqttExampleOptions options = MqttExampleOptions.fromFlags(args);
if (options == null) {
  // Could not parse.
  System.exit(1);
}

if ("listen-for-config-messages".equals(options.command)) {
  System.out.println(
      String.format("Listening for configuration messages for %s:", options.deviceId));
  listenForConfigMessages(
      options.mqttBridgeHostname,
      options.mqttBridgePort,
      options.projectId,
      options.cloudRegion,
      options.registryId,
      options.gatewayId,
      options.privateKeyFile,
      options.algorithm,
      options.deviceId);
} else if ("send-data-from-bound-device".equals(options.command)) {
  System.out.println("Sending data on behalf of device:");
  sendDataFromBoundDevice(
      options.mqttBridgeHostname,
      options.mqttBridgePort,
      options.projectId,
      options.cloudRegion,
      options.registryId,
      options.gatewayId,
      options.privateKeyFile,
      options.algorithm,
      options.deviceId,
      options.messageType,
      options.telemetryData);
} else {
  System.out.println("Starting mqtt demo:");
  mqttDeviceDemo(options);
}

Node.js


// const deviceId = `myDevice`;
// const registryId = `myRegistry`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const messageType = `events`;
// const numMessages = 5;

// The mqttClientId is a unique string that identifies this device. For Google
// Cloud IoT Core, it must be in the format below.
const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${deviceId}`;

// With Google Cloud IoT Core, the username field is ignored, however it must be
// non-empty. The password field is used to transmit a JWT to authorize the
// device. The "mqtts" protocol causes the library to connect using SSL, which
// is required for Cloud IoT Core.
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
const iatTime = parseInt(Date.now() / 1000);
const client = mqtt.connect(connectionArgs);

// Subscribe to the /devices/{device-id}/config topic to receive config updates.
// Config updates are recommended to use QoS 1 (at least once delivery)
client.subscribe(`/devices/${deviceId}/config`, {qos: 1});

// Subscribe to the /devices/{device-id}/commands/# topic to receive all
// commands or to the /devices/{device-id}/commands/<subfolder> to just receive
// messages published to a specific commands folder; we recommend you use
// QoS 0 (at most once delivery)
client.subscribe(`/devices/${deviceId}/commands/#`, {qos: 0});

// The MQTT topic that this device will publish data to. The MQTT topic name is
// required to be in the format below. The topic name must end in 'state' to
// publish state and 'events' to publish telemetry. Note that this is not the
// same as the device registry's Cloud Pub/Sub topic.
const mqttTopic = `/devices/${deviceId}/${messageType}`;

client.on('connect', success => {
  console.log('connect');
  if (!success) {
    console.log('Client not connected...');
  } else if (!publishChainInProgress) {
    publishAsync(mqttTopic, client, iatTime, 1, numMessages, connectionArgs);
  }
});

client.on('close', () => {
  console.log('close');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  let messageStr = 'Message received: ';
  if (topic === `/devices/${deviceId}/config`) {
    messageStr = 'Config message received: ';
  } else if (topic.startsWith(`/devices/${deviceId}/commands`)) {
    messageStr = 'Command message received: ';
  }

  messageStr += Buffer.from(message, 'base64').toString('ascii');
  console.log(messageStr);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

// Once all of the messages have been published, the connection to Google Cloud
// IoT will be closed and the process will exit. See the publishAsync method.

Python

In diesem Beispiel wird die Google API-Clientbibliothek für Python verwendet.
def error_str(rc):
    """Convert a Paho error to a human readable string."""
    return "{}: {}".format(rc, mqtt.error_string(rc))

def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    print("on_connect", mqtt.connack_string(rc))

    # After a successful connect, reset backoff time and stop backing off.
    global should_backoff
    global minimum_backoff_time
    should_backoff = False
    minimum_backoff_time = 1

def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback for when a device disconnects."""
    print("on_disconnect", error_str(rc))

    # Since a disconnect occurred, the next loop iteration will wait with
    # exponential backoff.
    global should_backoff
    should_backoff = True

def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback when a message is sent to the broker."""
    print("on_publish")

def on_message(unused_client, unused_userdata, message):
    """Callback when the device receives a message on a subscription."""
    payload = str(message.payload.decode("utf-8"))
    print(
        "Received message '{}' on topic '{}' with Qos {}".format(
            payload, message.topic, str(message.qos)
        )
    )

def get_client(
    project_id,
    cloud_region,
    registry_id,
    device_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
):
    """Create our MQTT client. The client_id is a unique string that identifies
    this device. For Google Cloud IoT Core, it must be in the format below."""
    client_id = "projects/{}/locations/{}/registries/{}/devices/{}".format(
        project_id, cloud_region, registry_id, device_id
    )
    print("Device client_id is '{}'".format(client_id))

    client = mqtt.Client(client_id=client_id)

    # With Google Cloud IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    client.username_pw_set(
        username="unused", password=create_jwt(project_id, private_key_file, algorithm)
    )

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. In this example, the
    # callbacks just print to standard out.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the Google MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # This is the topic that the device will receive configuration updates on.
    mqtt_config_topic = "/devices/{}/config".format(device_id)

    # Subscribe to the config topic.
    client.subscribe(mqtt_config_topic, qos=1)

    # The topic that the device will receive commands on.
    mqtt_command_topic = "/devices/{}/commands/#".format(device_id)

    # Subscribe to the commands topic, QoS 1 enables message acknowledgement.
    print("Subscribing to {}".format(mqtt_command_topic))
    client.subscribe(mqtt_command_topic, qos=0)

    return client

Langfristige MQTT-Domain verwenden

Für Domains mit Langzeitsupport (LTS) können Sie eine TLS-Konfiguration für einen längeren Zeitraum verwenden. Sie können einen MQTT-Client einmal einrichten, den MQTT-Client so konfigurieren, dass Nachrichten über eine LTS-Domain veröffentlicht werden, und dann während des unterstützten Zeitraums kontinuierlich über die MQTT-Bridge kommunizieren.

Die aktuell aktive LTS-Domain ist mqtt.2030.ltsapis.goog. Diese LTS-Domain wird bis 2030 unterstützt.

So verwenden Sie die LTS-Domain:

  1. Konfigurieren Sie einen MQTT-Client für die Veröffentlichung von Nachrichten über eine LTS-Domain.

    1. Konfigurieren Sie den MQTT-Client, um das Gerät bei Cloud IoT Core zu authentifizieren.
    2. Verknüpfen Sie beim Konfigurieren des Geräts die primären und Sicherungszertifikate des minimalen Stamm-CA-Sets mit dem MQTT-Client.
  2. Initiieren Sie einen TLS-Handshake über mqtt.2030.ltsapis.goog auf Port 8883 oder 443. Verwenden Sie mindestens die folgenden TLS-Funktionen.

    • TLS 1.2
    • P-256 mit SHA-256 als Zertifikatschlüssel und Hash-Algorithmus
    • TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 mit P-256 und unkomprimierten Punkten für die Chiffresammlung.
    • Server Name Indication (SNI)
    • DNS über TCP oder UDP

Weitere Informationen zum Sichern des MQTT-Traffics, einschließlich der an LTS-Domains gesendeten Nachrichten, finden Sie unter Gerätesicherheitsempfehlungen.

Telemetrieereignisse veröffentlichen

Nachdem das Gerät mit einem MQTT-Client konfiguriert und mit der MQTT-Bridge verbunden ist, kann es ein Telemetrieereignis veröffentlichen. Senden Sie dazu eine PUBLISH-Nachricht an ein MQTT-Thema im folgenden Format:

/devices/DEVICE_ID/events

Die Geräte-ID ist die String-ID des Geräts, das in der MQTT-Client-ID angegeben ist. Bei der Geräte-ID wird zwischen Groß- und Kleinschreibung unterschieden.

Nachrichten, die in diesem MQTT-Thema veröffentlicht wurden, werden an das Standardtelemetriethema der entsprechenden Registry weitergeleitet. Das Standardtelemetriethema ist das Cloud Pub/Sub-Thema, das in der Registry-Ressource im Feld eventNotificationConfigs[i].pubsubTopicName angegeben ist. Wenn kein Pub/Sub-Standardthema vorhanden ist, gehen veröffentlichte Telemetriedaten verloren. Informationen zum Veröffentlichen von Nachrichten in anderen Cloud Pub/Sub-Themen finden Sie unter Telemetrieereignisse in zusätzlichen Cloud Pub/Sub-Themen veröffentlichen.

Das Feld für die weitergeleiteten Nachrichtendaten enthält eine Kopie der vom Gerät veröffentlichten Nachricht. Jeder Nachricht im Cloud Pub/Sub-Thema werden die folgenden Nachrichtenattribute hinzugefügt:

Attribut Beschreibung
deviceId Die benutzerdefinierte String-ID für das Gerät, z. B. thing1. Die Geräte-ID muss in der Registry eindeutig sein.
deviceNumId Die vom Server generierte numerische ID des Geräts. Wenn Sie ein Gerät erstellen, generiert Cloud IoT Core automatisch die numerische Geräte-ID. Sie ist global eindeutig und kann nicht bearbeitet werden.
deviceRegistryLocation Die Google Cloud Platform-Region der Geräte-Registry, z. B. us-central1.
deviceRegistryId Die benutzerdefinierte String-ID für die Geräte-Registry, z. B. registry1.
projectId Die String-ID des Cloud-Projekts, zu dem die Registry und das Gerät gehören.
subFolder Der Unterordner kann als Ereigniskategorie oder Klassifizierung verwendet werden. Bei MQTT-Clients ist der Unterordner das untergeordnete Thema nach DEVICE_ID/events, das direkt kopiert wird. Wenn der Client beispielsweise im MQTT-Thema /devices/DEVICE_ID/events/alerts veröffentlicht, ist der Unterordner der String alerts.

Das folgende Beispiel zeigt, wie Sie PUBLISH-Nachrichten über die MQTT-Verbindung senden:

C++

In diesem Beispiel wird die Google API-Clientbibliothek für Python verwendet.
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while ((totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, options.deviceId);

// Publish to the events or state topic based on the flag.
String subTopic = "event".equals(options.messageType) ? "events" : options.messageType;

// The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
// required to be in the format below. Note that this is not the same as the device registry's
// Cloud Pub/Sub topic.
String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);

// Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
for (int i = 1; i <= options.numMessages; ++i) {
  String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
  System.out.format(
      "Publishing %s message %d/%d: '%s'%n",
      options.messageType, i, options.numMessages, payload);

  // Refresh the connection credentials before the JWT expires.
  long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000;
  if (secsSinceRefresh > (options.tokenExpMins * MINUTES_PER_HOUR)) {
    System.out.format("\tRefreshing token after: %d seconds%n", secsSinceRefresh);
    iat = new DateTime();
    if ("RS256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
    } else if ("ES256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
    } else {
      throw new IllegalArgumentException(
          "Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
    }
    client.disconnect();
    client.connect(connectOptions);
    attachCallback(client, options.deviceId);
  }

  // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
  // also supports qos=0 for at most once delivery.
  MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8.name()));
  message.setQos(1);
  client.publish(mqttTopic, message);

  if ("event".equals(options.messageType)) {
    // Send telemetry events every second
    Thread.sleep(1000);
  } else {
    // Note: Update Device state less frequently than with telemetry events
    Thread.sleep(5000);
  }
}

// Wait for commands to arrive for about two minutes.
for (int i = 1; i <= options.waitTime; ++i) {
  System.out.print('.');
  Thread.sleep(1000);
}
System.out.println("");

// Disconnect the client if still connected, and finish the run.
if (client.isConnected()) {
  client.disconnect();
}

System.out.println("Finished loop successfully. Goodbye!");
client.close();

Node.js

const publishAsync = (
  mqttTopic,
  client,
  iatTime,
  messagesSent,
  numMessages,
  connectionArgs
) => {
  // If we have published enough messages or backed off too many times, stop.
  if (messagesSent > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) {
    if (backoffTime >= MAXIMUM_BACKOFF_TIME) {
      console.log('Backoff time is too high. Giving up.');
    }
    console.log('Closing connection to MQTT. Goodbye!');
    client.end();
    publishChainInProgress = false;
    return;
  }

  // Publish and schedule the next publish.
  publishChainInProgress = true;
  let publishDelayMs = 0;
  if (shouldBackoff) {
    publishDelayMs = 1000 * (backoffTime + Math.random());
    backoffTime *= 2;
    console.log(`Backing off for ${publishDelayMs}ms before publishing.`);
  }

  setTimeout(() => {
    const payload = `${argv.registryId}/${argv.deviceId}-payload-${messagesSent}`;

    // Publish "payload" to the MQTT topic. qos=1 means at least once delivery.
    // Cloud IoT Core also supports qos=0 for at most once delivery.
    console.log('Publishing message:', payload);
    client.publish(mqttTopic, payload, {qos: 1}, err => {
      if (!err) {
        shouldBackoff = false;
        backoffTime = MINIMUM_BACKOFF_TIME;
      }
    });

    const schedulePublishDelayMs = argv.messageType === 'events' ? 1000 : 2000;
    setTimeout(() => {
      const secsFromIssue = parseInt(Date.now() / 1000) - iatTime;
      if (secsFromIssue > argv.tokenExpMins * 60) {
        iatTime = parseInt(Date.now() / 1000);
        console.log(`\tRefreshing token after ${secsFromIssue} seconds.`);

        client.end();
        connectionArgs.password = createJwt(
          argv.projectId,
          argv.privateKeyFile,
          argv.algorithm
        );
        connectionArgs.protocolId = 'MQTT';
        connectionArgs.protocolVersion = 4;
        connectionArgs.clean = true;
        client = mqtt.connect(connectionArgs);

        client.on('connect', success => {
          console.log('connect');
          if (!success) {
            console.log('Client not connected...');
          } else if (!publishChainInProgress) {
            publishAsync(
              mqttTopic,
              client,
              iatTime,
              messagesSent,
              numMessages,
              connectionArgs
            );
          }
        });

        client.on('close', () => {
          console.log('close');
          shouldBackoff = true;
        });

        client.on('error', err => {
          console.log('error', err);
        });

        client.on('message', (topic, message) => {
          console.log(
            'message received: ',
            Buffer.from(message, 'base64').toString('ascii')
          );
        });

        client.on('packetsend', () => {
          // Note: logging packet send is very verbose
        });
      }
      publishAsync(
        mqttTopic,
        client,
        iatTime,
        messagesSent + 1,
        numMessages,
        connectionArgs
      );
    }, schedulePublishDelayMs);
  }, publishDelayMs);
};

Python

In diesem Beispiel wird die Google API-Clientbibliothek für Python verwendet.
global minimum_backoff_time
global MAXIMUM_BACKOFF_TIME

# Publish to the events or state topic based on the flag.
sub_topic = "events" if args.message_type == "event" else "state"

mqtt_topic = "/devices/{}/{}".format(args.device_id, sub_topic)

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = args.jwt_expires_minutes
client = get_client(
    args.project_id,
    args.cloud_region,
    args.registry_id,
    args.device_id,
    args.private_key_file,
    args.algorithm,
    args.ca_certs,
    args.mqtt_bridge_hostname,
    args.mqtt_bridge_port,
)

# Publish num_messages messages to the MQTT bridge once per second.
for i in range(1, args.num_messages + 1):
    # Process network events.
    client.loop()

    # Wait if backoff is required.
    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        # Otherwise, wait and connect again.
        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        print("Waiting for {} before reconnecting.".format(delay))
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

    payload = "{}/{}-payload-{}".format(args.registry_id, args.device_id, i)
    print("Publishing message {}/{}: '{}'".format(i, args.num_messages, payload))
    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            args.project_id,
            args.cloud_region,
            args.registry_id,
            args.device_id,
            args.private_key_file,
            args.algorithm,
            args.ca_certs,
            args.mqtt_bridge_hostname,
            args.mqtt_bridge_port,
        )
    # Publish "payload" to the MQTT topic. qos=1 means at least once
    # delivery. Cloud IoT Core also supports qos=0 for at most once
    # delivery.
    client.publish(mqtt_topic, payload, qos=1)

    # Send events every second. State should not be updated as often
    for i in range(0, 60):
        time.sleep(1)
        client.loop()

Telemetrieereignisse in zusätzlichen Cloud Pub/Sub-Themen veröffentlichen

Geräte können Daten in zusätzlichen Cloud Pub/Sub-Themen veröffentlichen. Standardmäßig werden in /devices/DEVICE_ID/events veröffentlichte MQTT-Nachrichten an das Standardtelemetriethema der entsprechenden Registry weitergeleitet. Sie können im MQTT-Thema einen Unterordner angeben, um Daten an zusätzliche Cloud Pub/Sub-Themen weiterzuleiten. Der Unterordner ist das untergeordnete Thema nach /devices/DEVICE_ID/events.

In einem Unterordner veröffentlichte Nachrichten werden an das Cloud Pub/Sub-Thema mit demselben Namen weitergeleitet. Die entsprechende Registry muss mit dem Cloud Pub/Sub-Thema konfiguriert werden. Andernfalls werden Nachrichten an das Standard-Cloud Pub/Sub-Thema weitergeleitet.

In den folgenden Fällen werden Nachrichten an das Cloud Pub/Sub-Standardthema anstelle des zusätzlichen Cloud Pub/Sub-Themas weitergeleitet:

  • Im MQTT-Thema ist kein Unterordner angegeben.
  • Im MQTT-Thema wird ein Unterordner angegeben, aber in der Geräteregistrierung gibt es kein übereinstimmendes Pub/Sub-Thema.

Wenn das Gerät beispielsweise im MQTT-Thema /devices/DEVICE_ID/events/alerts veröffentlicht, ist der Unterordner der String alerts. Nachrichten werden an das zusätzliche Cloud Pub/Sub-Thema weitergeleitet, wenn die beiden Felder eventNotificationConfigs[i].subfolderMatches und eventNotificationConfigs[i].pubsubTopicName auf alerts gesetzt sind. Andernfalls werden Nachrichten an das Cloud Pub/Sub-Standardthema weitergeleitet.

Gerätestatus festlegen

Verbundene Geräte können den Gerätestatus melden, indem sie eine PUBLISH-Nachricht an das folgende MQTT-Thema senden:

/devices/DEVICE_ID/state

Konfigurieren Sie die Registry mit einem Gerätestatusthema, um Statusnachrichten zu kategorisieren und abzurufen. Das Gerätestatusthema ist das Cloud Pub/Sub-Thema, das im Feld StateNotificationConfig.pubsubTopicName angegeben ist. Wenn die Registry mit einem Gerätestatusthema konfiguriert ist, werden diese Nachrichten auf Best-Effort-Basis an das entsprechende Cloud Pub/Sub-Thema weitergeleitet.

Weitere Informationen zum Abrufen von Statusnachrichten finden Sie unter Gerätestatus abrufen.

MQTT-Traffic beschränken

Cloud IoT Core begrenzt Projekte, die eine übermäßige Last generieren. Wenn Geräte fehlgeschlagene Vorgänge wiederholen, ohne zu warten, können sie Limits auslösen, die alle Geräte im selben Google Cloud-Projekt betreffen.

Für Wiederholungsversuche wird dringend empfohlen, einen abgeschnittenen exponentiellen Backoff-Algorithmus mit eingeführtem Jitter zu implementieren.

Keep-Alive

Wenn Sie die erste MQTT-CONNECT-Nachricht von einem Client senden, können Sie einen optionalen „Keep-Alive“-Wert angeben. Dieser Wert ist ein Zeitintervall in Sekunden, in dem der Broker erwartet, dass ein Client eine Nachricht sendet, z. B. eine PUBLISH-Nachricht. Wenn während des Intervalls keine Nachricht vom Client an den Broker gesendet wird, schließt der Broker die Verbindung automatisch. Beachten Sie, dass der von Ihnen angegebene Keep-Alive-Wert mit 1,5 multipliziert wird. Die Festlegung von 10-Minuten-Keep-Alive führt also zu einem Intervall von 15 Minuten.

Weitere Informationen finden Sie in der MQTT-Spezifikation.

Clienteinstellungen

Cloud IoT Core bietet keinen eigenen standardmäßigen Keep-Alive-Wert. Wenn Sie ein Keep-Alive-Intervall angeben möchten, müssen Sie es im Client festlegen.

Die besten Ergebnisse erzielen Sie, wenn Sie das Keep-Alive-Intervall des Clients auf mindestens 60 Sekunden setzen. Viele Open-Source-Clientbibliotheken, einschließlich der Paho MQTT-Bibliotheken für C, Python, Node.js und Java verwenden standardmäßig 60 Sekunden.

Zeitlimit bei Inaktivität

Neben dem Keep-Alive-Intervall hat Cloud IoT Core ein eigenes Zeitlimit von 20 Minuten. Anhand dieses Limits wird eine Clientverbindung automatisch beendet, wenn der Client 20 Minuten lang keine Nachrichten sendet, auch wenn das Keep-Alive-Intervall länger ist. Wenn kein Keep-Alive-Wert angegeben wird, wird die standardmäßige Zeitüberschreitung von 20 Minuten trotzdem wirksam.

Fehlerbehebung

Wenn beim Herstellen der Verbindung Probleme auftreten, finden Sie weitere Informationen unter Fehlerbehebung.