Como usar gateways com a ponte MQTT

Nesta página, explicamos como os gateways podem usar a ponte MQTT para se comunicar com o Cloud IoT Core e publicar eventos de telemetria em nome de dispositivos vinculados. Antes de começar, leia Como usar a ponte MQTT para informações gerais sobre como usá-la com o Cloud IoT Core.

Veja uma demonstração completa

Como usar gateways com a ponte MQTT

  1. Depois de criar e configurar o gateway, conecte-o ao Cloud IoT Core pela ponte MQTT.
  2. Crie dispositivos caso ainda não tenha feito isso.
  3. Opcional: vincule os dispositivos ao gateway.

    Ao usar a ponte MQTT, você só precisa vincular os dispositivos se eles não puderem gerar os próprios JWTs.

  4. Opcional: inscreva-se no tópico de erro do sistema para receber feedback sobre as operações do dispositivo.

  5. Anexe os dispositivos ao gateway.

  6. Use o gateway para redirecionar mensagens de telemetria, estado do dispositivo e configuração em nome dos dispositivos. Veja uma demonstração completa.

Mensagens de gateway

Depois que o gateway se conectar ao Cloud IoT Core pela ponte MQTT, ele poderá enviar ou receber três tipos de mensagens:

  • Mensagens de controle: anexa um dispositivo ao gateway ou desconecta um dispositivo do gateway. Essas mensagens são enviadas entre o gateway e o Cloud IoT Core. O Cloud IoT Core só aceita mensagens de controle de gateways. Se outro tipo de dispositivo tentar enviar uma mensagem de controle, o Cloud IoT Core encerrará a conexão.
  • Mensagens de gateways e dispositivos: podem ser redirecionadas pelo gateway em nome de um dispositivo ou enviadas diretamente no gateway.
  • Mensagens de erro do sistema: quando o gateway é inscrito no tópico de erro do sistema MQTT em nome do dispositivo, o Cloud IoT Core envia mensagens de erro para o gateway sempre que o dispositivo encontra um erro.

Como anexar dispositivos a um gateway

Para permitir que o gateway envie comunicações por dispositivo para o Cloud IoT Core, faça com que o gateway publique um QoS 1/devices/{device_ID_to_attach}/attach mensagem de controle pela ponte MQTT.

Se você tiver configurado o gateway para autenticar dispositivos usando os JWTs, o payload da mensagem de anexação precisará incluir o token no formato JSON: { "authorization" : "{JWT_token}" }. Caso contrário, o Cloud IoT Core autenticará o dispositivo verificando a associação dele com o gateway.

Resposta de sucesso

Depois que o dispositivo é autorizado, o Cloud IoT Core envia uma mensagem PUBACK ao gateway como uma resposta à mensagem anexada. Depois de receber a mensagem do PUBACK, o gateway pode publicar e se inscrever em tópicos do Cloud IoT Core em nome do dispositivo, como telemetria ou mensagens de configuração.

Se um dispositivo já estiver anexado quando o gateway enviar a mensagem de anexação, o Cloud IoT Core responderá com uma mensagem PUBACK.

Como remover dispositivos do gateway

Para remover um dispositivo do gateway, solicite que o gateway publique uma mensagem de controle /devices/{device_ID}/detach de QoS 1 na ponte MQTT. Se o dispositivo não estiver anexado no momento do envio da mensagem, o Cloud IoT Core ignorará a mensagem de controle de remoção e enviará uma mensagem PUBACK.

Solução de problemas

Para ser notificado quando um dispositivo encontrar um erro, inscreva o gateway no tópico MQTT /devices/{gateway_ID}/errors usando o QoS nível 0:

C#

A etapa em que o dispositivo se inscreve em todos os tópicos do gateway, incluindo o tópico errors, está destacada abaixo:
public static object ListenForConfigMessages(string projectId, string cloudRegion,
    string registryId, string deviceId, string gatewayId, int numMessages,
    string privateKeyFile, string algorithm, string caCerts, string mqttBridgeHostname,
    int mqttBridgePort, int jwtExpiresMinutes, int duration)
{
    var clientId = $"projects/{projectId}/locations/{cloudRegion}/registries/{registryId}" +
        $"/devices/{gatewayId}";
    var jwtIatTime = SystemClock.Instance.GetCurrentInstant();
    // Create a duration
    Duration durationExp = Duration.FromMinutes(jwtExpiresMinutes);
    var jwtExpTime = SystemClock.Instance.GetCurrentInstant().Plus(durationExp);
    var pass = "";
    if (algorithm == "RS256")
    {
        pass = CloudIotMqttExample.CreateJwtRsa(projectId, privateKeyFile);
    }
    else if (algorithm == "ES256")
    {
        Console.WriteLine("Currently, we do not support this algorithm.");
        return 0;
    }

    // Use gateway to connect server
    var mqttClient = GetClient(
      projectId,
      cloudRegion,
      registryId,
      gatewayId,
      caCerts,
      mqttBridgeHostname,
      mqttBridgePort);

    double initialConnectIntervalMillis = 0.5;
    double maxConnectIntervalMillis = 6;
    double maxConnectRetryTimeElapsedMillis = 900;
    double intervalMultiplier = 1.5;

    double retryIntervalMs = initialConnectIntervalMillis;
    double totalRetryTimeMs = 0;

    // Both connect and publish operations may fail. If they do,
    // allow retries but with an exponential backoff time period.
    while (!mqttClient.IsConnected &&
        totalRetryTimeMs < maxConnectRetryTimeElapsedMillis)
    {
        try
        {
            // Connect to the Google MQTT bridge.
            mqttClient.Connect(clientId, "unused", pass);
        }
        catch (AggregateException aggExceps)
        {
            printExceptions(aggExceps);
            Console.WriteLine("Retrying in " + retryIntervalMs
                + " seconds.");

            System.Threading.Thread.Sleep((int)retryIntervalMs);
            totalRetryTimeMs += retryIntervalMs;
            retryIntervalMs *= intervalMultiplier;
            if (retryIntervalMs > maxConnectIntervalMillis)
            {
                retryIntervalMs = maxConnectIntervalMillis;
            }
        }
    }

    SetupMqttTopics(mqttClient, gatewayId);
    AttachDevice(mqttClient, deviceId, "{}");

    // Wait for about a minute for config messages.
    Console.WriteLine("Listening...");
    for (int i = 0; i < duration; ++i)
    {
        Console.Write(".");
        var secSinceIssue = SystemClock.Instance.GetCurrentInstant().Minus(jwtIatTime);
        if (secSinceIssue.TotalSeconds > (60 * jwtExpiresMinutes))
        {
            Console.WriteLine("Refreshing token after {0}s", secSinceIssue);
            jwtIatTime = SystemClock.Instance.GetCurrentInstant();
            // refresh token and reconnect.
            pass = CloudIotMqttExample.CreateJwtRsa(projectId, privateKeyFile);
            mqttClient = GetClient(
                        projectId,
                        cloudRegion,
                        registryId,
                        gatewayId,
                        caCerts,
                        mqttBridgeHostname,
                        mqttBridgePort);
        }
        System.Threading.Thread.Sleep(1000);
    }

    DetachDevice(mqttClient, deviceId);
    // wait for the device get detached.
    System.Threading.Thread.Sleep(2000);
    mqttClient.Disconnect();
    Console.WriteLine("Finished.");
    return 0;
}

Go

A etapa em que o dispositivo se inscreve no tópico errors está destacada abaixo:

import (
	"fmt"
	"io"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// subscribeGatewayToDeviceTopic creates a gateway client that subscribes to a topic of a bound device.
// Currently supported topics include: "config", "state", "commands", "errors"
func subscribeGatewayToDeviceTopic(w io.Writer, projectID string, region string, registryID string, gatewayID string, deviceID string, privateKeyPath string, algorithm string, clientDuration int, topic string) error {

	const (
		mqttBrokerURL   = "tls://mqtt.googleapis.com:8883"
		protocolVersion = 4 // corresponds to MQTT 3.1.1
	)

	// onConnect defines the on connect handler which resets backoff variables.
	var onConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
		fmt.Fprintf(w, "Client connected: %t\n", client.IsConnected())
	}

	// onMessage defines the message handler for the mqtt client.
	var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
		fmt.Fprintf(w, "Topic: %s\n", msg.Topic())
		fmt.Fprintf(w, "Message: %s\n", msg.Payload())
	}

	// onDisconnect defines the connection lost handler for the mqtt client.
	var onDisconnect mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
		fmt.Println("Client disconnected")
	}

	jwt, _ := createJWT(projectID, privateKeyPath, algorithm, 60)
	clientID := fmt.Sprintf("projects/%s/locations/%s/registries/%s/devices/%s", projectID, region, registryID, gatewayID)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(mqttBrokerURL)
	opts.SetClientID(clientID)
	opts.SetUsername("unused")
	opts.SetPassword(jwt)
	opts.SetProtocolVersion(protocolVersion)
	opts.SetOnConnectHandler(onConnect)
	opts.SetDefaultPublishHandler(onMessage)
	opts.SetConnectionLostHandler(onDisconnect)

	// Create and connect a client using the above options.
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, "Failed to connect client")
		return token.Error()
	}

	if err := attachDevice(deviceID, client, ""); err != nil {
		fmt.Fprintf(w, "AttachDevice error: %v\n", err)
		return err
	}

	// Sleep for 5 seconds to allow attachDevice message to propagate.
	time.Sleep(5 * time.Second)

	// Subscribe to the config topic of the current gateway and a device bound to the gateway.
	gatewayTopic := fmt.Sprintf("/devices/%s/%s", gatewayID, topic)
	if token := client.Subscribe(gatewayTopic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	deviceTopic := fmt.Sprintf("/devices/%s/%s", deviceID, topic)
	if token := client.Subscribe(deviceTopic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	time.Sleep(time.Duration(clientDuration) * time.Second)

	if err := detachDevice(deviceID, client, ""); err != nil {
		fmt.Fprintf(w, "DetachDevice error: %v\n", err)
		return err
	}

	if token := client.Unsubscribe(gatewayTopic, deviceTopic); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	client.Disconnect(10)
	return nil
}

Java

A etapa em que o dispositivo se inscreve no tópico errors está destacada abaixo:

// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
    String.format("ssl://%s:%s", mqttBridgeHostname, mqttBridgePort);

// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
    String.format(
        "projects/%s/locations/%s/registries/%s/devices/%s",
        projectId, cloudRegion, registryId, gatewayId);

MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explictly set this. If you don't set MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

Properties sslProps = new Properties();
sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
connectOptions.setSSLProperties(sslProps);

// With Google Cloud IoT Core, the username field is ignored, however it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");

if ("RS256".equals(algorithm)) {
  connectOptions.setPassword(createJwtRsa(projectId, privateKeyFile).toCharArray());
} else if ("ES256".equals(algorithm)) {
  connectOptions.setPassword(createJwtEs(projectId, privateKeyFile).toCharArray());
} else {
  throw new IllegalArgumentException(
      "Invalid algorithm " + algorithm + ". Should be one of 'RS256' or 'ES256'.");
}

System.out.println(String.format("%s", mqttClientId));

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while ((totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, gatewayId);

// The topic gateways receive error updates on. QoS must be 0.
String errorTopic = String.format("/devices/%s/errors", gatewayId);
System.out.println(String.format("Listening on %s", errorTopic));

client.subscribe(errorTopic, 0);

return client;

Node.js

Por exemplo, a etapa em que o dispositivo se inscreve no tópico config está destacada abaixo. Para se inscrever no tópico errors, especifique /devices/${gateway_ID}/errors
// const deviceId = `myDevice`;
// const gatewayId = `mygateway`;
// const registryId = `myRegistry`;
// const projectId = `my-project-123`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const clientDuration = 60000;

const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${gatewayId}`;
console.log(mqttClientId);
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  qos: 1,
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
const client = mqtt.connect(connectionArgs);

client.on('connect', success => {
  if (!success) {
    console.log('Client not connected...');
  } else {
    setTimeout(() => {
      // Subscribe to gateway error topic.
      client.subscribe(`/devices/${gatewayId}/errors`, {qos: 0});

      attachDevice(deviceId, client);

      setTimeout(() => {
        console.log('Closing connection to MQTT. Goodbye!');
        client.end(true);
      }, clientDuration); // Safely detach device and close connection.
    }, 5000);
  }
});

client.on('close', () => {
  console.log('Connection closed');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  const decodedMessage = Buffer.from(message, 'base64').toString('ascii');

  console.log(`message received on error topic ${topic}: ${decodedMessage}`);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

Python

A etapa em que o dispositivo se inscreve no tópico errors está destacada abaixo:
global minimum_backoff_time

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = "/devices/{}/config".format(device_id)
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = "/devices/{}/config".format(gateway_id)
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = "/devices/{}/errors".format(gateway_id)
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
    client.loop()
    if cb is not None:
        cb(client)

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(1)

detach_device(client, device_id)

print("Finished.")

O Cloud IoT Core envia erros de gateway com base no melhor esforço, entregues por QoS 0. Se o gateway não estiver inscrito em /devices/{gateway_ID}/errors, o Cloud IoT Core registrará eventos de falha, mas não enviará uma mensagem PUBACK.

Os erros de MQTT têm a seguinte estrutura:

string error_type;  // A string description of the error type.
string device_id;   // The ID of the device that caused the error.
string description; // A description of the error.

Se a mensagem de erro tiver sido acionada por uma mensagem MQTT, as seguintes informações também serão anexadas:

string message_type;  // The string MQTT message type.
string topic;  // The MQTT topic if applicable, otherwise it is empty.
int packet_id;  // The packet ID of the MQTT message if applicable, otherwise it is zero.

Códigos e tratamento de erros

Código do erro Descrição Ação recomendada
GATEWAY_ATTACHMENT_ERROR Falha na solicitação de anexo do gateway. Não tente novamente sem resolver o problema.
GATEWAY_DEVICE_NOT_FOUND O gateway não encontrou um dispositivo anexado para processar uma mensagem recebida. Não tente novamente sem resolver o problema.
GATEWAY_INVALID_MQTT_TOPIC Não foi possível analisar o tópico MQTT especificado no gateway, que estava formatado incorretamente ou continha um código ou nome de dispositivo inválido. Não tente novamente sem resolver o problema.
GATEWAY_UNEXPECTED_PACKET_ID Não foi possível processar a mensagem no gateway com base no ID do pacote. Por exemplo, um PUBACK pode ter um código de pacote, mas nada estava aguardando a resposta. Não tente novamente sem resolver o problema.
GATEWAY_UNEXPECTED_MESSAGE_TYPE O gateway recebeu mensagens inesperadas, como PUBREL, PUBREC etc. não compatíveis Não tente novamente sem resolver o problema.
GATEWAY_DETACHMENT_DEVICE_ERROR O gateway desconectou um dispositivo devido a um erro. Não tente novamente sem resolver o problema.
DESCONHECIDO O erro é desconhecido. Tente novamente usando espera exponencial.

Para mais informações, consulte a documentação principal de mensagens de erro e a especificação MQTT da versão 3.1.1.