Como publicar sobre a ponte MQTT

Nesta seção, explicamos como os dispositivos podem usar a ponte MQTT para se comunicar com o Cloud IoT Core. Para informações gerais sobre HTTP e MQTT, consulte Protocolos.

Consulte a documentação da API para ver detalhes completos sobre cada método descrito nesta seção. Consulte também as amostras relacionadas a MQTT (em inglês).

Para publicar pela ponte MQTT:

  1. Instale um cliente MQTT no dispositivo.

  2. Faça o download de um certificado do servidor MQTT para o dispositivo.

  3. Configure o cliente MQTT para autenticar o dispositivo no Cloud IoT Core.

  4. Inicie um handshake de TLS no mqtt.googleapis.com ou em um domínio de suporte de longo prazo.

  5. Publique eventos de telemetria ou defina o estado do dispositivo.

Servidor MQTT

O Cloud IoT Core oferece suporte ao protocolo MQTT executando um agente gerenciado que detecta a porta mqtt.googleapis.com:8883. A porta 8883 é a porta TCP padrão reservada com IANA para conexões MQTT seguras. As conexões com essa porta precisam usar a transporte TLS, que é aceita por clientes de código aberto, como o Eclipse Paho (em inglês).

Se a porta 8883 estiver bloqueada pelo firewall, também será possível usar a porta 443: mqtt.googleapis.com:443.

Qualidade de Serviço (QoS, na sigla em inglês)

A especificação MQTT descreve três níveis de Qualidade de Serviço (QoS):

  • QoS 0, entregue no máximo uma vez
  • QoS 1, entregue pelo menos uma vez
  • QoS 2, entregue exatamente uma vez

O Cloud IoT Core não é compatível com QoS 2. A publicação de mensagens de Qo2 2 encerra a conexão. Fazer a assinatura de um tópico predefinido com QoS 2 faz downgrade do nível de QoS para QoS 1.

As funções QoS 0 e 1 funcionam do seguinte modo no Cloud IoT Core:

  • Uma mensagem PUBLISH com QoS 1 será confirmada pela mensagem PUBACK depois de ser enviada ao Cloud Pub/Sub.
  • As mensagens PUBLISH com QoS 0 não exigem respostas PUBACK e podem ser descartadas se houver instabilidade na rota de entrega da mensagem (por exemplo, se o Cloud Pub/Sub estiver temporariamente indisponível).
  • A ponte MQTT do Cloud IoT Core mantém um pequeno buffer de mensagens não entregues para repeti-las. Se o buffer ficar cheio, a mensagem com QoS 1 poderá ser descartada e uma mensagem PUBACK não será enviada ao cliente. O cliente deverá reenviar a mensagem.

Para as configurações do dispositivo, os níveis de QoS são os seguintes:

  • Quando a QoS é igual a zero, uma versão de configuração específica é publicada no dispositivo apenas uma vez. Se o dispositivo não receber a configuração, ele precisará assinar novamente. Assim, um QoS de 0 é útil quando uma configuração é atualizada com frequência (em segundos ou minutos) e não é necessário que o dispositivo receba todas as atualizações.
  • Quando a QoS é 1, a atualização mais recente da configuração será repetida até que o dispositivo a reconheça com um PUBACK. Se uma configuração mais recente for enviada antes da confirmação da mais antiga, a mais antiga não será reenviada. Em vez disso, o novo será entregue (e reenviado). Esse nível é o modo mais seguro para configurações de dispositivos: garante que ele receba a configuração mais recente.

Como fazer o download de certificados do servidor MQTT

Para usar o transporte por TLS, os dispositivos precisam verificar os certificados do servidor do Cloud IoT Core para garantir que estão se comunicando com o Cloud IoT Core, em vez de falsificar a identidade. Os seguintes pacotes de certificado são compatíveis com a verificação:

  • O pacote completo de certificação de CA raiz do Google (128 KB) para mqtt.googleapis.com

    • Esse pacote estabelece a cadeia de confiança para se comunicar com os produtos e serviços do Google, incluindo o Cloud IoT Core.
    • Os dispositivos com o pacote completo de certificação de CA raiz se comunicam diretamente com o servidor MQTT.
    • Esse pacote é atualizado regularmente.
  • Conjunto mínimo de CAs raiz (<1 KB) do Google para mqtt.2030.ltsapis.goog. O conjunto mínimo de CA raiz inclui um certificado principal e de backup.

    • Esse conjunto é destinado a dispositivos com restrições de memória, como microcontroladores, e estabelece a cadeia de confiança para se comunicar apenas com o Cloud IoT Core.
    • Os dispositivos com o conjunto mínimo de CAs raiz se comunicam com o Cloud IoT Core por meio de domínios de suporte de longo prazo.
    • Esse conjunto foi corrigido até 2030. Os certificados principal e alternativo não serão alterados. Para ter mais segurança, o Google Trust Services pode alternar entre os certificados primário e de backup a qualquer momento e sem aviso prévio.

Depois de fazer o download dos certificados de CA raiz do Google para seu dispositivo, você pode configurar um cliente MQTT para autenticá-lo, conectar-se ao servidor MQTT e se comunicar pela ponte MQTT.

Como configurar clientes MQTT

Os clientes MQTT autenticam dispositivos conectando-se à ponte MQTT. Para configurar um cliente MQTT para autenticar um dispositivo:

  1. Defina o ID do cliente MQTT para o caminho completo do dispositivo:
    projects/PROJECT_ID/locations/REGION/registries/REGISTRY_ID/devices/DEVICE_ID
  2. Associe o cliente MQTT a certificados de servidor MQTT.
  3. Defina o nome do host do MQTT como mqtt.googleapis.com ou um domínio de suporte de longo prazo (se você tiver usado o conjunto mínimo de CA raiz).
  4. Especifique um nome de usuário. A ponte MQTT ignora o campo "nome de usuário", mas algumas bibliotecas de cliente MQTT não enviam o campo de senha, a menos que o campo de nome de usuário seja especificado. Para melhores resultados, forneça um nome de usuário arbitrário, como unused ou ignored.
  5. Definir a senha. O campo senha precisa conter o JWT.

Veja na amostra a seguir como configurar o cliente MQTT para autenticar um dispositivo:

C++

As etapas para configurar o ID do cliente e autenticar um dispositivo estão destacadas abaixo:
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

MqttExampleOptions options = MqttExampleOptions.fromFlags(args);
if (options == null) {
  // Could not parse.
  System.exit(1);
}

if ("listen-for-config-messages".equals(options.command)) {
  System.out.println(
      String.format("Listening for configuration messages for %s:", options.deviceId));
  listenForConfigMessages(
      options.mqttBridgeHostname,
      options.mqttBridgePort,
      options.projectId,
      options.cloudRegion,
      options.registryId,
      options.gatewayId,
      options.privateKeyFile,
      options.algorithm,
      options.deviceId);
} else if ("send-data-from-bound-device".equals(options.command)) {
  System.out.println("Sending data on behalf of device:");
  sendDataFromBoundDevice(
      options.mqttBridgeHostname,
      options.mqttBridgePort,
      options.projectId,
      options.cloudRegion,
      options.registryId,
      options.gatewayId,
      options.privateKeyFile,
      options.algorithm,
      options.deviceId,
      options.messageType,
      options.telemetryData);
} else {
  System.out.println("Starting mqtt demo:");
  mqttDeviceDemo(options);
}

Node.js


// const deviceId = `myDevice`;
// const registryId = `myRegistry`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const messageType = `events`;
// const numMessages = 5;

// The mqttClientId is a unique string that identifies this device. For Google
// Cloud IoT Core, it must be in the format below.
const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${deviceId}`;

// With Google Cloud IoT Core, the username field is ignored, however it must be
// non-empty. The password field is used to transmit a JWT to authorize the
// device. The "mqtts" protocol causes the library to connect using SSL, which
// is required for Cloud IoT Core.
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
const iatTime = parseInt(Date.now() / 1000);
const client = mqtt.connect(connectionArgs);

// Subscribe to the /devices/{device-id}/config topic to receive config updates.
// Config updates are recommended to use QoS 1 (at least once delivery)
client.subscribe(`/devices/${deviceId}/config`, {qos: 1});

// Subscribe to the /devices/{device-id}/commands/# topic to receive all
// commands or to the /devices/{device-id}/commands/<subfolder> to just receive
// messages published to a specific commands folder; we recommend you use
// QoS 0 (at most once delivery)
client.subscribe(`/devices/${deviceId}/commands/#`, {qos: 0});

// The MQTT topic that this device will publish data to. The MQTT topic name is
// required to be in the format below. The topic name must end in 'state' to
// publish state and 'events' to publish telemetry. Note that this is not the
// same as the device registry's Cloud Pub/Sub topic.
const mqttTopic = `/devices/${deviceId}/${messageType}`;

client.on('connect', success => {
  console.log('connect');
  if (!success) {
    console.log('Client not connected...');
  } else if (!publishChainInProgress) {
    publishAsync(mqttTopic, client, iatTime, 1, numMessages, connectionArgs);
  }
});

client.on('close', () => {
  console.log('close');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  let messageStr = 'Message received: ';
  if (topic === `/devices/${deviceId}/config`) {
    messageStr = 'Config message received: ';
  } else if (topic.startsWith(`/devices/${deviceId}/commands`)) {
    messageStr = 'Command message received: ';
  }

  messageStr += Buffer.from(message, 'base64').toString('ascii');
  console.log(messageStr);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

// Once all of the messages have been published, the connection to Google Cloud
// IoT will be closed and the process will exit. See the publishAsync method.

Python

Neste exemplo, usamos a biblioteca de cliente de APIs do Google para Python.
def error_str(rc):
    """Convert a Paho error to a human readable string."""
    return "{}: {}".format(rc, mqtt.error_string(rc))

def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    print("on_connect", mqtt.connack_string(rc))

    # After a successful connect, reset backoff time and stop backing off.
    global should_backoff
    global minimum_backoff_time
    should_backoff = False
    minimum_backoff_time = 1

def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback for when a device disconnects."""
    print("on_disconnect", error_str(rc))

    # Since a disconnect occurred, the next loop iteration will wait with
    # exponential backoff.
    global should_backoff
    should_backoff = True

def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback when a message is sent to the broker."""
    print("on_publish")

def on_message(unused_client, unused_userdata, message):
    """Callback when the device receives a message on a subscription."""
    payload = str(message.payload.decode("utf-8"))
    print(
        "Received message '{}' on topic '{}' with Qos {}".format(
            payload, message.topic, str(message.qos)
        )
    )

def get_client(
    project_id,
    cloud_region,
    registry_id,
    device_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
):
    """Create our MQTT client. The client_id is a unique string that identifies
    this device. For Google Cloud IoT Core, it must be in the format below."""
    client_id = "projects/{}/locations/{}/registries/{}/devices/{}".format(
        project_id, cloud_region, registry_id, device_id
    )
    print("Device client_id is '{}'".format(client_id))

    client = mqtt.Client(client_id=client_id)

    # With Google Cloud IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    client.username_pw_set(
        username="unused", password=create_jwt(project_id, private_key_file, algorithm)
    )

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. In this example, the
    # callbacks just print to standard out.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the Google MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # This is the topic that the device will receive configuration updates on.
    mqtt_config_topic = "/devices/{}/config".format(device_id)

    # Subscribe to the config topic.
    client.subscribe(mqtt_config_topic, qos=1)

    # The topic that the device will receive commands on.
    mqtt_command_topic = "/devices/{}/commands/#".format(device_id)

    # Subscribe to the commands topic, QoS 1 enables message acknowledgement.
    print("Subscribing to {}".format(mqtt_command_topic))
    client.subscribe(mqtt_command_topic, qos=0)

    return client

Como usar um domínio MQTT de longo prazo

Os domínios com suporte de longo prazo (LTS, na sigla em inglês) permitem que você use uma configuração TLS por um longo período. Você pode definir um cliente MQTT uma vez, configurar o cliente MQTT para publicar mensagens por meio de um domínio LTS e, em seguida, comunicar-se pela ponte MQTT continuamente durante o período de tempo compatível.

O domínio LTS ativo atual é mqtt.2030.ltsapis.goog. Este domínio LTS é compatível até 2030.

Para usar o domínio LTS:

  1. Configurar um cliente MQTT para publicar mensagens por meio de um domínio LTS.

    1. Configure o cliente MQTT para autenticar o dispositivo no Cloud IoT Core.
    2. Ao configurar o dispositivo, associe os certificados principal e de backup do conjunto mínimo de CA raiz ao cliente MQTT.
  2. Inicie um handshake de TLS na mqtt.2030.ltsapis.goog na porta 8883 ou 443. Use pelo menos os seguintes recursos de TLS.

    { 101)
    • TLS 1.2
    • P-256, com SHA-256 como chave de certificado e algoritmo de hash
    • TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 usando P-256 e pontos descompactados para o pacote de criptografia
    • Indicação de nome do servidor (SNI)
    • DNS por TCP ou UDP

Para mais informações sobre como proteger o tráfego MQTT, incluindo mensagens enviadas para domínios LTS, consulte Recomendações de segurança do dispositivo.

Como publicar eventos de telemetria

Depois que o dispositivo for configurado com um cliente MQTT e conectado à ponte MQTT, ele poderá publicar um evento de telemetria emitindo uma mensagem PUBLISH para um tópico MQTT no seguinte formato:

/devices/DEVICE_ID/events

O ID do dispositivo é o ID da string do dispositivo especificado no ID do cliente MQTT. O ID do dispositivo diferencia maiúsculas de minúsculas.

As mensagens publicadas neste tópico do MQTT são encaminhadas para o tópico de telemetria padrão do registro correspondente. O tópico de telemetria padrão é o tópico do Cloud Pub/Sub especificado no campo eventNotificationConfigs[i].pubsubTopicName no recurso de registro. Se não existir um tópico padrão do Pub/Sub, os dados de telemetria publicados serão perdidos. Para publicar mensagens em outros tópicos do Cloud Pub/Sub, consulte Como publicar eventos de telemetria para tópicos adicionais do Cloud Pub/Sub.

O campo de dados de mensagem encaminhada contém uma cópia da mensagem publicada pelo dispositivo, e os seguintes atributos de mensagem são adicionados a cada mensagem no tópico do Cloud Pub/Sub:

Atributo Descrição
deviceId O identificador de string definido pelo usuário para o dispositivo, por exemplo, thing1. O ID do dispositivo precisa ser exclusivo no registro.
deviceNumId O código numérico gerado pelo servidor do dispositivo. Quando você cria um dispositivo, o Cloud IoT Core gera automaticamente o ID numérico do dispositivo. ele é globalmente exclusivo e não editável.
deviceRegistryLocation A região do registro do dispositivo do Google Cloud Platform, por exemplo, us-central1.
deviceRegistryId O identificador de string definido pelo usuário para o registro do dispositivo, por exemplo, registry1.
projectId O ID da string do projeto do Cloud proprietário do registro e do dispositivo.
subFolder A subpasta pode ser usada como uma categoria ou classificação de evento. Para clientes MQTT, a subpasta é o subtópico depois de DEVICE_ID/events, que é copiado diretamente. Por exemplo, se o cliente publicar no tópico MQTT /devices/DEVICE_ID/events/alerts, a subpasta será a string alerts.

O exemplo a seguir mostra como enviar mensagens PUBLISH por meio da conexão MQTT:

C++

Neste exemplo, usamos a biblioteca de cliente de APIs do Google para Python.
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while ((totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, options.deviceId);

// Publish to the events or state topic based on the flag.
String subTopic = "event".equals(options.messageType) ? "events" : options.messageType;

// The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
// required to be in the format below. Note that this is not the same as the device registry's
// Cloud Pub/Sub topic.
String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);

// Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
for (int i = 1; i <= options.numMessages; ++i) {
  String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
  System.out.format(
      "Publishing %s message %d/%d: '%s'%n",
      options.messageType, i, options.numMessages, payload);

  // Refresh the connection credentials before the JWT expires.
  long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000;
  if (secsSinceRefresh > (options.tokenExpMins * MINUTES_PER_HOUR)) {
    System.out.format("\tRefreshing token after: %d seconds%n", secsSinceRefresh);
    iat = new DateTime();
    if ("RS256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
    } else if ("ES256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
    } else {
      throw new IllegalArgumentException(
          "Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
    }
    client.disconnect();
    client.connect(connectOptions);
    attachCallback(client, options.deviceId);
  }

  // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
  // also supports qos=0 for at most once delivery.
  MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8.name()));
  message.setQos(1);
  client.publish(mqttTopic, message);

  if ("event".equals(options.messageType)) {
    // Send telemetry events every second
    Thread.sleep(1000);
  } else {
    // Note: Update Device state less frequently than with telemetry events
    Thread.sleep(5000);
  }
}

// Wait for commands to arrive for about two minutes.
for (int i = 1; i <= options.waitTime; ++i) {
  System.out.print('.');
  Thread.sleep(1000);
}
System.out.println("");

// Disconnect the client if still connected, and finish the run.
if (client.isConnected()) {
  client.disconnect();
}

System.out.println("Finished loop successfully. Goodbye!");
client.close();

Node.js

const publishAsync = (
  mqttTopic,
  client,
  iatTime,
  messagesSent,
  numMessages,
  connectionArgs
) => {
  // If we have published enough messages or backed off too many times, stop.
  if (messagesSent > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) {
    if (backoffTime >= MAXIMUM_BACKOFF_TIME) {
      console.log('Backoff time is too high. Giving up.');
    }
    console.log('Closing connection to MQTT. Goodbye!');
    client.end();
    publishChainInProgress = false;
    return;
  }

  // Publish and schedule the next publish.
  publishChainInProgress = true;
  let publishDelayMs = 0;
  if (shouldBackoff) {
    publishDelayMs = 1000 * (backoffTime + Math.random());
    backoffTime *= 2;
    console.log(`Backing off for ${publishDelayMs}ms before publishing.`);
  }

  setTimeout(() => {
    const payload = `${argv.registryId}/${argv.deviceId}-payload-${messagesSent}`;

    // Publish "payload" to the MQTT topic. qos=1 means at least once delivery.
    // Cloud IoT Core also supports qos=0 for at most once delivery.
    console.log('Publishing message:', payload);
    client.publish(mqttTopic, payload, {qos: 1}, err => {
      if (!err) {
        shouldBackoff = false;
        backoffTime = MINIMUM_BACKOFF_TIME;
      }
    });

    const schedulePublishDelayMs = argv.messageType === 'events' ? 1000 : 2000;
    setTimeout(() => {
      const secsFromIssue = parseInt(Date.now() / 1000) - iatTime;
      if (secsFromIssue > argv.tokenExpMins * 60) {
        iatTime = parseInt(Date.now() / 1000);
        console.log(`\tRefreshing token after ${secsFromIssue} seconds.`);

        client.end();
        connectionArgs.password = createJwt(
          argv.projectId,
          argv.privateKeyFile,
          argv.algorithm
        );
        connectionArgs.protocolId = 'MQTT';
        connectionArgs.protocolVersion = 4;
        connectionArgs.clean = true;
        client = mqtt.connect(connectionArgs);

        client.on('connect', success => {
          console.log('connect');
          if (!success) {
            console.log('Client not connected...');
          } else if (!publishChainInProgress) {
            publishAsync(
              mqttTopic,
              client,
              iatTime,
              messagesSent,
              numMessages,
              connectionArgs
            );
          }
        });

        client.on('close', () => {
          console.log('close');
          shouldBackoff = true;
        });

        client.on('error', err => {
          console.log('error', err);
        });

        client.on('message', (topic, message) => {
          console.log(
            'message received: ',
            Buffer.from(message, 'base64').toString('ascii')
          );
        });

        client.on('packetsend', () => {
          // Note: logging packet send is very verbose
        });
      }
      publishAsync(
        mqttTopic,
        client,
        iatTime,
        messagesSent + 1,
        numMessages,
        connectionArgs
      );
    }, schedulePublishDelayMs);
  }, publishDelayMs);
};

Python

Neste exemplo, usamos a biblioteca de cliente de APIs do Google para Python.
global minimum_backoff_time
global MAXIMUM_BACKOFF_TIME

# Publish to the events or state topic based on the flag.
sub_topic = "events" if args.message_type == "event" else "state"

mqtt_topic = "/devices/{}/{}".format(args.device_id, sub_topic)

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = args.jwt_expires_minutes
client = get_client(
    args.project_id,
    args.cloud_region,
    args.registry_id,
    args.device_id,
    args.private_key_file,
    args.algorithm,
    args.ca_certs,
    args.mqtt_bridge_hostname,
    args.mqtt_bridge_port,
)

# Publish num_messages messages to the MQTT bridge once per second.
for i in range(1, args.num_messages + 1):
    # Process network events.
    client.loop()

    # Wait if backoff is required.
    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        # Otherwise, wait and connect again.
        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        print("Waiting for {} before reconnecting.".format(delay))
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

    payload = "{}/{}-payload-{}".format(args.registry_id, args.device_id, i)
    print("Publishing message {}/{}: '{}'".format(i, args.num_messages, payload))
    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            args.project_id,
            args.cloud_region,
            args.registry_id,
            args.device_id,
            args.private_key_file,
            args.algorithm,
            args.ca_certs,
            args.mqtt_bridge_hostname,
            args.mqtt_bridge_port,
        )
    # Publish "payload" to the MQTT topic. qos=1 means at least once
    # delivery. Cloud IoT Core also supports qos=0 for at most once
    # delivery.
    client.publish(mqtt_topic, payload, qos=1)

    # Send events every second. State should not be updated as often
    for i in range(0, 60):
        time.sleep(1)
        client.loop()

Como publicar eventos de telemetria em tópicos adicionais do Cloud Pub/Sub

Os dispositivos podem publicar dados em tópicos adicionais do Cloud Pub/Sub. Por padrão, as mensagens MQTT publicadas em /devices/DEVICE_ID/events são encaminhadas para o tópico de telemetria padrão do registro correspondente. É possível especificar uma subpasta no tópico MQTT para encaminhar dados para outros tópicos do Cloud Pub/Sub. A subpasta é o subtópico após /devices/DEVICE_ID/events.

As mensagens publicadas em uma subpasta são encaminhadas para o tópico do Cloud Pub/Sub com o mesmo nome. O registro correspondente precisa ser configurado com o tópico do Cloud Pub/Sub. Caso contrário, as mensagens serão encaminhadas para o tópico padrão do Cloud Pub/Sub.

As mensagens são encaminhadas para o tópico padrão do Cloud Pub/Sub, em vez do tópico adicional do Cloud Pub/Sub nos seguintes casos:

  • Nenhuma subpasta foi especificada no tópico MQTT
  • Uma subpasta é especificada no tópico MQTT, mas não há um tópico correspondente do Pub/Sub no registro do dispositivo

Por exemplo, se o dispositivo publicar no tópico MQTT /devices/DEVICE_ID/events/alerts, a subpasta será a string alerts. As mensagens serão encaminhadas para o tópico extra do Cloud Pub/Sub se os campos eventNotificationConfigs[i].subfolderMatches e eventNotificationConfigs[i].pubsubTopicName estiverem definidos alerts. Caso contrário, as mensagens serão encaminhadas para o tópico padrão do Cloud Pub/Sub.

Configuração do estado do dispositivo

Os dispositivos conectados podem relatar o estado do dispositivo emitindo uma mensagem PUBLISH para o seguinte tópico MQTT:

/devices/DEVICE_ID/state

Para categorizar e recuperar mensagens de estado, configure o registro com um tópico de estado do dispositivo. O tópico de estado do dispositivo é o tópico do Cloud Pub/Sub especificado no campo StateNotificationConfig.pubsubTopicName. Se o registro estiver configurado com um tópico de estado do dispositivo, essas mensagens serão encaminhadas para o tópico do Cloud Pub/Sub correspondente da melhor maneira possível.

Para mais detalhes sobre como recuperar mensagens de estado, consulte Como saber o estado do dispositivo.

Como limitar o tráfego MQTT

O Cloud IoT Core limita os projetos que geram carga excessiva. Quando os dispositivos repetem operações com falha sem esperar, é possível acionar os limites que afetam todos os dispositivos no mesmo projeto do Google Cloud.

Para novas tentativas, é altamente recomendável implementar um algoritmo de espera exponencial truncada com instabilidade introduzida.

Keep-alive

Ao enviar a mensagem CONNECT MQTT inicial de um cliente, é possível fornecer um valor de "sinal de atividade" opcional. Esse valor é um intervalo de tempo, medido em segundos, em que o agente espera que um cliente envie uma mensagem, como uma mensagem PUBLISH. Se nenhuma mensagem for enviada do cliente ao agente durante o intervalo, ele fechará automaticamente a conexão. Observe que o valor do sinal de atividade especificado é multiplicado por 1,5.Portanto, definir um sinal de atividade de 10 minutos realmente resulta em um intervalo de 15 minutos.

Para mais informações, consulte a especificação MQTT.

Configurações do cliente

O Cloud IoT Core não fornece o próprio valor de sinal de atividade padrão. Se você optar por especificar um intervalo de sinal de atividade, precisará configurá-lo no cliente.

Para melhores resultados, defina o intervalo do sinal de atividade do cliente para um mínimo de 60 segundos. Muitas bibliotecas de cliente de código aberto, incluindo as bibliotecas do Paho MQTT para C, Python, Node.js (links em inglês) e Java, use 60 segundos por padrão.

Limite de tempo de inatividade

Separado do intervalo de sinal de atividade, o Cloud IoT Core tem seu próprio limite de tempo de inatividade de 20 minutos. Com base nesse limite, uma conexão de cliente será encerrada automaticamente se o cliente não enviar nenhuma mensagem por 20 minutos, mesmo que o intervalo do sinal de atividade seja maior. Se um valor de sinal de atividade não for fornecido, o tempo limite padrão de inatividade de 20 minutos ainda entrará em vigor.

Solução de problemas

Se você tiver problemas para se conectar, consulte Solução de problemas.