Google Cloud IoT Core verrà ritirato il 16 agosto 2023. Per saperne di più, contatta il team dedicato al tuo account Google Cloud.

Utilizzo dei gateway con il bridge MQTT

Mantieni tutto organizzato con le raccolte Salva e classifica i contenuti in base alle tue preferenze.

In questa pagina viene spiegato in che modo i gateway possono utilizzare il bridge MQTT per comunicare con Cloud IoT Core e pubblicare eventi di telemetria per conto di dispositivi associati. Prima di iniziare, leggi Utilizzo del bridge MQTT per informazioni generali sull'utilizzo del bridge MQTT con Cloud IoT Core.

Prova una demo end-to-end

Utilizzo dei gateway con il bridge MQTT

  1. Dopo aver creato e configurato il gateway, connettilo a Cloud IoT Core tramite il bridge MQTT.
  2. Crea dispositivi se non l'hai già fatto.
  3. (Facoltativo) Associa i dispositivi al gateway.

    Quando utilizzi il bridge MQTT, devi associare i dispositivi solo se non possono generare i loro JWT.

  4. (Facoltativo) Iscriviti all'argomento dell'errore di sistema per ricevere un feedback sull'esito positivo o meno delle operazioni del dispositivo.

  5. Collega i dispositivi al gateway.

  6. Utilizza il gateway per inoltrare telemetria, stato del dispositivo e messaggi di configurazione per conto dei suoi dispositivi. Prova la demo end-to-end per scoprire come.

Messaggi dei gateway

Dopo che il gateway si connette a Cloud IoT Core tramite il bridge MQTT, può inviare o ricevere tre tipi di messaggi:

  • Controlla i messaggi: collega un dispositivo al gateway o scollega un dispositivo dal gateway. Questi messaggi vengono inviati tra il gateway e Cloud IoT Core. Cloud IoT Core accetta messaggi di controllo solo da gateway; se un altro tipo di dispositivo tenta di inviare un messaggio di controllo, Cloud IoT Core chiude la connessione.
  • Messaggi da gateway e dispositivi:possono essere inoltrati dal gateway per conto di un dispositivo o inviati direttamente dal gateway stesso.
  • Messaggi di errore di sistema: quando il gateway è iscritto all'argomento di errore di sistema MQTT per conto del dispositivo, Cloud IoT Core invia messaggi di errore al gateway ogni volta che il dispositivo rileva un errore.

Collegare i dispositivi a un gateway

Per abilitare il gateway per il proxy delle comunicazioni tra dispositivi con Cloud IoT Core, chiedi al gateway di pubblicare un messaggio di controllo /devices/{device_ID_to_attach}/attach per QoS sul bridge MQTT.

Se hai configurato il gateway per l'autenticazione dei dispositivi utilizzando i JWT dei dispositivi, il payload del messaggio allegato dovrà includere il token nel formato JSON: { "authorization" : "{JWT_token}" }. In caso contrario, Cloud IoT Core autentica il dispositivo controllando la propria associazione con il gateway.

Risposta riuscita

Dopo l'autorizzazione del dispositivo, Cloud IoT Core invia un messaggio PUBACK al gateway in risposta al messaggio di collegamento. Dopo aver ricevuto il messaggio PUBACK, il gateway può pubblicare e iscriversi agli argomenti di Cloud IoT Core per conto del dispositivo, ad esempio messaggi di telemetria o configurazione.

Se un dispositivo è già collegato quando il gateway invia il messaggio allegato, Cloud IoT Core risponde con un messaggio PUBACK.

Scollegamento di dispositivi dal gateway

Per scollegare un dispositivo dal gateway, chiedi al gateway di pubblicare un messaggio di controllo QoS 1 /devices/{device_ID}/detach sul bridge MQTT. Se il dispositivo non è collegato al momento dell'invio del messaggio, Cloud IoT Core ignora il messaggio di scollegamento e invia un messaggio PUBACK.

Risolvere i problemi

Per ricevere una notifica quando un dispositivo rileva un errore, abbonati il gateway all'argomento MQTT /devices/{gateway_ID}/errors utilizzando il livello QoS 0:

Go

Il passaggio in cui il dispositivo si registra all'argomento errors è evidenziato di seguito:

import (
	"fmt"
	"io"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// subscribeGatewayToDeviceTopic creates a gateway client that subscribes to a topic of a bound device.
// Currently supported topics include: "config", "state", "commands", "errors"
func subscribeGatewayToDeviceTopic(w io.Writer, projectID string, region string, registryID string, gatewayID string, deviceID string, privateKeyPath string, algorithm string, clientDuration int, topic string) error {

	const (
		mqttBrokerURL   = "tls://mqtt.googleapis.com:8883"
		protocolVersion = 4 // corresponds to MQTT 3.1.1
	)

	// onConnect defines the on connect handler which resets backoff variables.
	var onConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
		fmt.Fprintf(w, "Client connected: %t\n", client.IsConnected())
	}

	// onMessage defines the message handler for the mqtt client.
	var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
		fmt.Fprintf(w, "Topic: %s\n", msg.Topic())
		fmt.Fprintf(w, "Message: %s\n", msg.Payload())
	}

	// onDisconnect defines the connection lost handler for the mqtt client.
	var onDisconnect mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
		fmt.Println("Client disconnected")
	}

	jwt, _ := createJWT(projectID, privateKeyPath, algorithm, 60)
	clientID := fmt.Sprintf("projects/%s/locations/%s/registries/%s/devices/%s", projectID, region, registryID, gatewayID)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(mqttBrokerURL)
	opts.SetClientID(clientID)
	opts.SetUsername("unused")
	opts.SetPassword(jwt)
	opts.SetProtocolVersion(protocolVersion)
	opts.SetOnConnectHandler(onConnect)
	opts.SetDefaultPublishHandler(onMessage)
	opts.SetConnectionLostHandler(onDisconnect)

	// Create and connect a client using the above options.
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, "Failed to connect client")
		return token.Error()
	}

	if err := attachDevice(deviceID, client, ""); err != nil {
		fmt.Fprintf(w, "AttachDevice error: %v\n", err)
		return err
	}

	// Sleep for 5 seconds to allow attachDevice message to propagate.
	time.Sleep(5 * time.Second)

	// Subscribe to the config topic of the current gateway and a device bound to the gateway.
	gatewayTopic := fmt.Sprintf("/devices/%s/%s", gatewayID, topic)
	if token := client.Subscribe(gatewayTopic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	deviceTopic := fmt.Sprintf("/devices/%s/%s", deviceID, topic)
	if token := client.Subscribe(deviceTopic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	time.Sleep(time.Duration(clientDuration) * time.Second)

	if err := detachDevice(deviceID, client, ""); err != nil {
		fmt.Fprintf(w, "DetachDevice error: %v\n", err)
		return err
	}

	if token := client.Unsubscribe(gatewayTopic, deviceTopic); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	client.Disconnect(10)
	return nil
}

Java

Il passaggio in cui il dispositivo si registra all'argomento errors è evidenziato di seguito:

// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
    String.format("ssl://%s:%s", mqttBridgeHostname, mqttBridgePort);

// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
    String.format(
        "projects/%s/locations/%s/registries/%s/devices/%s",
        projectId, cloudRegion, registryId, gatewayId);

MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explictly set this. If you don't set MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

Properties sslProps = new Properties();
sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
connectOptions.setSSLProperties(sslProps);

// With Google Cloud IoT Core, the username field is ignored, however it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");

if ("RS256".equals(algorithm)) {
  connectOptions.setPassword(createJwtRsa(projectId, privateKeyFile).toCharArray());
} else if ("ES256".equals(algorithm)) {
  connectOptions.setPassword(createJwtEs(projectId, privateKeyFile).toCharArray());
} else {
  throw new IllegalArgumentException(
      "Invalid algorithm " + algorithm + ". Should be one of 'RS256' or 'ES256'.");
}

System.out.println(String.format("%s", mqttClientId));

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while ((totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, gatewayId);

// The topic gateways receive error updates on. QoS must be 0.
String errorTopic = String.format("/devices/%s/errors", gatewayId);
System.out.println(String.format("Listening on %s", errorTopic));

client.subscribe(errorTopic, 0);

return client;

Node.js

Ad esempio, il passaggio in cui il dispositivo si abbona all'argomento config è evidenziato di seguito. Per iscriverti all'argomento errors, specifica /devices/${gateway_ID}/errors
// const deviceId = `myDevice`;
// const gatewayId = `mygateway`;
// const registryId = `myRegistry`;
// const projectId = `my-project-123`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const clientDuration = 60000;

const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${gatewayId}`;
console.log(mqttClientId);
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  qos: 1,
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
const client = mqtt.connect(connectionArgs);

client.on('connect', success => {
  if (!success) {
    console.log('Client not connected...');
  } else {
    setTimeout(() => {
      // Subscribe to gateway error topic.
      client.subscribe(`/devices/${gatewayId}/errors`, {qos: 0});

      attachDevice(deviceId, client);

      setTimeout(() => {
        console.log('Closing connection to MQTT. Goodbye!');
        client.end(true);
      }, clientDuration); // Safely detach device and close connection.
    }, 5000);
  }
});

client.on('close', () => {
  console.log('Connection closed');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  const decodedMessage = Buffer.from(message, 'base64').toString('ascii');

  console.log(`message received on error topic ${topic}: ${decodedMessage}`);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

Python

Il passaggio in cui il dispositivo si registra all'argomento errors è evidenziato di seguito:
global minimum_backoff_time

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = "/devices/{}/config".format(device_id)
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = "/devices/{}/config".format(gateway_id)
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = "/devices/{}/errors".format(gateway_id)
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
    client.loop()
    if cb is not None:
        cb(client)

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(1)

detach_device(client, device_id)

print("Finished.")

Cloud IoT Core invia gli errori del gateway in base al meglio delle prestazioni, in QoS 0. Se il gateway non è iscritto a /devices/{gateway_ID}/errors, Cloud IoT Core registra gli eventi di errore, ma non invia un messaggio PUBACK.

Gli errori MQTT hanno la seguente struttura:

string error_type;  // A string description of the error type.
string device_id;   // The ID of the device that caused the error.
string description; // A description of the error.

Se il messaggio di errore è stato attivato da un messaggio MQTT, verranno allegate anche le seguenti informazioni:

string message_type;  // The string MQTT message type.
string topic;  // The MQTT topic if applicable, otherwise it is empty.
int packet_id;  // The packet ID of the MQTT message if applicable, otherwise it is zero.

Codici di errore e gestione degli errori

Codice di errore Descrizione Azione consigliata
GATEWAY_ATTACHMENT_ERROR Richiesta di collegamento del gateway non riuscita. Non riprovare senza risolvere il problema.
GATEWAY_DEVICE_NOT_FOUND Il gateway non è riuscito a trovare un dispositivo collegato per gestire i messaggi in arrivo. Non riprovare senza risolvere il problema.
GATEWAY_INVALID_MQTT_TOPIC Il gateway non è riuscito ad analizzare l'argomento MQTT specificato, il quale non era formattato correttamente o conteneva un nome o un ID dispositivo non valido. Non riprovare senza risolvere il problema.
ID GATEWAY_UNexpected_PACKET_ID Il gateway non è riuscito a elaborare il messaggio in base all'ID pacchetto. Ad esempio, un PUBACK potrebbe contenere un ID pacchetto, ma non era in attesa di risposta. Non riprovare senza risolvere il problema.
GATEWAY_UNexpected_MESSAGE_TYPE Il gateway ha ricevuto messaggi imprevisti, come PUBREL, PUBREC e così via non sono supportati. Non riprovare senza risolvere il problema.
GATEWAY_DETACHMENT_DEVICE_ERROR Il gateway ha scollegato un dispositivo a causa di un errore del dispositivo. Non riprovare senza risolvere il problema.
SCONOSCIUTO L'errore è sconosciuto. Riprova utilizzando il backoff esponenziale.

Per ulteriori informazioni, consulta la documentazione principale sui messaggi di errore e la specifica MQTT versione 3.1.1.