Script demo MQTT del gateway

Questa pagina fornisce uno script Python che mostra il funzionamento dei gateway. Utilizzando gli esempi di Cloud IoT Core, lo script crea innanzitutto un registro demo, un gateway e un dispositivo. Quindi, associa il dispositivo al gateway, ascolta i messaggi di configurazione e invia dati sullo stato per conto del dispositivo. Infine, lo script viene automaticamente annullato, il dispositivo e il gateway vengono eliminati e viene cancellato il registro.

Funzionalità

Lo script illustra le seguenti funzionalità del gateway:

  • Creazione di un gateway
  • Creazione di un dispositivo da associare al gateway
  • Associazione del dispositivo al gateway
  • In ascolto di messaggi di configurazione
  • Invio di dati sullo stato
  • Annullamento dell'associazione del dispositivo dal gateway
  • Eliminazione del dispositivo e eliminazione del gateway in corso

Obiettivi

Una volta eseguito questo script, comprenderai il codice che esegue le seguenti operazioni:

  • Creazione di un registro
  • Creazione di un gateway e di un dispositivo da associare
  • Associazione del gateway al dispositivo
  • In ascolto di messaggi di configurazione

Prezzi

La quantità di dati trasmessi in questa demo rientra nel livello di utilizzo gratuito. Per ulteriori informazioni, vedi Prezzi.

Prima di iniziare

Questa demo presuppone che tu abbia una familiarità con Python e di aver esaminato la panoramica dei gateway.

Prima di eseguire questo esempio, devi ricevere un ID progetto, lo strumento a riga di comando gcloud e controllare che la fatturazione sia abilitata:

  1. Crea un nuovo progetto Cloud Console o recupera l'ID di un progetto esistente da Google Cloud Console:

    Vai alla pagina Progetti

  2. Installa e inizializza Google Cloud SDK:

    Scarica l'SDK

  3. Verifica che la fatturazione sia attivata per il tuo progetto.

    Scopri come attivare la fatturazione

Scarica lo script

Scarica lo script demo e imposta la directory corrente:

git clone https://github.com/GoogleCloudPlatform/python-docs-samples
cd python-docs-samples/iot/api-client/mqtt_example

Installazione delle dipendenze

Dalla directory iot/api-client/mqtt_example, installa le dipendenze necessarie per eseguire l'esempio:

pip install -r requirements.txt --user

Per ulteriori informazioni sulla configurazione dell'ambiente di sviluppo di Python, ad esempio l'installazione di tup sul sistema, consulta la Guida alla configurazione dell'ambiente di sviluppo di Python.

Crea le tue credenziali

Prima di eseguire l'esempio, completa la seguente procedura:

  1. Crea un account di servizio denominato gateway-demo e fai clic su Crea.
  2. Seleziona il ruolo Editor progetto.
  3. Ignora la schermata facoltativa Autorizzazioni account di servizio facendo clic su Continua.
  4. Fai clic su Crea chiave.
  5. Nel riquadro Crea chiave, in Tipo di chiave,seleziona JSON.
  6. Fai clic su Crea.
  7. Salva la chiave nella directory iot/api-client/mqtt_example e rinominala service_account.json.
  8. Scarica il certificato radice CA di Google nella stessa directory dei file di esempio. Facoltativamente, puoi impostare la posizione del certificato con il flag --ca_certs.

Aggiungi l'ID progetto e le credenziali allo script

  1. Nello script, imposta la variabile di ambiente GOOGLE_CLOUD_PROJECT sull'ID progetto.
  2. Imposta la variabile di ambiente GOOGLE_APPLICATION_CREDENTIALS su service_account.json.

Esegui lo script localmente

Nella sottodirectory del progetto python-docs-samples/iot/api-client/mqtt_example, esegui lo script richiamando il comando:

python gateway_demo.py

Quando lo script viene eseguito, scrive l'output nel terminale.

procedura dettagliata gateway gateway_demo.py

Questa sezione descrive ciò che accade in ogni passaggio dello script.

Innanzitutto, lo script crea un registro temporaneo del dispositivo, che conterrà sia il gateway sia il dispositivo che utilizza il gateway per comunicare con Cloud IoT Core:

Python

print("Creating registry: {}".format(registry_id))
manager.create_registry(
    service_account_json, project_id, cloud_region, pubsub_topic, registry_id
)

Il seguente codice nell'esempio di gestione del registro viene chiamato dallo script:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# pubsub_topic = 'your-pubsub-topic'
# registry_id = 'your-registry-id'
client = iot_v1.DeviceManagerClient()
parent = f"projects/{project_id}/locations/{cloud_region}"

if not pubsub_topic.startswith("projects/"):
    pubsub_topic = "projects/{}/topics/{}".format(project_id, pubsub_topic)

body = {
    "event_notification_configs": [{"pubsub_topic_name": pubsub_topic}],
    "id": registry_id,
}

try:
    response = client.create_device_registry(
        request={"parent": parent, "device_registry": body}
    )
    print("Created registry")
    return response
except HttpError:
    print("Error, registry not created")
    raise
except AlreadyExists:
    print("Error, registry already exists")
    raise

Successivamente, lo script crea un gateway e lo aggiunge al registro:

Python

print("Creating gateway: {}".format(gateway_id))
manager.create_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    None,
    gateway_id,
    rsa_cert_path,
    "RS256",
)

Il seguente codice nell'esempio di gestione dispositivi viene chiamato dallo script:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
# certificate_file = 'path/to/certificate.pem'
# algorithm = 'ES256'
# Check that the gateway doesn't already exist
exists = False
client = iot_v1.DeviceManagerClient()

parent = client.registry_path(project_id, cloud_region, registry_id)
devices = list(client.list_devices(request={"parent": parent}))

for device in devices:
    if device.id == gateway_id:
        exists = True
    print(
        "Device: {} : {} : {} : {}".format(
            device.id, device.num_id, device.config, device.gateway_config
        )
    )

with io.open(certificate_file) as f:
    certificate = f.read()

if algorithm == "ES256":
    certificate_format = iot_v1.PublicKeyFormat.ES256_PEM
else:
    certificate_format = iot_v1.PublicKeyFormat.RSA_X509_PEM

# TODO: Auth type
device_template = {
    "id": gateway_id,
    "credentials": [
        {"public_key": {"format": certificate_format, "key": certificate}}
    ],
    "gateway_config": {
        "gateway_type": iot_v1.GatewayType.GATEWAY,
        "gateway_auth_method": iot_v1.GatewayAuthMethod.ASSOCIATION_ONLY,
    },
}

if not exists:
    res = client.create_device(
        request={"parent": parent, "device": device_template}
    )
    print("Created Gateway {}".format(res))
else:
    print("Gateway exists, skipping")

Ora lo script crea un dispositivo nello stesso registro del gateway.

Python

print("Creating device to bind: {}".format(device_id))
manager.create_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)

Il seguente codice nell'esempio di Gestione dispositivi viene chiamato dallo script:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'

# Check that the device doesn't already exist
client = iot_v1.DeviceManagerClient()

exists = False

parent = client.registry_path(project_id, cloud_region, registry_id)

devices = list(client.list_devices(request={"parent": parent}))

for device in devices:
    if device.id == device_id:
        exists = True

# Create the device
device_template = {
    "id": device_id,
    "gateway_config": {
        "gateway_type": iot_v1.GatewayType.NON_GATEWAY,
        "gateway_auth_method": iot_v1.GatewayAuthMethod.ASSOCIATION_ONLY,
    },
}

if not exists:
    res = client.create_device(
        request={"parent": parent, "device": device_template}
    )
    print("Created Device {}".format(res))
else:
    print("Device exists, skipping")

Dopo che il dispositivo e il gateway si trovano nel Registro demo, lo script può associare il dispositivo al gateway. L'associazione del dispositivo consente di collegare e scollegare il dispositivo tramite la connessione del gateway al bridge del protocollo Cloud IoT Core.

Python

print("Binding device")
manager.bind_device_to_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
)

Il seguente codice nell'esempio di Gestione dispositivi viene chiamato dallo script:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
client = iot_v1.DeviceManagerClient()

create_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)

parent = client.registry_path(project_id, cloud_region, registry_id)

res = client.bind_device_to_gateway(
    request={"parent": parent, "gateway_id": gateway_id, "device_id": device_id}
)

print("Device Bound! {}".format(res))

Dopo che il dispositivo è associato al gateway, lo script viene messo in pausa in modo che tu possa impostare i dati di configurazione per il gateway o il dispositivo associato.

Per impostare i dati di configurazione:

  1. Vai a Google Cloud Console.
  2. Imposta i dati di configurazione per il gateway o il dispositivo associato.
  3. Prima di iniziare ad ascoltare, il gateway e il dispositivo ricevono la configurazione più recente.

Dopo aver impostato i dati di configurazione e prosegui con lo script, il gateway si connette al bridge di protocollo Cloud IoT Core, allega il dispositivo e riceve i messaggi di configurazione per il dispositivo.

Per ascoltare i messaggi di configurazione, lo script chiama una funzione helper dal campione MQTT come segue:

Python

print("Listening for messages for {} seconds".format(listen_time))
print("Try setting configuration in: ")
print("\t{}".format(edit_template.format(registry_id, project_id)))
try:
    input("Press enter to continue")
except SyntaxError:
    pass

def log_callback(client):
    def log_on_message(unused_client, unused_userdata, message):
        if not os.path.exists(log_path):
            with open(log_path, "w") as csvfile:
                logwriter = csv.writer(csvfile, dialect="excel")
                logwriter.writerow(["time", "topic", "data"])

        with open(log_path, "a") as csvfile:
            logwriter = csv.writer(csvfile, dialect="excel")
            logwriter.writerow(
                [
                    datetime.datetime.now().isoformat(),
                    message.topic,
                    message.payload,
                ]
            )

    client.on_message = log_on_message

cloudiot_mqtt_example.listen_for_messages(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
    num_messages,
    rsa_private_path,
    "RS256",
    ca_cert_path,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
    jwt_exp_time,
    listen_time,
    log_callback,
)

Il seguente codice nell'esempio di MQTT viene chiamato dallo script:

Python

global minimum_backoff_time

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = "/devices/{}/config".format(device_id)
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = "/devices/{}/config".format(gateway_id)
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = "/devices/{}/errors".format(gateway_id)
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
    client.loop()
    if cb is not None:
        cb(client)

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.utcnow()
        client.loop()
        client.disconnect()
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(1)

detach_device(client, device_id)

print("Finished.")

Durante l'esecuzione dello script, i messaggi di configurazione vengono scritti in un log che può essere aggiunto alla coda per trasmettere le configurazioni ai dispositivi collegati.

Dopo aver dimostrato in che modo puoi ricevere messaggi di configurazione per conto di un gateway e del dispositivo associato, lo script invia dati sullo stato per conto del dispositivo associato. Per eseguire questa operazione, lo script chiama una funzione helper dal campione MQTT:

Python

print("Publishing messages demo")
print("Publishing: {} messages".format(num_messages))
cloudiot_mqtt_example.send_data_from_bound_device(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
    num_messages,
    rsa_private_path,
    "RS256",
    ca_cert_path,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
    jwt_exp_time,
    "Hello from gateway_demo.py",
)

print("You can read the state messages for your device at this URL:")
print("\t{}".format(device_url_template).format(registry_id, device_id, project_id))
try:
    input("Press enter to continue after reading the messages.")
except SyntaxError:
    pass

Il seguente codice nell'esempio di MQTT viene chiamato dallo script:

Python

global minimum_backoff_time

# Publish device events and gateway state.
device_topic = "/devices/{}/{}".format(device_id, "state")
gateway_topic = "/devices/{}/{}".format(gateway_id, "state")

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# Publish state to gateway topic
gateway_state = "Starting gateway at: {}".format(time.time())
print(gateway_state)
client.publish(gateway_topic, gateway_state)

# Publish num_messages messages to the MQTT bridge
for i in range(1, num_messages + 1):
    client.loop()

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    payload = "{}/{}-{}-payload-{}".format(registry_id, gateway_id, device_id, i)

    print(
        "Publishing message {}/{}: '{}' to {}".format(
            i, num_messages, payload, device_topic
        )
    )
    client.publish(device_topic, "{} : {}".format(device_id, payload))

    seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s").format(seconds_since_issue)
        jwt_iat = datetime.datetime.utcnow()
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(5)

detach_device(client, device_id)

print("Finished.")

Lo script mette in pausa dopo la trasmissione dei dati sullo stato per conto del dispositivo associato, pertanto puoi accedere a Cloud Console per visualizzare i dati sullo stato prima che il Registro demo, il gateway e il dispositivo vengano rimossi nel passaggio successivo.

Al termine dello script, libera il gateway, il dispositivo e il registro allocate all'inizio.

Python

# Clean up
manager.unbind_device_from_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
)
manager.delete_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)
manager.delete_device(
    service_account_json, project_id, cloud_region, registry_id, gateway_id
)
manager.delete_registry(service_account_json, project_id, cloud_region, registry_id)

Il dispositivo è svincolato dal gateway prima dell'eliminazione:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
client = iot_v1.DeviceManagerClient()

parent = client.registry_path(project_id, cloud_region, registry_id)

res = client.unbind_device_from_gateway(
    request={"parent": parent, "gateway_id": gateway_id, "device_id": device_id}
)

print("Device unbound: {}".format(res))

Lo script chiama una funzione helper dal campione di gestore di dispositivi per eliminare sia il dispositivo associato sia il gateway:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
print("Delete device")
client = iot_v1.DeviceManagerClient()

device_path = client.device_path(project_id, cloud_region, registry_id, device_id)

return client.delete_device(request={"name": device_path})

Infine, lo script chiama una funzione helper dal esempio di gestione del registro per eliminare il registro:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
print("Delete registry")

client = iot_v1.DeviceManagerClient()
registry_path = client.registry_path(project_id, cloud_region, registry_id)

try:
    client.delete_device_registry(request={"name": registry_path})
    print("Deleted registry")
    return "Registry deleted"
except HttpError:
    print("Error, registry not deleted")
    raise