Using gateways with the MQTT bridge

This page explains how gateways can use the MQTT bridge to communicate with Cloud IoT Core and publish telemetry events on behalf of bound devices. Before you begin, read Using the MQTT bridge for general information on using the MQTT bridge with Cloud IoT Core.

Using gateways with the MQTT bridge

  1. After you've created and configured the gateway, connect it to Cloud IoT Core over the MQTT bridge.
  2. Create devices if you haven't already.
  3. Optional: Bind the devices to the gateway.

    When using the MQTT bridge, you only need to bind the devices if they can't generate their own JWTs.

  4. Optional: Subscribe to the system error topic to get feedback on whether device operations are successful or not.

  5. Attach the devices to the gateway.

  6. Use the gateway to relay telemetry, device state, and configuration messages on behalf of its devices. Try the end-to-end demo to learn how.

Gateway messages

After the gateway connects to Cloud IoT Core over the MQTT bridge, it can send or receive three types of messages:

  • Control messages: Attach a device to the gateway, or detach a device from the gateway. These messages are sent between the gateway and Cloud IoT Core. Cloud IoT Core accepts control messages only from gateways; if another type of device attempts to send a control message, Cloud IoT Core closes the connection.
  • Messages from gateways and devices: Can be relayed by the gateway on behalf of a device, or sent directly from the gateway itself.
  • System error messages: When the gateway is subscribed to the MQTT system error topic on behalf of the device, Cloud IoT Core sends error messages to the gateway whenever the device encounters an error.

Attaching devices to a gateway

To enable the gateway to proxy device communications with Cloud IoT Core, have the gateway publish a QoS 1 /devices/{device_ID_to_attach}/attach control message over the MQTT bridge.

If you configured the gateway to authenticate devices using the devices' JWTs, the payload of the attach message must include the token in JSON format: { "authorization" : "{JWT_token}" }. Otherwise, Cloud IoT Core authenticates the device by checking its association with the gateway.
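As a minimal sketch of the attach message described above, the following Python helper builds the topic, payload, and QoS level. The name build_attach_message is hypothetical and not part of any Cloud IoT Core library. When no token is supplied, the sketch sends an empty JSON object, which is an assumption borrowed from the C# sample on this page (AttachDevice(mqttClient, deviceId, "{}")).

```python
import json


def build_attach_message(device_id, jwt_token=None):
    """Return (topic, payload, qos) for an attach control message.

    Hypothetical helper for illustration; it only builds the message and
    does not publish it.
    """
    topic = "/devices/{}/attach".format(device_id)
    # Include the token only when the gateway authenticates devices by JWT.
    # Assumption: an empty JSON object is acceptable when no token is sent.
    body = {"authorization": jwt_token} if jwt_token else {}
    # Attach is a control message, which the gateway publishes at QoS 1.
    return topic, json.dumps(body), 1


topic, payload, qos = build_attach_message("my-device", "example.jwt.token")
print(topic)
print(payload)
```

The returned values can be passed to the publish call of whichever MQTT client the gateway already uses.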

Success response

After the device is authorized, Cloud IoT Core sends a PUBACK message to the gateway as a response to the attach message. After the gateway receives the PUBACK message, it can publish and subscribe to Cloud IoT Core topics on behalf of the device, such as telemetry or configuration messages.

If a device is already attached when the gateway sends the attach message, Cloud IoT Core responds with a PUBACK message.

Detaching devices from the gateway

To detach a device from the gateway, have the gateway publish a QoS 1 /devices/{device_ID}/detach control message over the MQTT bridge. If the device isn't attached at the time the message is sent, Cloud IoT Core ignores the detach control message and sends a PUBACK message.
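Because Cloud IoT Core acknowledges a repeated attach and a detach for a device that isn't attached, a gateway can treat both operations as idempotent. The sketch below illustrates one way to track this on the gateway side; AttachmentTracker and detach_topic are hypothetical names invented for this example, not part of any library.

```python
class AttachmentTracker:
    """Gateway-side record of which devices are currently attached.

    Hypothetical helper; call the on_*_puback methods when the matching
    PUBACK arrives from Cloud IoT Core.
    """

    def __init__(self):
        self._attached = set()

    def on_attach_puback(self, device_id):
        # Attaching an already attached device is acknowledged as well,
        # so adding the ID again is harmless.
        self._attached.add(device_id)

    def on_detach_puback(self, device_id):
        # Detaching a device that is not attached is still acknowledged;
        # discard() does nothing if the ID is absent.
        self._attached.discard(device_id)

    def is_attached(self, device_id):
        return device_id in self._attached


def detach_topic(device_id):
    """Topic for the QoS 1 detach control message."""
    return "/devices/{}/detach".format(device_id)


tracker = AttachmentTracker()
tracker.on_attach_puback("my-device")
tracker.on_detach_puback("my-device")
tracker.on_detach_puback("my-device")  # second detach is a no-op
print(tracker.is_attached("my-device"))
```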

Troubleshooting

To be notified when a device encounters an error, subscribe the gateway to the MQTT /devices/{gateway_ID}/errors topic using QoS level 0:

C#

The step where the gateway subscribes to all of its topics, including the errors topic, is the SetupMqttTopics call in the sample below:

public static object ListenForConfigMessages(string projectId, string cloudRegion,
    string registryId, string deviceId, string gatewayId, int numMessages,
    string privateKeyFile, string algorithm, string caCerts, string mqttBridgeHostname,
    int mqttBridgePort, int jwtExpiresMinutes, int duration)
{
    var clientId = $"projects/{projectId}/locations/{cloudRegion}/registries/{registryId}" +
        $"/devices/{gatewayId}";
    var jwtIatTime = SystemClock.Instance.GetCurrentInstant();
    // Create a duration
    Duration durationExp = Duration.FromMinutes(jwtExpiresMinutes);
    var jwtExpTime = SystemClock.Instance.GetCurrentInstant().Plus(durationExp);
    var pass = "";
    if (algorithm == "RS256")
    {
        pass = CloudIotMqttExample.CreateJwtRsa(projectId, privateKeyFile);
    }
    else if (algorithm == "ES256")
    {
        Console.WriteLine("Currently, we do not support this algorithm.");
        return 0;
    }

    // Use gateway to connect server
    var mqttClient = GetClient(
      projectId,
      cloudRegion,
      registryId,
      gatewayId,
      caCerts,
      mqttBridgeHostname,
      mqttBridgePort);

    // Backoff settings, in milliseconds.
    double initialConnectIntervalMillis = 500;
    double maxConnectIntervalMillis = 6000;
    double maxConnectRetryTimeElapsedMillis = 900000;
    double intervalMultiplier = 1.5;

    double retryIntervalMs = initialConnectIntervalMillis;
    double totalRetryTimeMs = 0;

    // Both connect and publish operations may fail. If they do,
    // allow retries but with an exponential backoff time period.
    while (!mqttClient.IsConnected &&
        totalRetryTimeMs < maxConnectRetryTimeElapsedMillis)
    {
        try
        {
            // Connect to the Google MQTT bridge.
            mqttClient.Connect(clientId, "unused", pass);
        }
        catch (AggregateException aggExceps)
        {
            printExceptions(aggExceps);
            Console.WriteLine("Retrying in " + retryIntervalMs / 1000.0
                + " seconds.");

            // Thread.Sleep takes milliseconds.
            System.Threading.Thread.Sleep((int)retryIntervalMs);
            totalRetryTimeMs += retryIntervalMs;
            retryIntervalMs *= intervalMultiplier;
            if (retryIntervalMs > maxConnectIntervalMillis)
            {
                retryIntervalMs = maxConnectIntervalMillis;
            }
        }
    }

    SetupMqttTopics(mqttClient, gatewayId);
    AttachDevice(mqttClient, deviceId, "{}");

    // Wait for about a minute for config messages.
    Console.WriteLine("Listening...");
    for (int i = 0; i < duration; ++i)
    {
        Console.Write(".");
        var secSinceIssue = SystemClock.Instance.GetCurrentInstant().Minus(jwtIatTime);
        if (secSinceIssue.TotalSeconds > (60 * jwtExpiresMinutes))
        {
            Console.WriteLine("Refreshing token after {0}s", secSinceIssue.TotalSeconds);
            jwtIatTime = SystemClock.Instance.GetCurrentInstant();
            // refresh token and reconnect.
            pass = CloudIotMqttExample.CreateJwtRsa(projectId, privateKeyFile);
            mqttClient = GetClient(
                        projectId,
                        cloudRegion,
                        registryId,
                        gatewayId,
                        caCerts,
                        mqttBridgeHostname,
                        mqttBridgePort);
        }
        System.Threading.Thread.Sleep(1000);
    }

    DetachDevice(mqttClient, deviceId);
    // Wait for the device to be detached.
    System.Threading.Thread.Sleep(2000);
    mqttClient.Disconnect();
    Console.WriteLine("Finished.");
    return 0;
}

Go

The step where the gateway subscribes to the errors topic is the client.Subscribe call on gatewayTopic in the sample below (pass "errors" as the topic argument):

import (
	"fmt"
	"io"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// subscribeGatewayToDeviceTopic creates a gateway client that subscribes to a topic of a bound device.
// Currently supported topics include: "config", "state", "commands", "errors"
func subscribeGatewayToDeviceTopic(w io.Writer, projectID string, region string, registryID string, gatewayID string, deviceID string, privateKeyPath string, algorithm string, clientDuration int, topic string) error {

	const (
		mqttBrokerURL   = "tls://mqtt.googleapis.com:8883"
		protocolVersion = 4 // corresponds to MQTT 3.1.1
	)

	// onConnect defines the on connect handler, which logs the connection status.
	var onConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
		fmt.Fprintf(w, "Client connected: %t\n", client.IsConnected())
	}

	// onMessage defines the message handler for the mqtt client.
	var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
		fmt.Fprintf(w, "Topic: %s\n", msg.Topic())
		fmt.Fprintf(w, "Message: %s\n", msg.Payload())
	}

	// onDisconnect defines the connection lost handler for the mqtt client.
	var onDisconnect mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
		fmt.Fprintln(w, "Client disconnected")
	}

	jwt, err := createJWT(projectID, privateKeyPath, algorithm, 60)
	if err != nil {
		fmt.Fprintf(w, "createJWT error: %v\n", err)
		return err
	}
	clientID := fmt.Sprintf("projects/%s/locations/%s/registries/%s/devices/%s", projectID, region, registryID, gatewayID)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(mqttBrokerURL)
	opts.SetClientID(clientID)
	opts.SetUsername("unused")
	opts.SetPassword(jwt)
	opts.SetProtocolVersion(protocolVersion)
	opts.SetOnConnectHandler(onConnect)
	opts.SetDefaultPublishHandler(onMessage)
	opts.SetConnectionLostHandler(onDisconnect)

	// Create and connect a client using the above options.
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, "Failed to connect client")
		return token.Error()
	}

	if err := attachDevice(deviceID, client, ""); err != nil {
		fmt.Fprintf(w, "AttachDevice error: %v\n", err)
		return err
	}

	// Sleep for 5 seconds to allow attachDevice message to propagate.
	time.Sleep(5 * time.Second)

	// Subscribe to the requested topic of the current gateway and of a device bound to the gateway.
	gatewayTopic := fmt.Sprintf("/devices/%s/%s", gatewayID, topic)
	if token := client.Subscribe(gatewayTopic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	deviceTopic := fmt.Sprintf("/devices/%s/%s", deviceID, topic)
	if token := client.Subscribe(deviceTopic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	time.Sleep(time.Duration(clientDuration) * time.Second)

	if err := detachDevice(deviceID, client, ""); err != nil {
		fmt.Fprintf(w, "DetachDevice error: %v\n", err)
		return err
	}

	if token := client.Unsubscribe(gatewayTopic, deviceTopic); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	client.Disconnect(10)
	return nil
}

Java

The step where the gateway subscribes to the errors topic is the client.subscribe(errorTopic, 0) call near the end of the sample below:

// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
    String.format("ssl://%s:%s", mqttBridgeHostname, mqttBridgePort);

// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
    String.format(
        "projects/%s/locations/%s/registries/%s/devices/%s",
        projectId, cloudRegion, registryId, gatewayId);

MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explicitly set this. If you don't set the MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

Properties sslProps = new Properties();
sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
connectOptions.setSSLProperties(sslProps);

// With Google Cloud IoT Core, the username field is ignored, however it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");

DateTime iat = new DateTime();
if (algorithm.equals("RS256")) {
  connectOptions.setPassword(createJwtRsa(projectId, privateKeyFile).toCharArray());
} else if (algorithm.equals("ES256")) {
  connectOptions.setPassword(createJwtEs(projectId, privateKeyFile).toCharArray());
} else {
  throw new IllegalArgumentException(
      "Invalid algorithm " + algorithm + ". Should be one of 'RS256' or 'ES256'.");
}

System.out.println(mqttClientId);

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while (!client.isConnected() && totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, gatewayId);

// The topic gateways receive error updates on. QoS must be 0.
String errorTopic = String.format("/devices/%s/errors", gatewayId);
System.out.println(String.format("Listening on %s", errorTopic));

client.subscribe(errorTopic, 0);

return client;

Node.js

The step where the gateway subscribes to the errors topic, /devices/${gatewayId}/errors, is the client.subscribe call in the sample below:

// const deviceId = `myDevice`;
// const gatewayId = `mygateway`;
// const registryId = `myRegistry`;
// const projectId = `my-project-123`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const clientDuration = 60000;

const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${gatewayId}`;
console.log(mqttClientId);
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  qos: 1,
  secureProtocol: 'TLSv1_2_method',
};

// Create a client, and connect to the Google MQTT bridge.
const client = mqtt.connect(connectionArgs);

client.on('connect', success => {
  if (!success) {
    console.log('Client not connected...');
  } else {
    setTimeout(() => {
      // Subscribe to gateway error topic.
      client.subscribe(`/devices/${gatewayId}/errors`, {qos: 0});

      attachDevice(deviceId, client);

      setTimeout(() => {
        console.log('Closing connection to MQTT. Goodbye!');
        client.end(true);
      }, clientDuration); // Safely detach device and close connection.
    }, 5000);
  }
});

client.on('close', () => {
  console.log('Connection closed');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  const decodedMessage = Buffer.from(message, 'base64').toString('ascii');

  console.log(`message received on error topic ${topic}: ${decodedMessage}`);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

Python

The step where the gateway subscribes to the errors topic is the client.subscribe(error_topic, qos=0) call in the sample below:

global minimum_backoff_time

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id, cloud_region, registry_id, gateway_id,
    private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
    mqtt_bridge_port)

attach_device(client, device_id, '')
print('Waiting for device to attach.')
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = '/devices/{}/config'.format(device_id)
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = '/devices/{}/config'.format(gateway_id)
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = '/devices/{}/errors'.format(gateway_id)
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
    client.loop()
    if cb is not None:
        cb(client)

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print('Exceeded maximum backoff time. Giving up.')
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print('Refreshing token after {}s'.format(seconds_since_issue))
        jwt_iat = datetime.datetime.utcnow()
        client = get_client(
            project_id, cloud_region, registry_id, gateway_id,
            private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
            mqtt_bridge_port)

    time.sleep(1)

detach_device(client, device_id)

print('Finished.')

Cloud IoT Core sends gateway errors on a best-effort basis, delivered over QoS 0. If the gateway isn't subscribed to /devices/{gateway_ID}/errors, Cloud IoT Core logs failure events but doesn't send a PUBACK message.

MQTT errors have the following structure:

string error_type;  // A string description of the error type.
string device_id;   // The ID of the device that caused the error.
string description; // A description of the error.

If the error message was triggered by an MQTT message, the following information will be attached as well:

string message_type;  // The string MQTT message type.
string topic;  // The MQTT topic if applicable, otherwise it is empty.
int packet_id;  // The packet ID of the MQTT message if applicable, otherwise it is zero.
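The following Python sketch shows one way a gateway might decode such an error in its message handler. It assumes the payload arrives as a UTF-8 JSON object whose keys match the field names above; this page does not confirm the wire format, so check it before relying on this. GatewayError and parse_gateway_error are hypothetical names, and the sample payload is made up for illustration.

```python
import json
from dataclasses import dataclass


@dataclass
class GatewayError:
    error_type: str
    device_id: str
    description: str
    # The remaining fields are only present when an MQTT message
    # triggered the error; the defaults mirror the "empty"/"zero" notes above.
    message_type: str = ""
    topic: str = ""
    packet_id: int = 0


def parse_gateway_error(payload):
    """Decode an error payload, ASSUMING it is a UTF-8 encoded JSON object."""
    data = json.loads(payload.decode("utf-8"))
    return GatewayError(
        error_type=data.get("error_type", ""),
        device_id=data.get("device_id", ""),
        description=data.get("description", ""),
        message_type=data.get("message_type", ""),
        topic=data.get("topic", ""),
        packet_id=int(data.get("packet_id", 0)),
    )


# Made-up payload, for illustration only.
sample = b'{"error_type": "GATEWAY_ATTACHMENT_ERROR", "device_id": "my-device", "description": "example"}'
err = parse_gateway_error(sample)
print(err.error_type, err.device_id, err.packet_id)
```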

Error codes and error handling

| Error code | Description | Recommended action |
| --- | --- | --- |
| GATEWAY_ATTACHMENT_ERROR | A gateway attachment request failed. | Do not retry without fixing the problem. |
| GATEWAY_DEVICE_NOT_FOUND | The gateway was unable to find an attached device to handle an incoming message. | Do not retry without fixing the problem. |
| GATEWAY_INVALID_MQTT_TOPIC | The gateway was unable to parse the specified MQTT topic, which was either ill-formatted or contained an invalid device ID or name. | Do not retry without fixing the problem. |
| GATEWAY_UNEXPECTED_PACKET_ID | The gateway was unable to process the message based on its packet ID. For example, a PUBACK may have contained a packet ID but nothing was waiting for the response. | Do not retry without fixing the problem. |
| GATEWAY_UNEXPECTED_MESSAGE_TYPE | The gateway received unexpected messages, such as unsupported PUBREL, PUBREC, etc. | Do not retry without fixing the problem. |
| GATEWAY_DETACHMENT_DEVICE_ERROR | The gateway detached a device because of a device error. | Do not retry without fixing the problem. |
| UNKNOWN | The error is unknown. | Retry using exponential backoff. |
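The recommended actions above can be sketched in Python as a retry decision plus a backoff schedule. The names should_retry and backoff_delays are hypothetical, and the default intervals (0.5 s initial, 1.5x multiplier, 6 s cap) simply mirror the constants used by the connection samples on this page rather than any documented requirement.

```python
def should_retry(error_type):
    """Only UNKNOWN errors are worth retrying as-is.

    Every other code listed above calls for fixing the underlying problem
    first. Treating unlisted codes as non-retryable is an assumption.
    """
    return error_type == "UNKNOWN"


def backoff_delays(initial=0.5, multiplier=1.5, maximum=6.0, attempts=5):
    """Yield capped exponential backoff delays, in seconds."""
    delay = initial
    for _ in range(attempts):
        yield delay
        # Grow the delay geometrically, but never beyond the cap.
        delay = min(delay * multiplier, maximum)


if should_retry("UNKNOWN"):
    for delay in backoff_delays(attempts=3):
        print("would retry after", delay, "seconds")
```

A real gateway would sleep for each delay and stop as soon as the retried operation succeeds.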

See the main error messages documentation and the MQTT version 3.1.1 specification for more information.
