MQTT gateway demo script

This page provides a Python script that demonstrates how gateways work. Using Cloud IoT Core samples, the script first creates a demo registry, a gateway, and a device. Then it binds the device to the gateway, listens for configuration messages, and sends state data on the device's behalf. Finally, the script automatically unbinds the device, deletes the device and gateway, and cleans up the registry.

Features

The script demonstrates the following gateway features:

  • Creating a gateway
  • Creating a device to bind to the gateway
  • Binding the device to the gateway
  • Listening for configuration messages
  • Sending state data
  • Unbinding the device from the gateway
  • Deleting the device then deleting the gateway

Objectives

After running this script, you'll understand the code that performs the following:

  • Creating a registry
  • Creating a gateway and a device to bind
  • Binding the gateway to the device
  • Listening for configuration messages

Pricing

The amount data transmitted in this demo falls within the free usage tier. See Pricing for more information.

Before you begin

This demonstration assumes that you're familiar with Python and that you've reviewed the gateways overview.

Before you can run this sample, you need to get a project ID, the gcloud command-line tool, and check that billing is enabled:

  1. Create a new GCP Console project or retrieve the project ID of an existing project from the Google Cloud Platform Console:

    Go to the Projects page

  2. Install and then initialize the Google Cloud SDK:

    Download the SDK

  3. Make sure that billing is enabled for your project.

    Learn How to Enable Billing

Download the script

Download the demo script and set your current directory:

git clone https://github.com/GoogleCloudPlatform/python-docs-samples
cd iot/api-client/mqtt_example

Install dependencies

From within the iot/api-client/mqtt_example directory, install the dependencies needed to run the example:

pip --user install -r requirements.txt

For more information on setting up your Python development environment, such as installing pip on your system, see the Python Development Environment Setup Guide.

Create your credentials

Before running the example, complete the following steps:

  1. Create a service account named gateway-demo and click Create.
  2. Select the role Project Editor.
  3. Skip the optional Service account permissions screen by clicking Continue.
  4. Click Create key.
  5. In the Create key pane, under Key type, select JSON.
  6. Click Create.
  7. Save this key in the iot/api-client/mqtt_example directory, and rename it service_account.json.
  8. Download Google's CA root certificate into the same directory as the example files. You can optionally set the location of the certificate with the --ca_certs flag.

Add project ID and credentials to the script

  1. In the script, set the GCLOUD_PROJECT environment variable to your project ID.
  2. Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to service_account.json.

Run the script locally

In the project subdirectory python-docs-samples/iot/api-client/mqtt_example, run the script by invoking the command:

python gateway_demo.py

When the script runs, it writes the output to the terminal.

gateway_demo.py walkthrough

This section describes what happens in each step of the script.

First, the script creates a temporary device registry, which will contain both the gateway and the device that uses the gateway to communicate with Cloud IoT Core:

Python

print('Creating registry: {}'.format(registry_id))
manager.create_registry(
        service_account_json, project_id, cloud_region, pubsub_topic,
        registry_id)

The following code in the registry management sample is called by the script:

Python

client = get_client(service_account_json)
registry_parent = 'projects/{}/locations/{}'.format(
        project_id,
        cloud_region)
body = {
    'eventNotificationConfigs': [{
        'pubsubTopicName': pubsub_topic
    }],
    'id': registry_id
}
request = client.projects().locations().registries().create(
    parent=registry_parent, body=body)

try:
    response = request.execute()
    print('Created registry')
    return response
except HttpError:
    print('Error, registry not created')
    return ""

Next, the script creates a gateway and adds it to the registry:

Python

print('Creating gateway: {}'.format(gateway_id))
manager.create_gateway(
        service_account_json, project_id, cloud_region, registry_id,
        None, gateway_id, rsa_cert_path, 'RS256')

The following code in the device management sample is called by the script:

Python

# Check that the gateway doesn't already exist
exists = False
client = get_client(service_account_json)
registry_path = 'projects/{}/locations/{}/registries/{}'.format(
        project_id, cloud_region, registry_id)

devices = client.projects().locations().registries().devices(
        ).list(
                parent=registry_path, fieldMask='config,gatewayConfig'
        ).execute().get('devices', [])

for device in devices:
        if device.get('id') == gateway_id:
            exists = True
        print('Device: {} : {} : {} : {}'.format(
            device.get('id'),
            device.get('numId'),
            device.get('config'),
            device.get('gatewayConfig')
            ))

# Create the gateway
registry_name = 'projects/{}/locations/{}/registries/{}'.format(
        project_id, cloud_region, registry_id)

with io.open(certificate_file) as f:
    certificate = f.read()

if algorithm == 'ES256':
    certificate_format = 'ES256_PEM'
else:
    certificate_format = 'RSA_X509_PEM'

# TODO: Auth type
device_template = {
    'id': gateway_id,
    'credentials': [{
        'publicKey': {
            'format': certificate_format,
            'key': certificate
        }
    }],
    'gatewayConfig': {
      'gatewayType': 'GATEWAY',
      'gatewayAuthMethod': 'ASSOCIATION_ONLY'
    }
}
devices = client.projects().locations().registries().devices()

if not exists:
    res = devices.create(
            parent=registry_name, body=device_template).execute()
    print('Created gateway {}'.format(res))
else:
    print('Gateway exists, skipping')

Now, the script creates a device in the same registry as the gateway:

Python

print('Creating device to bind: {}'.format(device_id))
manager.create_device(
        service_account_json, project_id, cloud_region, registry_id,
        device_id)

The following code in the device manager sample is called by the script:

Python

# Check that the device doesn't already exist
exists = False

client = get_client(service_account_json)
registry_path = 'projects/{}/locations/{}/registries/{}'.format(
        project_id, cloud_region, registry_id)

devices = client.projects().locations().registries().devices(
        ).list(
                parent=registry_path, fieldMask='config,gatewayConfig'
        ).execute().get('devices', [])

for device in devices:
        if device.get('id') == device_id:
            exists = True

# Create the device
registry_name = 'projects/{}/locations/{}/registries/{}'.format(
        project_id, cloud_region, registry_id)

device_template = {
    'id': device_id,
    'gatewayConfig': {
      'gatewayType': 'NON_GATEWAY',
      'gatewayAuthMethod': 'ASSOCIATION_ONLY'
    }
}
devices = client.projects().locations().registries().devices()

if not exists:
    res = devices.create(
            parent=registry_name, body=device_template).execute()
    print('Created Device {}'.format(res))
else:
    print('Device exists, skipping')

After the device and gateway are in the demo registry, the script can then bind the device to the gateway. Binding the device enables the gateway to attach and detach the device over the gateway's connection to the Cloud IoT Core protocol bridge.

Python

print('Binding device')
manager.bind_device_to_gateway(
        service_account_json, project_id, cloud_region, registry_id,
        device_id, gateway_id)

The following code in the device manager sample is called by the script:

Python

client = get_client(service_account_json)

create_device(
        service_account_json, project_id, cloud_region, registry_id,
        device_id)

registry_name = 'projects/{}/locations/{}/registries/{}'.format(
        project_id, cloud_region, registry_id)
bind_request = {
    'deviceId': device_id,
    'gatewayId': gateway_id
}
client.projects().locations().registries().bindDeviceToGateway(
        parent=registry_name, body=bind_request).execute()
print('Device Bound!')

After the device is bound to the gateway, the script pauses so you can set configuration data for the gateway or bound device.

To set the configuration data:

  1. Navigate to the Google Cloud Platform Console.
  2. Set configuration data for the gateway or bound device.
  3. Verify that the gateway and device are receiving the latest configuration before the gateway starts listening.

After you set the configuration data and continue the script, the gateway connects to the Cloud IoT Core protocol bridge, attaches the device, then receives configuration messages for the device.

To listen for configuration messages, the script calls a helper function from the MQTT sample as follows:

Python

print('Listening for messages for {} seconds'.format(listen_time))
print('Try setting configuration in: ')
print('\t{}'.format(edit_template.format(registry_id, project_id)))
try:
    input("Press enter to continue")
except SyntaxError:
    pass

def log_callback(client):
    def log_on_message(unused_client, unused_userdata, message):
        if not os.path.exists(log_path):
            with open(log_path, 'w') as csvfile:
                logwriter = csv.writer(csvfile, dialect='excel')
                logwriter.writerow(['time', 'topic', 'data'])

        with open(log_path, 'a') as csvfile:
            logwriter = csv.writer(csvfile, dialect='excel')
            logwriter.writerow([
                    datetime.datetime.now().isoformat(),
                    message.topic,
                    message.payload])

    client.on_message = log_on_message

cloudiot_mqtt_example.listen_for_messages(
            service_account_json, project_id, cloud_region, registry_id,
            device_id, gateway_id, num_messages, rsa_private_path, 'RS256',
            ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port,
            jwt_exp_time, listen_time, log_callback)

The following code in the MQTT sample is called by the script:

Python

global minimum_backoff_time

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id, cloud_region, registry_id, gateway_id,
    private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
    mqtt_bridge_port)

attach_device(client, device_id, '')
print('Waiting for device to attach.')
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = '/devices/{}/config'.format(device_id)
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = '/devices/{}/config'.format(gateway_id)
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = '/devices/{}/errors'.format(gateway_id)
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
    client.loop()
    if cb is not None:
        cb(client)

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print('Exceeded maximum backoff time. Giving up.')
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print('Refreshing token after {}s').format(seconds_since_issue)
        jwt_iat = datetime.datetime.utcnow()
        client = get_client(
            project_id, cloud_region, registry_id, gateway_id,
            private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
            mqtt_bridge_port)

    time.sleep(1)

detach_device(client, device_id)

print('Finished.')

When the script runs, it writes the configuration messages to a log that can be tailed to transmit the configurations to the attached devices.

After demonstrating how you can receive configuration messages on behalf of a gateway and its bound device, the script sends state data on behalf of the bound device. To do this, the script calls a helper function from the MQTT sample:

Python

print('Publishing messages demo')
print('Publishing: {} messages'.format(num_messages))
cloudiot_mqtt_example.send_data_from_bound_device(
            service_account_json, project_id, cloud_region, registry_id,
            device_id, gateway_id, num_messages, rsa_private_path, 'RS256',
            ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port,
            jwt_exp_time, "Hello from gateway_demo.py")

print('You can read the state messages for your device at this URL:')
print('\t{}'.format(device_url_template).format(
        registry_id, device_id, project_id))
try:
    input('Press enter to continue after reading the messages.')
except SyntaxError:
    pass

The following code in the MQTT sample is called by the script:

Python

global minimum_backoff_time

# Publish device events and gateway state.
device_topic = '/devices/{}/{}'.format(device_id, 'state')
gateway_topic = '/devices/{}/{}'.format(gateway_id, 'state')

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id, cloud_region, registry_id, gateway_id,
    private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
    mqtt_bridge_port)

attach_device(client, device_id, '')
print('Waiting for device to attach.')
time.sleep(5)

# Publish state to gateway topic
gateway_state = 'Starting gateway at: {}'.format(time.time())
print(gateway_state)
client.publish(gateway_topic, gateway_state, qos=1)

# Publish num_messages mesages to the MQTT bridge
for i in range(1, num_messages + 1):
    client.loop()

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print('Exceeded maximum backoff time. Giving up.')
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    payload = '{}/{}-{}-payload-{}'.format(
            registry_id, gateway_id, device_id, i)

    print('Publishing message {}/{}: \'{}\' to {}'.format(
            i, num_messages, payload, device_topic))
    client.publish(
            device_topic, '{} : {}'.format(device_id, payload), qos=1)

    seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print('Refreshing token after {}s').format(seconds_since_issue)
        jwt_iat = datetime.datetime.utcnow()
        client = get_client(
            project_id, cloud_region, registry_id, gateway_id,
            private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
            mqtt_bridge_port)

    time.sleep(5)

detach_device(client, device_id)

print('Finished.')

The script pauses after transmitting state data on behalf of the bound device so you can navigate to the GCP Console to see the state data before the demo registry, gateway, and device are removed in the next step.

When the script finishes, it frees up the gateway, device, and registry that were allocated at the beginning.

Python

# Clean up
manager.unbind_device_from_gateway(
        service_account_json, project_id, cloud_region, registry_id,
        device_id, gateway_id)
manager.delete_device(
        service_account_json, project_id, cloud_region, registry_id,
        device_id)
manager.delete_device(
        service_account_json, project_id, cloud_region, registry_id,
        gateway_id)
manager.delete_registry(
        service_account_json, project_id, cloud_region, registry_id)

The device is unbound from the gateway before it is deleted:

Python

client = get_client(service_account_json)

registry_name = 'projects/{}/locations/{}/registries/{}'.format(
        project_id, cloud_region, registry_id)
bind_request = {
    'deviceId': device_id,
    'gatewayId': gateway_id
}

res = client.projects().locations().registries().unbindDeviceFromGateway(
    parent=registry_name, body=bind_request).execute()
print('Device unbound: {}'.format(res))

The script calls a helper function from the device manager sample to delete both the bound device and the gateway:

Python

print('Delete device')
client = get_client(service_account_json)
registry_name = 'projects/{}/locations/{}/registries/{}'.format(
        project_id, cloud_region, registry_id)

device_name = '{}/devices/{}'.format(registry_name, device_id)

devices = client.projects().locations().registries().devices()
return devices.delete(name=device_name).execute()

Finally, the script calls a helper function from the registry management sample to delete the registry:

Python

print('Delete registry')
client = get_client(service_account_json)
registry_name = 'projects/{}/locations/{}/registries/{}'.format(
        project_id, cloud_region, registry_id)

registries = client.projects().locations().registries()
return registries.delete(name=registry_name).execute()
Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…

Cloud IoT Core Documentation