Using the MQTT Bridge

This section explains how devices can use the MQTT bridge to communicate with Cloud IoT Core. For general information about HTTP and MQTT, see Protocols.

Be sure to refer to the API documentation for full details about each method described in this section. See also the sample MQTT clients.

MQTT server

The Cloud IoT Core service supports the MQTT protocol by running a managed broker that listens on mqtt.googleapis.com:8883. Port 8883 is the standard TCP port registered with IANA for secure MQTT connections. Connections to this port must use TLS transport, which is supported by open source clients such as Eclipse Paho.

If port 8883 is blocked by your firewall, you can also use port 443: mqtt.googleapis.com:443.
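The port fallback described above can be sketched as follows. This is a minimal illustration, not part of any Cloud IoT Core library: the names pick_endpoint and tcp_probe are hypothetical, and the probe only checks that a TCP connection can be opened (the TLS handshake still happens later in the MQTT client).

```python
import socket

MQTT_BRIDGE_HOST = "mqtt.googleapis.com"
MQTT_BRIDGE_PORTS = (8883, 443)  # Preferred port first, firewall fallback second.


def tcp_probe(host, port, timeout=5.0):
    """Return True if a plain TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def pick_endpoint(can_connect=tcp_probe, host=MQTT_BRIDGE_HOST,
                  ports=MQTT_BRIDGE_PORTS):
    """Return the first (host, port) pair the probe accepts, or None."""
    for port in ports:
        if can_connect(host, port):
            return (host, port)
    return None
```

Passing the probe as a parameter keeps the selection logic testable without network access; a device would typically call pick_endpoint() once before creating its MQTT client.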

Device authentication

When the MQTT client connects, it must specify the device by setting the MQTT client ID to the full device path:

projects/{project-id}/locations/{cloud-region}/registries/{registry-id}/devices/{device-id}

The username is ignored, and the password field must contain the JWT. Note that some MQTT client libraries will not send the password field unless the username is specified. For best results, supply an arbitrary username like unused or ignored.
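As a compact illustration of these rules, the sketch below builds the client ID and the username/password pair without depending on any MQTT library. The helper names build_client_id and build_credentials are hypothetical, and rejecting identifiers that contain a slash is an assumption made here to keep the path unambiguous, not a documented rule.

```python
def build_client_id(project_id, cloud_region, registry_id, device_id):
    """Return the full device path that must be used as the MQTT client ID."""
    parts = {
        "project_id": project_id,
        "cloud_region": cloud_region,
        "registry_id": registry_id,
        "device_id": device_id,
    }
    for name, value in parts.items():
        # Assumption: an empty value or an embedded '/' would corrupt the path.
        if not value or "/" in value:
            raise ValueError("invalid {}: {!r}".format(name, value))
    return "projects/{}/locations/{}/registries/{}/devices/{}".format(
        project_id, cloud_region, registry_id, device_id)


def build_credentials(jwt_token, username="unused"):
    """Return the username/password pair for the MQTT CONNECT message.

    The username is ignored by the bridge, but a non-empty value makes sure
    client libraries actually send the password field, which carries the JWT.
    """
    if not jwt_token:
        raise ValueError("a JWT is required in the password field")
    return {"username": username, "password": jwt_token}
```

For example, build_client_id("my-project", "us-central1", "registry1", "thing1") yields projects/my-project/locations/us-central1/registries/registry1/devices/thing1.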

The following sample shows how to configure the MQTT client ID and authenticate a device:

C/C++

bool GetOpts(int argc, char** argv) {
  int pos = 2;
  bool calcvalues = false;

  if (argc < 2) {
    return false;
  }

  opts.payload = argv[1];

  while (pos < argc) {
    if (strcmp(argv[pos], "--deviceid") == 0) {
      if (++pos < argc) {
        opts.deviceid = argv[pos];
        calcvalues = true;
      }
      else
        return false;
    } else if (strcmp(argv[pos], "--region") == 0) {
      if (++pos < argc) {
        opts.region = argv[pos];
        calcvalues = true;
      }
      else
        return false;
    } else if (strcmp(argv[pos], "--registryid") == 0) {
      if (++pos < argc) {
        opts.registryid = argv[pos];
        calcvalues = true;
      }
      else
        return false;
    } else if (strcmp(argv[pos], "--projectid") == 0) {
      if (++pos < argc) {
        opts.projectid = argv[pos];
        calcvalues = true;
      }
      else
        return false;
    } else if (strcmp(argv[pos], "--ecpath") == 0) {
      if (++pos < argc)
        opts.ecpath = argv[pos];
      else
        return false;
    } else if (strcmp(argv[pos], "--rootpath") == 0) {
      if (++pos < argc)
        opts.rootpath = argv[pos];
      else
        return false;
    }
    pos++;
  }

  if (calcvalues) {
    int n = snprintf(opts.clientid, sizeof(opts.clientid),
        "projects/%s/locations/%s/registries/%s/devices/%s",
        opts.projectid, opts.region, opts.registryid, opts.deviceid);
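    // snprintf returns the length it wanted to write, excluding the
    // terminating NUL, so any value >= the buffer size means truncation.
    if (n < 0 || n >= (int)sizeof(opts.clientid)) {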
      if (n < 0) {
        printf("Encoding error!\n");
      } else {
        printf("Error, buffer for storing client ID was too small.\n");
      }
      return false;
    }
    if (TRACE) {
      printf("New client id constructed:\n");
      printf("%s\n", opts.clientid);
    }

    return true; // Caller must free opts.clientid
  }
  return false;
}

static const int kQos = 1;
static const unsigned long kTimeout = 10000L;
static const char* kUsername = "unused";

Java

MqttExampleOptions options = MqttExampleOptions.fromFlags(args);
if (options == null) {
  // Could not parse.
  System.exit(1);
}

// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
    String.format("ssl://%s:%s", options.mqttBridgeHostname, options.mqttBridgePort);

// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
    String.format(
        "projects/%s/locations/%s/registries/%s/devices/%s",
        options.projectId, options.cloudRegion, options.registryId, options.deviceId);

MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explicitly set this. If you don't set the MQTT version, the server will immediately close
// its connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

// With Google Cloud IoT Core, the username field is ignored; however, it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");

DateTime iat = new DateTime();
if (options.algorithm.equals("RS256")) {
  connectOptions.setPassword(
      createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
} else if (options.algorithm.equals("ES256")) {
  connectOptions.setPassword(
      createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
} else {
  throw new IllegalArgumentException(
      "Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
}

Node.js

// The mqttClientId is a unique string that identifies this device. For Google
// Cloud IoT Core, it must be in the format below.
const mqttClientId = `projects/${argv.project_id}/locations/${argv.cloud_region}/registries/${argv.registry_id}/devices/${argv.device_id}`;

// With Google Cloud IoT Core, the username field is ignored; however, it must
// be non-empty. The password field is used to transmit a JWT to authorize the
// device. The "mqtts" protocol causes the library to connect using SSL, which
// is required for Cloud IoT Core.
const connectionArgs = {
  host: argv.mqtt_bridge_hostname,
  port: argv.mqtt_bridge_port,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(argv.project_id, argv.private_key_file, argv.algorithm),
  protocol: 'mqtts'
};

// Create a client, and connect to the Google MQTT bridge.
const client = mqtt.connect(connectionArgs);

// The MQTT topic that this device will publish data to. The MQTT
// topic name is required to be in the format below. The topic name must end in
// 'state' to publish state and 'events' to publish telemetry. Note that this is
// not the same as the device registry's Cloud Pub/Sub topic.
const mqttTopic = `/devices/${argv.device_id}/${argv.message_type}`;

// Arrow functions have no 'arguments' object of their own, so the event
// arguments are collected with rest parameters instead.
client.on('connect', (...args) => {
  console.log('connect', args);
  // After connecting, publish 'num_messages' messages asynchronously, at a rate
  // of 1 per second for telemetry events and 1 every 2 seconds for states.
  publishAsync(1, argv.num_messages);
});

client.on('close', (...args) => {
  console.log('close', args);
});

client.on('error', (...args) => {
  console.log('error', args);
});

client.on('packetsend', (...args) => {
  console.log('packetsend', args);
});

// Once all of the messages have been published, the connection to Google Cloud
// IoT will be closed and the process will exit. See the publishAsync method.

Python

This sample uses the Google API Client Library for Python.

def error_str(rc):
    """Convert a Paho error to a human readable string."""
    return '{}: {}'.format(rc, mqtt.error_string(rc))


def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    print('on_connect', mqtt.connack_string(rc))


def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback for when a device disconnects."""
    print('on_disconnect', error_str(rc))


def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback when a message is sent to the broker."""
    print('on_publish')


def get_client(
        project_id, cloud_region, registry_id, device_id, private_key_file,
        algorithm, ca_certs, mqtt_bridge_hostname, mqtt_bridge_port):
    """Create our MQTT client. The client_id is a unique string that identifies
    this device. For Google Cloud IoT Core, it must be in the format below."""
    client = mqtt.Client(
            client_id=('projects/{}/locations/{}/registries/{}/devices/{}'
                       .format(
                               project_id,
                               cloud_region,
                               registry_id,
                               device_id)))

    # With Google Cloud IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    client.username_pw_set(
            username='unused',
            password=create_jwt(
                    project_id, private_key_file, algorithm))

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. In this example, the
    # callbacks just print to standard out.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect

    # Connect to the Google MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # Start the network loop.
    client.loop_start()

    return client

Publishing telemetry events

A device can publish a telemetry event by issuing a PUBLISH message through the MQTT connection. Messages must be published to an MQTT topic in the following format:

/devices/{device-id}/events

You can also publish to a subfolder, for example, /devices/{device-id}/events/alerts. The {device-id} is case sensitive and must match the string ID of the device specified in the MQTT client ID.
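A short sketch of building these topic names is shown below. The helper name events_topic is hypothetical; it only formats the string and does not check that the device ID matches the one in the client ID.

```python
def events_topic(device_id, subfolder=None):
    """Return the MQTT telemetry topic for a device, optionally with a subfolder."""
    topic = "/devices/{}/events".format(device_id)
    if subfolder:
        # Strip stray slashes so the result never contains '//'.
        topic += "/" + subfolder.strip("/")
    return topic


print(events_topic("thing1"))
print(events_topic("thing1", "alerts"))
```

The two calls print /devices/thing1/events and /devices/thing1/events/alerts.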

The messages published to this MQTT topic are forwarded to the Cloud Pub/Sub topic specified in the event_notification_config.pubsub_topic_name field in the device registry resource. The forwarded message data field contains a copy of the message published by the device, and the following message attributes are added to each message in the Cloud Pub/Sub topic:

Attribute               Description
deviceNumId             The ID of the device that generated this data (a numeric ID generated by Cloud IoT Core).
deviceId                The user-defined string identifier for the device, for example, thing1. This ID must be unique within the registry.
deviceRegistryLocation  The Google Cloud Platform region of the device registry, for example, us-central1.
deviceRegistryId        The user-defined string identifier for the device registry, for example, registry1.
projectId               The string ID of the cloud project that owns the registry and device.
subFolder               The subfolder can be used as an event category or classification. For MQTT clients, the subfolder is the subtopic after {device-id}/events, which is copied directly. For example, if the client publishes to the MQTT topic /devices/{device-id}/events/alerts, the subfolder is the string alerts.
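To make the subFolder rule concrete, the following sketch derives the expected value from an MQTT topic on the device or test side. The helper name subfolder_from_topic is hypothetical, and two behaviors are assumptions rather than documented facts: returning an empty string when no subfolder is present, and keeping nested subtopics (such as alerts/high) intact.

```python
def subfolder_from_topic(topic):
    """Return the subtopic after /devices/{device-id}/events, or '' if none."""
    parts = topic.split("/")
    # Expected shape: ['', 'devices', device_id, 'events', *subfolder_parts]
    if (len(parts) < 4 or parts[0] != "" or parts[1] != "devices"
            or not parts[2] or parts[3] != "events"):
        raise ValueError("not a telemetry topic: {!r}".format(topic))
    return "/".join(parts[4:])


print(subfolder_from_topic("/devices/thing1/events/alerts"))
```

The call above prints alerts, matching the example in the attribute description.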

The following sample shows how to send PUBLISH messages through the MQTT connection:

C/C++

int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
      MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 20;
  conn_opts.cleansession = 1;
  conn_opts.username = kUsername;
  conn_opts.password = CreateJwt(opts.ecpath, opts.projectid);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.ecpath;
  conn_opts.ssl = &sslopts;

  if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf("Waiting for up to %lu seconds for publication of %s\n"
          "on topic %s for client with ClientID: %s\n",
          (kTimeout/1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());
try {
  client.connect(connectOptions);

  // Publish to the events or state topic based on the flag.
  String subTopic = options.messageType.equals("event") ? "events" : options.messageType;

  // The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
  // required to be in the format below. Note that this is not the same as the device registry's
  // Cloud Pub/Sub topic.
  String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);

  // Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
  for (int i = 1; i <= options.numMessages; ++i) {
    String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
    System.out.format(
        "Publishing %s message %d/%d: '%s'\n",
        options.messageType, i, options.numMessages, payload);

    // Refresh the connection credentials before the JWT expires.
    long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000;
    if (secsSinceRefresh > (options.tokenExpMins * 60)) {
      System.out.format("\tRefreshing token after: %d seconds\n", secsSinceRefresh);
      iat = new DateTime();
      if (options.algorithm.equals("RS256")) {
        connectOptions.setPassword(
            createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
      } else if (options.algorithm.equals("ES256")) {
        connectOptions.setPassword(
            createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
      } else {
        throw new IllegalArgumentException(
            "Invalid algorithm " + options.algorithm
                + ". Should be one of 'RS256' or 'ES256'.");
      }
    }

    // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
    // also supports qos=0 for at most once delivery.
    MqttMessage message = new MqttMessage(payload.getBytes());
    message.setQos(1);
    client.publish(mqttTopic, message);

    if (options.messageType.equals("event")) {
      // Send telemetry events every second
      Thread.sleep(1000);
    } else {
      // Note: Update Device state less frequently than with telemetry events
      Thread.sleep(5000);
    }
  }
} finally {
  // Disconnect the client and finish the run.
  client.disconnect();
}
System.out.println("Finished loop successfully. Goodbye!");

Node.js

function publishAsync (messageCount, numMessages) {
  const payload = `${argv.registry_id}/${argv.device_id}-payload-${messageCount}`;
  // Publish "payload" to the MQTT topic. qos=1 means at least once delivery.
  // Cloud IoT Core also supports qos=0 for at most once delivery.
  console.log('Publishing message:', payload);
  client.publish(mqttTopic, payload, { qos: 1 });

  const delayMs = argv.message_type === 'events' ? 1000 : 2000;
  if (messageCount < numMessages) {
    // If we have published fewer than numMessages messages, publish payload
    // messageCount + 1 after delayMs milliseconds.
    setTimeout(function () {
      publishAsync(messageCount + 1, numMessages);
    }, delayMs);
  } else {
    // Otherwise, close the connection.
    console.log('Closing connection to MQTT. Goodbye!');
    client.end();
  }
}

Python

This sample uses the Google API Client Library for Python.

def main():
    args = parse_command_line_args()

    # Publish to the events or state topic based on the flag.
    sub_topic = 'events' if args.message_type == 'event' else 'state'

    mqtt_topic = '/devices/{}/{}'.format(args.device_id, sub_topic)

    jwt_iat = datetime.datetime.utcnow()
    jwt_exp_mins = args.jwt_expires_minutes
    client = get_client(
        args.project_id, args.cloud_region, args.registry_id, args.device_id,
        args.private_key_file, args.algorithm, args.ca_certs,
        args.mqtt_bridge_hostname, args.mqtt_bridge_port)

    # Publish num_messages messages to the MQTT bridge once per second.
    for i in range(1, args.num_messages + 1):
        payload = '{}/{}-payload-{}'.format(
                args.registry_id, args.device_id, i)
        print('Publishing message {}/{}: \'{}\''.format(
                i, args.num_messages, payload))
        seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
        if seconds_since_issue > 60 * jwt_exp_mins:
            print('Refreshing token after {}s'.format(seconds_since_issue))
            client.loop_stop()
            jwt_iat = datetime.datetime.utcnow()
            client = get_client(
                args.project_id, args.cloud_region,
                args.registry_id, args.device_id, args.private_key_file,
                args.algorithm, args.ca_certs, args.mqtt_bridge_hostname,
                args.mqtt_bridge_port)

        # Publish "payload" to the MQTT topic. qos=1 means at least once
        # delivery. Cloud IoT Core also supports qos=0 for at most once
        # delivery.
        client.publish(mqtt_topic, payload, qos=1)

        # Send events every second. State should not be updated as often
        time.sleep(1 if args.message_type == 'event' else 5)

    # End the network loop and finish.
    client.loop_stop()
    print('Finished.')

Quality of Service (QoS)

The MQTT specification describes three Quality of Service (QoS) levels:

  • QoS 0, delivered at most once
  • QoS 1, delivered at least once
  • QoS 2, delivered exactly once (not supported by Cloud IoT Core)

QoS 0 and 1 function as follows in Cloud IoT Core:

  • A PUBLISH message with QoS 1 is acknowledged with a PUBACK message after it has been successfully sent to Cloud Pub/Sub.
  • PUBLISH messages with QoS 0 do not require PUBACK responses, and may be dropped if there is any jitter along the message delivery path (for example, if Cloud Pub/Sub is temporarily unavailable).
  • The Cloud IoT Core MQTT bridge maintains a small buffer of undelivered messages in order to retry them. If the buffer becomes full, a message with QoS 1 may be dropped and no PUBACK message is sent to the client. The client is expected to resend the message.
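Because a QoS 1 message can be dropped without a PUBACK, a client needs some way to notice which messages are still unacknowledged. The sketch below is one hedged way to track them, independent of any MQTT library: the class name PendingPublishes, its methods, and the 10-second timeout are all hypothetical choices. Your client library may already retransmit unacknowledged messages, so check that before adding logic like this.

```python
import time


class PendingPublishes:
    """Tracks QoS 1 messages until their PUBACK arrives."""

    def __init__(self, ack_timeout_s=10.0, clock=time.monotonic):
        self._ack_timeout_s = ack_timeout_s
        self._clock = clock
        self._pending = {}  # message ID -> (payload, time the message was sent)

    def sent(self, mid, payload):
        """Record that a message was published and is awaiting its PUBACK."""
        self._pending[mid] = (payload, self._clock())

    def acked(self, mid):
        """Forget a message once its PUBACK has been received."""
        self._pending.pop(mid, None)

    def due_for_resend(self):
        """Return (mid, payload) pairs whose PUBACK is overdue."""
        now = self._clock()
        return [(mid, payload)
                for mid, (payload, sent_at) in self._pending.items()
                if now - sent_at >= self._ack_timeout_s]
```

A device would call sent() after each publish, acked() from its on-publish callback, and periodically resend whatever due_for_resend() returns, ideally with a backoff between attempts.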

For device configurations, QoS levels are as follows:

  • When QoS is 0, a given configuration version will be published to the device only once. If the device does not receive the configuration, it must resubscribe. A QoS of 0 is thus useful when a configuration is frequently updated (on the order of seconds or minutes) and it's not necessary for the device to receive every update.
  • When QoS is 1, the latest configuration update will be retried until the device acknowledges it with a PUBACK. If a newer configuration is pushed before the older one is acknowledged, the older one will not be redelivered; instead, the new one will be delivered (and redelivered). This level is the safest mode for device configurations: it guarantees that the device will eventually get the latest configuration.
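The QoS 1 behavior for configurations can be pictured with a toy model. This is not the bridge's implementation; the class ConfigDeliveryModel and its integer version numbers are hypothetical and exist only to illustrate that an unacknowledged older configuration is superseded by a newer one.

```python
class ConfigDeliveryModel:
    """Toy model: only the newest unacknowledged configuration is retried."""

    def __init__(self):
        self._latest = None  # Newest version pushed so far.
        self._acked = None   # Newest version the device has acknowledged.

    def push(self, version):
        """A new configuration version replaces any older pending one."""
        self._latest = version

    def ack(self, version):
        """The device acknowledged (PUBACK) the given version."""
        if self._acked is None or version > self._acked:
            self._acked = version

    def next_delivery(self):
        """Return the version to (re)deliver, or None if the device is current."""
        if self._latest is None:
            return None
        if self._acked is not None and self._acked >= self._latest:
            return None
        return self._latest
```

In this model, pushing version 1 and then version 2 before any acknowledgement means only version 2 is ever returned by next_delivery(), and it keeps being returned until version 2 is acknowledged.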

Excessive load and exponential backoff

Cloud IoT Core limits projects that generate excessive load. When devices retry failed operations without waiting, they can trigger limits that affect all devices in the same Cloud Platform project.

For retries, you are strongly encouraged to implement a truncated exponential backoff algorithm with introduced jitter.
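A minimal sketch of truncated exponential backoff with jitter follows. The function names and the specific values (1-second base, 64-second cap, up to 1 second of jitter, 5 attempts) are illustrative assumptions, not limits taken from this document; tune them for your devices.

```python
import random
import time


def backoff_delay(attempt, base_s=1.0, max_s=64.0, rng=random.random):
    """Return the seconds to wait before retry number `attempt` (0-based).

    The delay doubles with each attempt, gets up to one second of random
    jitter added, and is truncated at max_s.
    """
    return min(max_s, base_s * (2 ** attempt) + rng())


def retry_with_backoff(operation, max_attempts=5, sleep=time.sleep):
    """Call operation() until it succeeds or max_attempts is reached."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception:  # Real code should catch narrower exception types.
            if attempt == max_attempts - 1:
                raise
            sleep(backoff_delay(attempt))
```

The jitter spreads retries from many devices over time so that they do not all reconnect in the same instant, and the cap keeps a long outage from producing unbounded waits.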

Keep-alive

When sending the initial MQTT CONNECT message from a client, you can supply an optional "keep-alive" value. This value is a time interval, measured in seconds, during which the broker expects a client to send a message, such as a PUBLISH message. If no message is sent from the client to the broker during the interval, the broker automatically closes the connection. Note that the keep-alive value you specify is multiplied by 1.5, so setting a 10 minute keep-alive actually results in a 15 minute interval.

Separate from the keep-alive interval, Cloud IoT Core has its own idle time limit of 20 minutes. Based on this limit, a client connection will automatically be terminated if the client doesn't send any messages for 20 minutes, even if the keep-alive interval is longer. If a keep-alive value isn't supplied, the default idle timeout of 20 minutes still takes effect.
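The interaction between the two limits reduces to simple arithmetic, sketched below. The function name effective_idle_timeout_s is hypothetical, and treating a keep-alive of 0 or None as "not supplied" is an assumption of this sketch.

```python
CLOUD_IOT_IDLE_LIMIT_S = 20 * 60  # Idle time limit of 20 minutes.
KEEP_ALIVE_MULTIPLIER = 1.5       # The keep-alive value is multiplied by 1.5.


def effective_idle_timeout_s(keep_alive_s=None):
    """Return how long a client may stay silent before being disconnected."""
    if not keep_alive_s:
        # No keep-alive supplied: only the 20-minute idle limit applies.
        return float(CLOUD_IOT_IDLE_LIMIT_S)
    return min(keep_alive_s * KEEP_ALIVE_MULTIPLIER,
               float(CLOUD_IOT_IDLE_LIMIT_S))


print(effective_idle_timeout_s(600))   # 10-minute keep-alive
print(effective_idle_timeout_s(1200))  # 20-minute keep-alive
```

A 10-minute keep-alive gives 900.0 seconds (15 minutes), while a 20-minute keep-alive would give 30 minutes but is capped at 1200.0 seconds by the idle limit.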

For more information, see the MQTT specification.

Troubleshooting

If you have trouble connecting, see Troubleshooting.
