Publishing over the MQTT bridge

This section explains how devices can use the MQTT bridge to communicate with Cloud IoT Core. For general information about HTTP and MQTT, see Protocols.

Be sure to refer to the API documentation for full details about each method described in this section. See also the sample MQTT clients.

To publish over the MQTT bridge:

  1. Install an MQTT client on your device.

  2. Download an MQTT server certificate onto your device.

  3. Configure the MQTT client to authenticate the device to Cloud IoT Core.

  4. Initiate a TLS handshake over mqtt.googleapis.com or a long-term support domain.

  5. Publish telemetry events or set the device state.

MQTT server

Cloud IoT Core supports the MQTT protocol by running a managed broker that listens to the port mqtt.googleapis.com:8883. Port 8883 is the standard TCP port reserved with IANA for secure MQTT connections. Connections to this port must use TLS transport, which is supported by open source clients like Eclipse Paho.

If port 8883 is blocked by your firewall, you can also use port 443: mqtt.googleapis.com:443.

Quality of Service (QoS)

The MQTT specification describes three Quality of Service (QoS) levels:

  • QoS 0, delivered at most once
  • QoS 1, delivered at least once
  • QoS 2, delivered exactly once

Cloud IoT Core does not support QoS 2. Publishing QoS 2 messages closes the connection. Subscribing to a predefined topic with QoS 2 downgrades the QoS level to QoS 1.

QoS 0 and 1 function as follows in Cloud IoT Core:

  • A PUBLISH message with QoS 1 will be acknowledged by the PUBACK message after it has been successfully sent to Cloud Pub/Sub.
  • PUBLISH messages with QoS 0 do not require PUBACK responses, and may be dropped if there is any jitter along the message delivery path (for example, if Cloud Pub/Sub is temporarily unavailable).
  • The Cloud IoT Core MQTT bridge maintains a small buffer of undelivered messages in order to retry them. If the buffer becomes full, the message with QoS 1 may be dropped and a PUBACK message will not be sent to the client. The client is expected to resend the message.

For device configurations, QoS levels are as follows:

  • When QoS is 0, a given configuration version will be published to the device only once. If the device does not receive the configuration, it must resubscribe. A QoS of 0 is thus useful when a configuration is frequently updated (on the order of seconds or minutes) and it's not necessary for the device to receive every update.
  • When QoS is 1, the latest configuration update will be retried until the device acknowledges it with a PUBACK. If a newer configuration is pushed before the older one is acknowledged, the older one will not be redelivered; instead, the new one will be delivered (and redelivered). This level is the safest mode for device configurations: it guarantees that the device will eventually get the latest configuration.

Downloading MQTT server certificates

To use TLS transport, devices must verify Cloud IoT Core server certificates to ensure they're communicating with Cloud IoT Core rather than an impersonator. The following certificate packages support verification:

  • The complete Google root CA certification package (128 KB) for mqtt.googleapis.com.
    • This package establishes the chain of trust to communicate with Google products and services, including Cloud IoT Core.
    • Devices with the complete root CA certification package communicate directly with the MQTT server.
    • This package is regularly updated.
  • Google's minimal root CA set (<1 KB) for mqtt.2030.ltsapis.goog. The minimal root CA set includes a primary and backup certificate.
    • This set is for devices with memory constraints, like microcontrollers, and establishes the chain of trust to communicate with Cloud IoT Core only.
    • Devices with the minimal root CA set communicate with the Cloud IoT Core via long-term support domains.
    • This set is fixed through 2030 (the primary and backup certificates won't change). For added security, Google Trust Services may switch between the primary and backup certificates at any time without notice.

After downloading Google root CA certificates to your device, you can configure an MQTT client to authenticate the device, connect to the MQTT server, and communicate over the MQTT bridge.

Configuring MQTT clients

MQTT clients authenticate devices by connecting to the MQTT bridge. To configure an MQTT client to authenticate a device:

  1. Set the MQTT client ID to the full device path:
    projects/PROJECT_ID/locations/REGION/registries/REGISTRY_ID/devices/DEVICE_ID
  2. Associate the MQTT client with MQTT server certificates.
  3. Set the MQTT host name to mqtt.googleapis.com or a long-term support domain (if you used the minimal root CA set).
  4. Specify a username. The MQTT bridge ignores the username field, but some MQTT client libraries will not send the password field unless the username field is specified. For best results, supply an arbitrary username like unused or ignored.
  5. Set the password. The password field must contain the JWT.

The following sample shows how to configure the MQTT client to authenticate a device:

C++

The steps for configuring the client ID and authenticating a device are highlighted below:
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
      MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = kUsername;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf("Waiting for up to %lu seconds for publication of %s\n"
          "on topic %s for client with ClientID: %s\n",
          (kTimeout/1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

MqttExampleOptions options = MqttExampleOptions.fromFlags(args);
if (options == null) {
  // Could not parse.
  System.exit(1);
}

if (options.command == "listen-for-config-messages") {
  System.out.println(
      String.format("Listening for configuration messages for %s:", options.deviceId));
  listenForConfigMessages(
      options.mqttBridgeHostname,
      options.mqttBridgePort,
      options.projectId,
      options.cloudRegion,
      options.registryId,
      options.gatewayId,
      options.privateKeyFile,
      options.algorithm,
      options.deviceId);
} else if (options.command == "send-data-from-bound-device") {
  System.out.println("Sending data on behalf of device:");
  sendDataFromBoundDevice(
      options.mqttBridgeHostname,
      options.mqttBridgePort,
      options.projectId,
      options.cloudRegion,
      options.registryId,
      options.gatewayId,
      options.privateKeyFile,
      options.algorithm,
      options.deviceId,
      options.messageType,
      options.telemetryData);
} else {
  System.out.println("Starting mqtt demo:");
  mqttDeviceDemo(options);
}

Node.js


// const deviceId = `myDevice`;
// const registryId = `myRegistry`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const messageType = `events`;
// const numMessages = 5;

// The mqttClientId is a unique string that identifies this device. For Google
// Cloud IoT Core, it must be in the format below.
const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${deviceId}`;

// With Google Cloud IoT Core, the username field is ignored, however it must be
// non-empty. The password field is used to transmit a JWT to authorize the
// device. The "mqtts" protocol causes the library to connect using SSL, which
// is required for Cloud IoT Core.
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  secureProtocol: 'TLSv1_2_method',
};

// Create a client, and connect to the Google MQTT bridge.
const iatTime = parseInt(Date.now() / 1000);
const client = mqtt.connect(connectionArgs);

// Subscribe to the /devices/{device-id}/config topic to receive config updates.
// Config updates are recommended to use QoS 1 (at least once delivery)
client.subscribe(`/devices/${deviceId}/config`, {qos: 1});

// Subscribe to the /devices/{device-id}/commands/# topic to receive all
// commands or to the /devices/{device-id}/commands/<subfolder> to just receive
// messages published to a specific commands folder; we recommend you use
// QoS 0 (at most once delivery)
client.subscribe(`/devices/${deviceId}/commands/#`, {qos: 0});

// The MQTT topic that this device will publish data to. The MQTT topic name is
// required to be in the format below. The topic name must end in 'state' to
// publish state and 'events' to publish telemetry. Note that this is not the
// same as the device registry's Cloud Pub/Sub topic.
const mqttTopic = `/devices/${deviceId}/${messageType}`;

client.on('connect', success => {
  console.log('connect');
  if (!success) {
    console.log('Client not connected...');
  } else if (!publishChainInProgress) {
    publishAsync(mqttTopic, client, iatTime, 1, numMessages, connectionArgs);
  }
});

client.on('close', () => {
  console.log('close');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  let messageStr = 'Message received: ';
  if (topic === `/devices/${deviceId}/config`) {
    messageStr = 'Config message received: ';
  } else if (topic.startsWith(`/devices/${deviceId}/commands`)) {
    messageStr = 'Command message received: ';
  }

  messageStr += Buffer.from(message, 'base64').toString('ascii');
  console.log(messageStr);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

// Once all of the messages have been published, the connection to Google Cloud
// IoT will be closed and the process will exit. See the publishAsync method.

Python

This sample uses the Google API Client Library for Python.
def error_str(rc):
    """Convert a Paho error to a human readable string."""
    return '{}: {}'.format(rc, mqtt.error_string(rc))


def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    print('on_connect', mqtt.connack_string(rc))

    # After a successful connect, reset backoff time and stop backing off.
    global should_backoff
    global minimum_backoff_time
    should_backoff = False
    minimum_backoff_time = 1


def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback for when a device disconnects."""
    print('on_disconnect', error_str(rc))

    # Since a disconnect occurred, the next loop iteration will wait with
    # exponential backoff.
    global should_backoff
    should_backoff = True


def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback when a message is sent to the broker."""
    print('on_publish')


def on_message(unused_client, unused_userdata, message):
    """Callback when the device receives a message on a subscription."""
    payload = str(message.payload.decode('utf-8'))
    print('Received message \'{}\' on topic \'{}\' with Qos {}'.format(
            payload, message.topic, str(message.qos)))


def get_client(
        project_id, cloud_region, registry_id, device_id, private_key_file,
        algorithm, ca_certs, mqtt_bridge_hostname, mqtt_bridge_port):
    """Create our MQTT client. The client_id is a unique string that identifies
    this device. For Google Cloud IoT Core, it must be in the format below."""
    client_id = 'projects/{}/locations/{}/registries/{}/devices/{}'.format(
            project_id, cloud_region, registry_id, device_id)
    print('Device client_id is \'{}\''.format(client_id))

    client = mqtt.Client(client_id=client_id)

    # With Google Cloud IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    client.username_pw_set(
            username='unused',
            password=create_jwt(
                    project_id, private_key_file, algorithm))

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. In this example, the
    # callbacks just print to standard out.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the Google MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # This is the topic that the device will receive configuration updates on.
    mqtt_config_topic = '/devices/{}/config'.format(device_id)

    # Subscribe to the config topic.
    client.subscribe(mqtt_config_topic, qos=1)

    # The topic that the device will receive commands on.
    mqtt_command_topic = '/devices/{}/commands/#'.format(device_id)

    # Subscribe to the commands topic, QoS 1 enables message acknowledgement.
    print('Subscribing to {}'.format(mqtt_command_topic))
    client.subscribe(mqtt_command_topic, qos=0)

    return client

Using a long-term MQTT domain

Long-term support (LTS) domains let you use one TLS configuration for an extended period of time. You can set up an MQTT client once, configure the MQTT client to publish messages through an LTS domain, and then communicate over the MQTT bridge continuously during the supported time frame.

The current active LTS domain is mqtt.2030.ltsapis.goog. This LTS domain is supported through 2030.

To use the LTS domain:

  1. Configure an MQTT client to publish messages through an LTS domain.

    1. Configure the MQTT client to authenticate the device to Cloud IoT Core.
    2. When configuring the device, associate the minimal root CA set's primary and backup certificates with the MQTT client.
  2. Initiate a TLS handshake over mqtt.2030.ltsapis.goog on port 8883 or 443. Use at least the following TLS features:

    • TLS 1.2
    • P-256 with SHA-256 as the certificate key and hash algorithm
    • TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 using P-256 and uncompressed points for the cipher suite
    • Server Name Indication
    • DNS over TCP or UDP

For more information on securing MQTT traffic, including messages sent to LTS domains, see Device security recommendations.

Publishing telemetry events

After the device is configured with an MQTT client and connected to the MQTT bridge, it can publish a telemetry event by issuing a PUBLISH message to an MQTT topic in the following format:

/devices/DEVICE_ID/events

The device ID is the string ID of the device specified in the MQTT client ID. The device ID is case sensitive.

Messages published to this MQTT topic are forwarded to the corresponding registry's default telemetry topic. The default telemetry topic is the Cloud Pub/Sub topic specified in the eventNotificationConfigs[i].pubsubTopicName field in the registry resource. If no default Pub/Sub topic exists, published telemetry data will be lost. To publish messages to other Cloud Pub/Sub topics, see Publishing telemetry events to separate Pub/Sub topics.

The forwarded message data field contains a copy of the message published by the device, and the following message attributes are added to each message in the Cloud Pub/Sub topic:

Attribute Description
deviceId The user-defined string identifier for the device, for example, thing1. The device ID must be unique within the registry.
deviceNumId The server-generated numeric ID of the device. When you create a device, Cloud IoT Core automatically generates the device numeric ID; it's globally unique and not editable.
deviceRegistryLocation The Google Cloud Platform region of the device registry, for example, us-central1.
deviceRegistryId The user-defined string identifier for the device registry, for example, registry1.
projectId The string ID of the cloud project that owns the registry and device.
subFolder The subfolder can be used as an event category or classification. For MQTT clients, the subfolder is the subtopic after DEVICE_ID/events, which is copied directly. For example, if the client publishes to the MQTT topic /devices/DEVICE_ID/events/alerts, the subfolder is the string alerts.

The following sample shows how to send PUBLISH messages through the MQTT connection:

C++

int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
      MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = kUsername;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf("Waiting for up to %lu seconds for publication of %s\n"
          "on topic %s for client with ClientID: %s\n",
          (kTimeout/1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while (!client.isConnected() && totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, options.deviceId);

// Publish to the events or state topic based on the flag.
String subTopic = options.messageType.equals("event") ? "events" : options.messageType;

// The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
// required to be in the format below. Note that this is not the same as the device registry's
// Cloud Pub/Sub topic.
String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);

// Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
for (int i = 1; i <= options.numMessages; ++i) {
  String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
  System.out.format(
      "Publishing %s message %d/%d: '%s'\n",
      options.messageType, i, options.numMessages, payload);

  // Refresh the connection credentials before the JWT expires.
  long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000;
  if (secsSinceRefresh > (options.tokenExpMins * 60)) {
    System.out.format("\tRefreshing token after: %d seconds\n", secsSinceRefresh);
    iat = new DateTime();
    if (options.algorithm.equals("RS256")) {
      connectOptions.setPassword(
          createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
    } else if (options.algorithm.equals("ES256")) {
      connectOptions.setPassword(
          createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
    } else {
      throw new IllegalArgumentException(
          "Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
    }
    client.disconnect();
    client.connect();
    attachCallback(client, options.deviceId);
  }

  // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
  // also supports qos=0 for at most once delivery.
  MqttMessage message = new MqttMessage(payload.getBytes());
  message.setQos(1);
  client.publish(mqttTopic, message);

  if (options.messageType.equals("event")) {
    // Send telemetry events every second
    Thread.sleep(1000);
  } else {
    // Note: Update Device state less frequently than with telemetry events
    Thread.sleep(5000);
  }
}

// Wait for commands to arrive for about two minutes.
for (int i = 1; i <= options.waitTime; ++i) {
  System.out.print(".");
  Thread.sleep(1000);
}
System.out.println("");

// Disconnect the client if still connected, and finish the run.
if (client.isConnected()) {
  client.disconnect();
}

System.out.println("Finished loop successfully. Goodbye!");
client.close();

Node.js

function publishAsync(
  mqttTopic,
  client,
  iatTime,
  messagesSent,
  numMessages,
  connectionArgs
) {
  // If we have published enough messages or backed off too many times, stop.
  if (messagesSent > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) {
    if (backoffTime >= MAXIMUM_BACKOFF_TIME) {
      console.log('Backoff time is too high. Giving up.');
    }
    console.log('Closing connection to MQTT. Goodbye!');
    client.end();
    publishChainInProgress = false;
    return;
  }

  // Publish and schedule the next publish.
  publishChainInProgress = true;
  let publishDelayMs = 0;
  if (shouldBackoff) {
    publishDelayMs = 1000 * (backoffTime + Math.random());
    backoffTime *= 2;
    console.log(`Backing off for ${publishDelayMs}ms before publishing.`);
  }

  setTimeout(() => {
    const payload = `${argv.registryId}/${argv.deviceId}-payload-${messagesSent}`;

    // Publish "payload" to the MQTT topic. qos=1 means at least once delivery.
    // Cloud IoT Core also supports qos=0 for at most once delivery.
    console.log('Publishing message:', payload);
    client.publish(mqttTopic, payload, {qos: 1}, err => {
      if (!err) {
        shouldBackoff = false;
        backoffTime = MINIMUM_BACKOFF_TIME;
      }
    });

    const schedulePublishDelayMs = argv.messageType === 'events' ? 1000 : 2000;
    setTimeout(() => {
      const secsFromIssue = parseInt(Date.now() / 1000) - iatTime;
      if (secsFromIssue > argv.tokenExpMins * 60) {
        iatTime = parseInt(Date.now() / 1000);
        console.log(`\tRefreshing token after ${secsFromIssue} seconds.`);

        client.end();
        connectionArgs.password = createJwt(
          argv.projectId,
          argv.privateKeyFile,
          argv.algorithm
        );
        connectionArgs.protocolId = 'MQTT';
        connectionArgs.protocolVersion = 4;
        connectionArgs.clean = true;
        client = mqtt.connect(connectionArgs);

        client.on('connect', success => {
          console.log('connect');
          if (!success) {
            console.log('Client not connected...');
          } else if (!publishChainInProgress) {
            publishAsync(
              mqttTopic,
              client,
              iatTime,
              messagesSent,
              numMessages,
              connectionArgs
            );
          }
        });

        client.on('close', () => {
          console.log('close');
          shouldBackoff = true;
        });

        client.on('error', err => {
          console.log('error', err);
        });

        client.on('message', (topic, message) => {
          console.log(
            'message received: ',
            Buffer.from(message, 'base64').toString('ascii')
          );
        });

        client.on('packetsend', () => {
          // Note: logging packet send is very verbose
        });
      }
      publishAsync(
        mqttTopic,
        client,
        iatTime,
        messagesSent + 1,
        numMessages,
        connectionArgs
      );
    }, schedulePublishDelayMs);
  }, publishDelayMs);
}

Python

global minimum_backoff_time
global MAXIMUM_BACKOFF_TIME

# Publish to the events or state topic based on the flag.
sub_topic = 'events' if args.message_type == 'event' else 'state'

mqtt_topic = '/devices/{}/{}'.format(args.device_id, sub_topic)

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = args.jwt_expires_minutes
client = get_client(
    args.project_id, args.cloud_region, args.registry_id,
    args.device_id, args.private_key_file, args.algorithm,
    args.ca_certs, args.mqtt_bridge_hostname, args.mqtt_bridge_port)

# Publish num_messages messages to the MQTT bridge once per second.
for i in range(1, args.num_messages + 1):
    # Process network events.
    client.loop()

    # Wait if backoff is required.
    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print('Exceeded maximum backoff time. Giving up.')
            break

        # Otherwise, wait and connect again.
        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        print('Waiting for {} before reconnecting.'.format(delay))
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

    payload = '{}/{}-payload-{}'.format(
            args.registry_id, args.device_id, i)
    print('Publishing message {}/{}: \'{}\''.format(
            i, args.num_messages, payload))
    seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print('Refreshing token after {}s'.format(seconds_since_issue))
        jwt_iat = datetime.datetime.utcnow()
        client = get_client(
            args.project_id, args.cloud_region,
            args.registry_id, args.device_id, args.private_key_file,
            args.algorithm, args.ca_certs, args.mqtt_bridge_hostname,
            args.mqtt_bridge_port)
    # Publish "payload" to the MQTT topic. qos=1 means at least once
    # delivery. Cloud IoT Core also supports qos=0 for at most once
    # delivery.
    client.publish(mqtt_topic, payload, qos=1)

    # Send events every second. State should not be updated as often
    time.sleep(1 if args.message_type == 'event' else 5)

Publishing telemetry events to additional Cloud Pub/Sub topics

Devices can publish data to additional Cloud Pub/Sub topics. By default, MQTT messages published to /devices/DEVICE_ID/events are forwarded to the corresponding registry's default telemetry topic. You can specify a subfolder in the MQTT topic to forward data to additional Cloud Pub/Sub topics. The subfolder The subfolder is the subtopic after /devices/DEVICE_ID/events.

Messages published to a subfolder are forwarded to the Cloud Pub/Sub topic with the same name. The corresponding registry must be configured with the Cloud Pub/Sub topic; otherwise, messages are forwarded to the default Cloud Pub/Sub topic.

Messages are forwarded to the default Cloud Pub/Sub topic instead of the additional Cloud Pub/Sub topic in the following cases:

  • No subfolder is specified in the MQTT topic
  • A subfolder is specified in the MQTT topic, but it doesn't have a matching Pub/Sub topic in the device registry

For example, if the device publishes to the MQTT topic /devices/DEVICE_ID/events/alerts, the subfolder is the string alerts. Messages are forwarded to the additional Cloud Pub/Sub topic if the eventNotificationConfigs[i].subfolderMatches and eventNotificationConfigs[i].pubsubTopicName fields are both set to alerts. Otherwise, messages are forward to the default Cloud Pub/Sub topic.

Setting device state

Connected devices can report device state by issuing a PUBLISH message to the following MQTT topic:

/devices/DEVICE_ID/state

To categorize and retrieve state messages, configure the registry with a device state topic. The device state topic is the Cloud Pub/Sub topic specified in the StateNotificationConfig.pubsubTopicName field. If the registry is configured with a device state topic, these messages are forwarded to the matching Cloud Pub/Sub topic on a best-effort basis.

For more details on retrieving state messages, see Getting device state.

Limiting MQTT traffic

Cloud IoT Core limits projects that generate excessive load. When devices retry failed operations without waiting, they can trigger limits that affect all devices in the same Google Cloud Platform project.

For retries, you are strongly encouraged to implement a truncated exponential backoff algorithm with introduced jitter.

Keep-alive

When sending the initial MQTT CONNECT message from a client, you can supply an optional "keep-alive" value. This value is a time interval, measured in seconds, during which the broker expects a client to send a message, such as a PUBLISH message. If no message is sent from the client to the broker during the interval, the broker automatically closes the connection. Note that the keep-alive value you specify is multiplied by 1.5, so setting a 10-minute keep-alive actually results in a 15 minute interval.

For more information, see the MQTT specification.

Client settings

Cloud IoT Core does not supply its own default keep-alive value; if you choose to specify a keep-alive interval, you must set it in the client.

For best results, set the client's keep-alive interval to a minimum of 60 seconds. Many open source client libraries, including the Paho MQTT libraries for C, Python, Node.js, and Java, use 60 seconds by default.

Idle time limit

Separate from the keep-alive interval, Cloud IoT Core has its own idle time limit of 20 minutes. Based on this limit, a client connection will automatically be terminated if the client doesn't send any messages for 20 minutes, even if the keep-alive interval is longer. If a keep-alive value isn't supplied, the default idle timeout of 20 minutes still takes effect.

Troubleshooting

If you have trouble connecting, see Troubleshooting.

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

Cloud IoT Core Documentation