Demo-Skript für MQTT-Gateway

Auf dieser Seite finden Sie ein Python-Skript, das die Funktionsweise von Gateways veranschaulicht. Mithilfe von Cloud IoT Core-Beispielen erstellt das Skript zuerst eine Demo-Registry, ein Gateway und ein Gerät. Anschließend wird das Gerät an das Gateway gebunden, es werden Konfigurationsnachrichten überwacht und es werden Statusdaten im Namen des Geräts gesendet. Abschließend wird die Verknüpfung des Geräts automatisch aufgehoben, das Gerät und das Gateway werden gelöscht und die Registry wird bereinigt.

Features

Das Skript veranschaulicht die folgenden Gateway-Funktionen:

  • Gateway erstellen
  • Gerät erstellen, das an das Gateway gebunden wird
  • Gerät an das Gateway binden
  • Konfigurationsnachrichten überwachen
  • Statusdaten senden
  • Bindung des Geräts an das Gateway aufheben
  • Gerät und dann Gateway löschen

Lernziele

Nachdem Sie dieses Skript ausgeführt haben, verstehen Sie den Code, der Folgendes ausführt:

  • Registry erstellen
  • Gateway und Gerät zum Binden erstellen
  • Gateway an das Gerät binden
  • Konfigurationsnachrichten überwachen

Preise

Die in dieser Demo übertragenen Datenmenge gehört zur kostenlosen Nutzungsstufe. Weitere Informationen finden Sie unter Preise.

Hinweis

In dieser Demonstration wird davon ausgegangen, dass Sie mit Python vertraut sind und die Übersicht zu Gateways gelesen haben.

Bevor Sie dieses Beispiel ausführen können, müssen Sie eine Projekt-ID und die Google Cloud-Befehlszeile abrufen und prüfen, ob die Abrechnung aktiviert ist:

  1. Erstellen Sie ein neues Konsolenprojekt oder rufen Sie die Projekt-ID eines vorhandenen Projekts in der Google Cloud Console ab:

    Zur Seite „Projekte“

  2. Installieren Sie die Google Cloud CLI und initialisieren Sie sie:

    SDK herunterladen

  3. Die Abrechnung für Ihr Projekt muss aktiviert sein.

    Weitere Informationen zum Aktivieren der Abrechnung

Skript herunterladen

Laden Sie das Demoskript herunter und legen Sie Ihr aktuelles Verzeichnis fest:

git clone https://github.com/GoogleCloudPlatform/python-docs-samples
cd python-docs-samples/iot/api-client/mqtt_example

Abhängigkeiten installieren

Installieren Sie im Verzeichnis iot/api-client/mqtt_example die Abhängigkeiten, die zum Ausführen des Beispiels erforderlich sind:

pip install -r requirements.txt --user

Weitere Informationen zur Einrichtung der Python-Entwicklungsumgebung, z. B. zur Installation von "pip" auf Ihrem System, finden Sie im Einrichtungshandbuch für die Python-Entwicklungsumgebung.

Anmeldedaten erstellen

Führen Sie vor dem Ausführen des Beispiels die folgenden Schritte aus:

  1. Erstellen Sie ein Dienstkonto:

    1. Rufen Sie in der Console die Seite Dienstkonten auf.

      Zu den Dienstkonten

    2. Wählen Sie Ihr Projekt aus.

    3. Klicken Sie auf Dienstkonto erstellen.

    4. Geben Sie im Feld Dienstkontoname einen Namen ein. Die Konsole füllt das Feld Dienstkonto-ID anhand dieses Namens aus.

    5. Optional: Im Feld Beschreibung des Dienstkontos können Sie eine entsprechende Beschreibung eingeben.

    6. Klicken Sie auf Erstellen und fortfahren.

    7. Klicke auf das Feld Rolle auswählen und wähle Einfach > aus.

    8. Klicken Sie auf Fertig, um das Erstellen des Dienstkontos abzuschließen.

      Schließen Sie das Browserfenster nicht. Sie verwenden es in der nächsten Aufgabe.

  2. Laden Sie einen JSON-Schlüssel für das gerade erstellte Dienstkonto herunter:

    1. Klicken Sie in der Konsole auf die E-Mail-Adresse des von Ihnen erstellten Dienstkontos.
    2. Klicken Sie auf Schlüssel.
    3. Klicken Sie auf Schlüssel hinzufügen > Neuen Schlüssel erstellen.
    4. Klicken Sie auf Erstellen. Daraufhin wird eine JSON-Schlüsseldatei auf Ihren Computer heruntergeladen.

      Speichern Sie diesen Schlüssel im Verzeichnis iot/api-client/mqtt_example und benennen Sie ihn in service_account.json um.

    5. Klicken Sie auf Close (Schließen).

  3. Laden Sie das Root-Zertifikat von Google in das Verzeichnis herunter, in dem sich auch die Beispieldateien befinden. Optional können Sie den Speicherort des Zertifikats mit dem Flag --ca_certs festlegen.

Projekt-ID und Anmeldedaten zum Skript hinzufügen

  1. Legen Sie im Skript die Umgebungsvariable GOOGLE_CLOUD_PROJECT auf Ihre Projekt-ID fest.
  2. Legen Sie die Umgebungsvariable GOOGLE_APPLICATION_CREDENTIALS auf service_account.json fest.

Skript lokal ausführen

Führen Sie das Skript im Projektunterverzeichnis python-docs-samples/iot/api-client/mqtt_example mit dem folgenden Befehl aus.

python gateway_demo.py

Wenn das Skript ausgeführt wird, wird die Ausgabe in das Terminal geschrieben.

Schritt-für-Schritt-Anleitung zu gateway-demo.py

In diesem Abschnitt wird beschrieben, was in jedem Schritt des Skripts geschieht.

Zuerst erstellt das Skript eine temporäre Geräte-Registry, die sowohl das Gateway als auch das Gerät enthält, das das Gateway für die Kommunikation mit Cloud IoT Core verwendet:

Python

print("Creating registry: {}".format(registry_id))
manager.create_registry(
    service_account_json, project_id, cloud_region, pubsub_topic, registry_id
)

Der folgende Code im Beispiel zur Registry-Verwaltung wird vom Skript aufgerufen:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# pubsub_topic = 'your-pubsub-topic'
# registry_id = 'your-registry-id'
client = iot_v1.DeviceManagerClient()
parent = f"projects/{project_id}/locations/{cloud_region}"

if not pubsub_topic.startswith("projects/"):
    pubsub_topic = "projects/{}/topics/{}".format(project_id, pubsub_topic)

body = {
    "event_notification_configs": [{"pubsub_topic_name": pubsub_topic}],
    "id": registry_id,
}

try:
    response = client.create_device_registry(
        request={"parent": parent, "device_registry": body}
    )
    print("Created registry")
    return response
except HttpError:
    print("Error, registry not created")
    raise
except AlreadyExists:
    print("Error, registry already exists")
    raise

Dann erstellt das Skript ein Gateway und fügt es der Registry hinzu:

Python

print("Creating gateway: {}".format(gateway_id))
manager.create_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    None,
    gateway_id,
    rsa_cert_path,
    "RS256",
)

Der folgende Code im Beispiel zur Geräteverwaltung wird vom Skript aufgerufen:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
# certificate_file = 'path/to/certificate.pem'
# algorithm = 'ES256'
# Check that the gateway doesn't already exist
exists = False
client = iot_v1.DeviceManagerClient()

parent = client.registry_path(project_id, cloud_region, registry_id)
devices = list(client.list_devices(request={"parent": parent}))

for device in devices:
    if device.id == gateway_id:
        exists = True
    print(
        "Device: {} : {} : {} : {}".format(
            device.id, device.num_id, device.config, device.gateway_config
        )
    )

with io.open(certificate_file) as f:
    certificate = f.read()

if algorithm == "ES256":
    certificate_format = iot_v1.PublicKeyFormat.ES256_PEM
else:
    certificate_format = iot_v1.PublicKeyFormat.RSA_X509_PEM

# TODO: Auth type
device_template = {
    "id": gateway_id,
    "credentials": [
        {"public_key": {"format": certificate_format, "key": certificate}}
    ],
    "gateway_config": {
        "gateway_type": iot_v1.GatewayType.GATEWAY,
        "gateway_auth_method": iot_v1.GatewayAuthMethod.ASSOCIATION_ONLY,
    },
}

if not exists:
    res = client.create_device(
        request={"parent": parent, "device": device_template}
    )
    print("Created Gateway {}".format(res))
else:
    print("Gateway exists, skipping")

Jetzt erstellt das Skript ein Gerät in derselben Registry wie das Gateway:

Python

print("Creating device to bind: {}".format(device_id))
manager.create_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)

Der folgende Code im Gerätemanager-Beispiel wird vom Skript aufgerufen:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'

# Check that the device doesn't already exist
client = iot_v1.DeviceManagerClient()

exists = False

parent = client.registry_path(project_id, cloud_region, registry_id)

devices = list(client.list_devices(request={"parent": parent}))

for device in devices:
    if device.id == device_id:
        exists = True

# Create the device
device_template = {
    "id": device_id,
    "gateway_config": {
        "gateway_type": iot_v1.GatewayType.NON_GATEWAY,
        "gateway_auth_method": iot_v1.GatewayAuthMethod.ASSOCIATION_ONLY,
    },
}

if not exists:
    res = client.create_device(
        request={"parent": parent, "device": device_template}
    )
    print("Created Device {}".format(res))
else:
    print("Device exists, skipping")

Sobald sich das Gerät und das Gateway in der Demo-Registry befinden, kann das Skript das Gerät an das Gateway binden. Durch das Binden des Geräts kann das Gateway das Gerät über die Gateway-Verbindung mit der Cloud IoT Core-Protokoll-Bridge verbinden und trennen.

Python

print("Binding device")
manager.bind_device_to_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
)

Der folgende Code im Gerätemanager-Beispiel wird vom Skript aufgerufen:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
client = iot_v1.DeviceManagerClient()

create_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)

parent = client.registry_path(project_id, cloud_region, registry_id)

res = client.bind_device_to_gateway(
    request={"parent": parent, "gateway_id": gateway_id, "device_id": device_id}
)

print("Device Bound! {}".format(res))

Nachdem das Gerät an das Gateway gebunden wurde, wird das Skript angehalten, sodass Sie Konfigurationsdaten für das Gateway oder das gebundene Gerät festlegen können.

So legen Sie die Konfigurationsdaten fest:

  1. Rufen Sie die Google Cloud Console auf.
  2. Legen Sie Konfigurationsdaten für das Gateway oder das gebundene Gerät fest.
  3. Prüfen Sie, ob das Gateway und das Gerät die neueste Konfiguration erhalten, bevor das Gateway mit der Überwachung beginnt.

Nachdem Sie die Konfigurationsdaten festgelegt und das Skript fortgesetzt haben, stellt das Gateway eine Verbindung zur Cloud IoT Core-Protokoll-Bridge her, hängt das Gerät an und empfängt dann Konfigurationsnachrichten für das Gerät.

Um auf Konfigurationsnachrichten zu warten, ruft das Skript eine Hilfsfunktion aus dem MQTT-Beispiel auf:

Python

print("Listening for messages for {} seconds".format(listen_time))
print("Try setting configuration in: ")
print("\t{}".format(edit_template.format(registry_id, project_id)))
try:
    input("Press enter to continue")
except SyntaxError:
    pass

def log_callback(client):
    def log_on_message(unused_client, unused_userdata, message):
        if not os.path.exists(log_path):
            with open(log_path, "w") as csvfile:
                logwriter = csv.writer(csvfile, dialect="excel")
                logwriter.writerow(["time", "topic", "data"])

        with open(log_path, "a") as csvfile:
            logwriter = csv.writer(csvfile, dialect="excel")
            logwriter.writerow(
                [
                    datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
                    message.topic,
                    message.payload,
                ]
            )

    client.on_message = log_on_message

cloudiot_mqtt_example.listen_for_messages(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
    num_messages,
    rsa_private_path,
    "RS256",
    ca_cert_path,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
    jwt_exp_time,
    listen_time,
    log_callback,
)

Der folgende Code im MQTT-Beispiel wird durch das Skript aufgerufen:

Python

global minimum_backoff_time

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = "/devices/{}/config".format(device_id)
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = "/devices/{}/config".format(gateway_id)
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = "/devices/{}/errors".format(gateway_id)
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
    client.loop()
    if cb is not None:
        cb(client)

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(1)

detach_device(client, device_id)

print("Finished.")

Wenn das Skript ausgeführt wird, schreibt es die Konfigurationsnachrichten in ein Log, das verfolgt werden kann, um die Konfigurationen an die angehängten Geräte zu übertragen.

Nachdem veranschaulicht wurde, wie Sie Konfigurationsnachrichten im Namen eines Gateways und des verbundenen Geräts empfangen können, sendet das Skript Statusdaten im Namen des gebundenen Geräts. Dazu ruft das Skript eine Hilfsfunktion aus dem MQTT-Beispiel auf:

Python

print("Publishing messages demo")
print("Publishing: {} messages".format(num_messages))
cloudiot_mqtt_example.send_data_from_bound_device(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
    num_messages,
    rsa_private_path,
    "RS256",
    ca_cert_path,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
    jwt_exp_time,
    "Hello from gateway_demo.py",
)

print("You can read the state messages for your device at this URL:")
print("\t{}".format(device_url_template).format(registry_id, device_id, project_id))
try:
    input("Press enter to continue after reading the messages.")
except SyntaxError:
    pass

Der folgende Code im MQTT-Beispiel wird durch das Skript aufgerufen:

Python

global minimum_backoff_time

# Publish device events and gateway state.
device_topic = "/devices/{}/{}".format(device_id, "state")
gateway_topic = "/devices/{}/{}".format(gateway_id, "state")

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# Publish state to gateway topic
gateway_state = "Starting gateway at: {}".format(time.time())
print(gateway_state)
client.publish(gateway_topic, gateway_state)

# Publish num_messages messages to the MQTT bridge
for i in range(1, num_messages + 1):
    client.loop()

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    payload = "{}/{}-{}-payload-{}".format(registry_id, gateway_id, device_id, i)

    print(
        "Publishing message {}/{}: '{}' to {}".format(
            i, num_messages, payload, device_topic
        )
    )
    client.publish(device_topic, "{} : {}".format(device_id, payload))

    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s").format(seconds_since_issue)
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(5)

detach_device(client, device_id)

print("Finished.")

Das Skript wird nach der Übertragung von Statusdaten im Namen des gebundenen Geräts pausiert. So können Sie in der Konsole die Statusdaten aufrufen, bevor die Demoregistrierung, das Gateway und das Gerät im nächsten Schritt entfernt werden.

Nach Beendigung des Skripts werden das Gateway, das Gerät und die Registry, die zu Beginn zugewiesen wurden, freigegeben.

Python

# Clean up
manager.unbind_device_from_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
)
manager.delete_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)
manager.delete_device(
    service_account_json, project_id, cloud_region, registry_id, gateway_id
)
manager.delete_registry(service_account_json, project_id, cloud_region, registry_id)

Die Bindung des Geräts an das Gateway wird aufgehoben, bevor es gelöscht wird:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
client = iot_v1.DeviceManagerClient()

parent = client.registry_path(project_id, cloud_region, registry_id)

res = client.unbind_device_from_gateway(
    request={"parent": parent, "gateway_id": gateway_id, "device_id": device_id}
)

print("Device unbound: {}".format(res))

Das Skript ruft eine Hilfsfunktion aus dem Gerätemanager-Beispiel auf, um sowohl das gebundene Gerät als auch das Gateway zu löschen:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
print("Delete device")
client = iot_v1.DeviceManagerClient()

device_path = client.device_path(project_id, cloud_region, registry_id, device_id)

return client.delete_device(request={"name": device_path})

Schließlich ruft das Skript eine Hilfsfunktion aus dem Beispiel für die Registry-Verwaltung auf, um die Registry zu löschen:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
print("Delete registry")

client = iot_v1.DeviceManagerClient()
registry_path = client.registry_path(project_id, cloud_region, registry_id)

try:
    client.delete_device_registry(request={"name": registry_path})
    print("Deleted registry")
    return "Registry deleted"
except HttpError:
    print("Error, registry not deleted")
    raise