Script de démonstration de la passerelle MQTT

Cette page fournit un script Python qui montre comment fonctionnent les passerelles. À l'aide d'exemples Cloud IoT Core, le script crée d'abord un registre de démonstration, une passerelle et un appareil. Elle associe ensuite l'appareil à la passerelle, écoute les messages de configuration et envoie des données d'état au nom de l'appareil. Enfin, le script dissocie automatiquement l'appareil, supprime l'appareil et la passerelle, puis efface le registre.

Fonctionnalités

Le script présente les fonctionnalités de passerelle suivantes :

  • Créer une passerelle
  • Créer un appareil à lier à la passerelle
  • Associer l'appareil à la passerelle
  • Écouter les messages de configuration
  • Données sur l'état d'envoi
  • Dissocier l'appareil de la passerelle
  • Supprimer l'appareil, puis la passerelle

Objectifs

Après avoir exécuté ce script, vous comprendrez le code qui effectue les opérations suivantes :

  • Créer un registre
  • Créer une passerelle et un appareil à lier
  • Associer la passerelle à l'appareil
  • Écouter les messages de configuration

Tarification

La quantité de données transmises dans cette démonstration est incluse dans la version gratuite. Reportez-vous à la page Tarifs pour plus de détails.

Avant de commencer

Cette démonstration suppose que vous connaissiez Python et que vous avez lu l'aperçu des passerelles.

Avant de pouvoir exécuter cet exemple, vous devez obtenir un ID de projet, obtenir l'interface de ligne de commande Google Cloud et vérifier que la facturation est activée:

  1. Créez un projet de console ou récupérez l'ID d'un projet existant à partir de Google Cloud Console:

    Accéder à la page "Projets"

  2. Installez et initialisez Google Cloud CLI :

    Télécharger le SDK

  3. Assurez-vous que la facturation est activée pour votre projet.

    Découvrir comment activer la facturation

Télécharger le script

Téléchargez le script de démonstration et définissez votre répertoire actuel :

git clone https://github.com/GoogleCloudPlatform/python-docs-samples
cd python-docs-samples/iot/api-client/mqtt_example

Installer des dépendances

Depuis le répertoire iot/api-client/mqtt_example, installez les dépendances nécessaires à l'exécution de l'exemple :

pip install -r requirements.txt --user

Pour savoir comment configurer votre environnement de développement Python et installer pip sur votre système, consultez le guide de configuration d'un environnement de développement Python.

Créer vos identifiants

Avant d'exécuter l'exemple, procédez comme suit :

  1. Créez un compte de service :

    1. Dans la console, accédez à la page Comptes de service.

      Accéder aux comptes de service

    2. Sélectionnez votre projet.

    3. Cliquez sur Créer un compte de service.

    4. Dans le champ Nom du compte de service, saisissez un nom. La console remplit le champ ID du compte de service en fonction de ce nom.

    5. Facultatif : dans le champ Description du compte de service, saisissez une description du compte de service.

    6. Cliquez sur Créer et continuer.

    7. Cliquez sur le champ Select a role (Sélectionner un rôle), puis sélectionnez Basic > Editor (Éditeur de base).

    8. Cliquez sur OK pour terminer la création du compte de service.

      Ne fermez pas la fenêtre de votre navigateur. Vous en aurez besoin lors de la tâche suivante.

  2. Téléchargez une clé JSON pour le compte de service que vous venez de créer :

    1. Dans la console, cliquez sur l'adresse e-mail du compte de service que vous avez créé.
    2. Cliquez sur Keys (Clés).
    3. Cliquez sur Add key (Ajouter une clé), puis sur Create new key (Créer une clé).
    4. Cliquez sur Create (Créer). Un fichier de clé JSON est téléchargé sur votre ordinateur.

      Enregistrez cette clé dans le répertoire iot/api-client/mqtt_example et renommez-la service_account.json.

    5. Cliquez sur Close (Fermer).

  3. Téléchargez le certificat CA racine de Google dans le même répertoire que les exemples de fichier. Vous pouvez également définir l'emplacement du certificat à l'aide de l'indicateur --ca_certs.

Ajouter l'ID du projet et les identifiants au script

  1. Dans le script, définissez la variable d'environnement GOOGLE_CLOUD_PROJECT sur l'ID de votre projet.
  2. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS sur service_account.json.

Exécuter le script en local

Dans le sous-répertoire du projet python-docs-samples/iot/api-client/mqtt_example, exécutez le script en appelant la commande suivante :

python gateway_demo.py

Lorsque le script s'exécute, il écrit le résultat dans le terminal.

Tutoriel gateway_demo.py

Cette section décrit ce qui se passe à chaque étape du script.

Tout d'abord, le script crée un registre temporaire d'appareils, qui contiendra à la fois la passerelle et l'appareil qui utilise la passerelle pour communiquer avec Cloud IoT Core :

Python

print("Creating registry: {}".format(registry_id))
manager.create_registry(
    service_account_json, project_id, cloud_region, pubsub_topic, registry_id
)

Le code suivant de l'exemple de gestion du registre est appelé par le script :

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# pubsub_topic = 'your-pubsub-topic'
# registry_id = 'your-registry-id'
client = iot_v1.DeviceManagerClient()
parent = f"projects/{project_id}/locations/{cloud_region}"

if not pubsub_topic.startswith("projects/"):
    pubsub_topic = "projects/{}/topics/{}".format(project_id, pubsub_topic)

body = {
    "event_notification_configs": [{"pubsub_topic_name": pubsub_topic}],
    "id": registry_id,
}

try:
    response = client.create_device_registry(
        request={"parent": parent, "device_registry": body}
    )
    print("Created registry")
    return response
except HttpError:
    print("Error, registry not created")
    raise
except AlreadyExists:
    print("Error, registry already exists")
    raise

Le script crée ensuite une passerelle et l'ajoute au registre :

Python

print("Creating gateway: {}".format(gateway_id))
manager.create_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    None,
    gateway_id,
    rsa_cert_path,
    "RS256",
)

Le code suivant dans l'exemple de gestion des appareils est appelé par le script :

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
# certificate_file = 'path/to/certificate.pem'
# algorithm = 'ES256'
# Check that the gateway doesn't already exist
exists = False
client = iot_v1.DeviceManagerClient()

parent = client.registry_path(project_id, cloud_region, registry_id)
devices = list(client.list_devices(request={"parent": parent}))

for device in devices:
    if device.id == gateway_id:
        exists = True
    print(
        "Device: {} : {} : {} : {}".format(
            device.id, device.num_id, device.config, device.gateway_config
        )
    )

with io.open(certificate_file) as f:
    certificate = f.read()

if algorithm == "ES256":
    certificate_format = iot_v1.PublicKeyFormat.ES256_PEM
else:
    certificate_format = iot_v1.PublicKeyFormat.RSA_X509_PEM

# TODO: Auth type
device_template = {
    "id": gateway_id,
    "credentials": [
        {"public_key": {"format": certificate_format, "key": certificate}}
    ],
    "gateway_config": {
        "gateway_type": iot_v1.GatewayType.GATEWAY,
        "gateway_auth_method": iot_v1.GatewayAuthMethod.ASSOCIATION_ONLY,
    },
}

if not exists:
    res = client.create_device(
        request={"parent": parent, "device": device_template}
    )
    print("Created Gateway {}".format(res))
else:
    print("Gateway exists, skipping")

À présent, le script crée un appareil dans le même registre que la passerelle :

Python

print("Creating device to bind: {}".format(device_id))
manager.create_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)

Le code suivant dans l'exemple de gestionnaire d'appareils est appelé par le script :

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'

# Check that the device doesn't already exist
client = iot_v1.DeviceManagerClient()

exists = False

parent = client.registry_path(project_id, cloud_region, registry_id)

devices = list(client.list_devices(request={"parent": parent}))

for device in devices:
    if device.id == device_id:
        exists = True

# Create the device
device_template = {
    "id": device_id,
    "gateway_config": {
        "gateway_type": iot_v1.GatewayType.NON_GATEWAY,
        "gateway_auth_method": iot_v1.GatewayAuthMethod.ASSOCIATION_ONLY,
    },
}

if not exists:
    res = client.create_device(
        request={"parent": parent, "device": device_template}
    )
    print("Created Device {}".format(res))
else:
    print("Device exists, skipping")

Une fois que l'appareil et la passerelle ont été enregistrés dans le registre de démonstration, le script peut lier l'appareil à la passerelle. L'association de l'appareil permet à la passerelle de connecter et de dissocier l'appareil via la connexion à la passerelle de protocole Cloud IoT Core.

Python

print("Binding device")
manager.bind_device_to_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
)

Le code suivant dans l'exemple de gestionnaire d'appareils est appelé par le script :

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
client = iot_v1.DeviceManagerClient()

create_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)

parent = client.registry_path(project_id, cloud_region, registry_id)

res = client.bind_device_to_gateway(
    request={"parent": parent, "gateway_id": gateway_id, "device_id": device_id}
)

print("Device Bound! {}".format(res))

Une fois l'appareil lié à la passerelle, le script s'interrompt afin que vous puissiez définir des données de configuration pour la passerelle ou l'appareil lié.

Pour définir les données de configuration, procédez comme suit :

  1. Accédez à Google Cloud Console.
  2. Définissez les données de configuration de la passerelle ou de l'appareil lié.
  3. Vérifiez que la passerelle et l'appareil reçoivent la dernière configuration avant que celle-ci ne commence à écouter.

Une fois que vous avez défini les données de configuration et continué le script, la passerelle se connecte au pont de protocole Cloud IoT Core, associe l'appareil, puis reçoit des messages de configuration.

Pour écouter les messages de configuration, le script appelle une fonction d'assistance à partir de l'exemple MQTT comme suit :

Python

print("Listening for messages for {} seconds".format(listen_time))
print("Try setting configuration in: ")
print("\t{}".format(edit_template.format(registry_id, project_id)))
try:
    input("Press enter to continue")
except SyntaxError:
    pass

def log_callback(client):
    def log_on_message(unused_client, unused_userdata, message):
        if not os.path.exists(log_path):
            with open(log_path, "w") as csvfile:
                logwriter = csv.writer(csvfile, dialect="excel")
                logwriter.writerow(["time", "topic", "data"])

        with open(log_path, "a") as csvfile:
            logwriter = csv.writer(csvfile, dialect="excel")
            logwriter.writerow(
                [
                    datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
                    message.topic,
                    message.payload,
                ]
            )

    client.on_message = log_on_message

cloudiot_mqtt_example.listen_for_messages(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
    num_messages,
    rsa_private_path,
    "RS256",
    ca_cert_path,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
    jwt_exp_time,
    listen_time,
    log_callback,
)

Le code suivant de l'exemple MQTT est appelé par le script :

Python

global minimum_backoff_time

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = "/devices/{}/config".format(device_id)
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = "/devices/{}/config".format(gateway_id)
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = "/devices/{}/errors".format(gateway_id)
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
    client.loop()
    if cb is not None:
        cb(client)

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(1)

detach_device(client, device_id)

print("Finished.")

Lorsque le script s'exécute, il écrit les messages de configuration dans un journal pouvant être transmis pour transmettre les configurations aux appareils associés.

Après avoir démontré comment vous pouvez recevoir les messages de configuration au nom d'une passerelle et de son appareil lié, le script envoie des données d'état au nom de l'appareil lié. Pour ce faire, le script appelle une fonction d'assistance à partir de l'exemple MQTT:

Python

print("Publishing messages demo")
print("Publishing: {} messages".format(num_messages))
cloudiot_mqtt_example.send_data_from_bound_device(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
    num_messages,
    rsa_private_path,
    "RS256",
    ca_cert_path,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
    jwt_exp_time,
    "Hello from gateway_demo.py",
)

print("You can read the state messages for your device at this URL:")
print("\t{}".format(device_url_template).format(registry_id, device_id, project_id))
try:
    input("Press enter to continue after reading the messages.")
except SyntaxError:
    pass

Le code suivant de l'exemple MQTT est appelé par le script :

Python

global minimum_backoff_time

# Publish device events and gateway state.
device_topic = "/devices/{}/{}".format(device_id, "state")
gateway_topic = "/devices/{}/{}".format(gateway_id, "state")

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# Publish state to gateway topic
gateway_state = "Starting gateway at: {}".format(time.time())
print(gateway_state)
client.publish(gateway_topic, gateway_state)

# Publish num_messages messages to the MQTT bridge
for i in range(1, num_messages + 1):
    client.loop()

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    payload = "{}/{}-{}-payload-{}".format(registry_id, gateway_id, device_id, i)

    print(
        "Publishing message {}/{}: '{}' to {}".format(
            i, num_messages, payload, device_topic
        )
    )
    client.publish(device_topic, "{} : {}".format(device_id, payload))

    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s").format(seconds_since_issue)
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(5)

detach_device(client, device_id)

print("Finished.")

Le script s'interrompt après la transmission des données d'état au nom de l'appareil lié, ce qui vous permet d'accéder à la console pour voir les données d'état avant la suppression du registre de démonstration, de la passerelle et de l'appareil à l'étape suivante.

Une fois le script terminé, il libère la passerelle, l'appareil et le registre qui étaient attribués au début.

Python

# Clean up
manager.unbind_device_from_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
)
manager.delete_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)
manager.delete_device(
    service_account_json, project_id, cloud_region, registry_id, gateway_id
)
manager.delete_registry(service_account_json, project_id, cloud_region, registry_id)

L'appareil est dissocié de la passerelle avant sa suppression :

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
client = iot_v1.DeviceManagerClient()

parent = client.registry_path(project_id, cloud_region, registry_id)

res = client.unbind_device_from_gateway(
    request={"parent": parent, "gateway_id": gateway_id, "device_id": device_id}
)

print("Device unbound: {}".format(res))

Le script appelle une fonction d'assistance à partir de l'exemple de gestionnaire d'appareils pour supprimer à la fois l'appareil lié et la passerelle :

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
print("Delete device")
client = iot_v1.DeviceManagerClient()

device_path = client.device_path(project_id, cloud_region, registry_id, device_id)

return client.delete_device(request={"name": device_path})

Enfin, le script appelle une fonction d'assistance de l'exemple de gestion du registre pour supprimer le registre :

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
print("Delete registry")

client = iot_v1.DeviceManagerClient()
registry_path = client.registry_path(project_id, cloud_region, registry_id)

try:
    client.delete_device_registry(request={"name": registry_path})
    print("Deleted registry")
    return "Registry deleted"
except HttpError:
    print("Error, registry not deleted")
    raise