Google Cloud IoT Core is being retired on August 16, 2023. Contact your Google Cloud account team for more information.

Gateways

This page explains how gateways can send data and relay configuration messages for associated devices.

The hub demo app shows how gateways work end-to-end: it listens for configuration messages and sends state information on behalf of a device. The app can use the existing manager sample or local functions to manage devices. It uses the hub sample to manage a bound device and to connect to the MQTT protocol bridge on behalf of that device.

When the demo runs, you will see output similar to the following.

Running demo

Creating registry: test-registry-1541200612
Created registry

Creating gateway: test-device-RS256
Created gateway {u'gatewayConfig': {u'gatewayType': u'GATEWAY', u'gatewayAuthMethod': u'ASSOCIATION_ONLY'} ... }
Creating device to bind: test-device-noauthbind
Created Device {u'numId': u'2552202179450953', u'config': {u'version': u'1', u'cloudUpdateTime': u'2018-11-02T23:16:58.198419Z'}, u'id': u'test-device-noauthbind', u'gatewayConfig': {u'gatewayType': u'NON_GATEWAY', u'gatewayAuthMethod': u'ASSOCIATION_ONLY'} ...}

Binding device
Device Bound!
Listening for messages for 30 seconds
Try setting configuration in:
  https://console.cloud.google.com/iot/locations/us-central1/registries/test-registry-1541200612?project=noun-verb-123
Press enter to continue

Creating JWT using RS256 from private key file resources/rsa_private.pem
Attaching: /devices/test-device-noauthbind/attach
Waiting for device to attach.
('on_connect', 'Connection Accepted.')
on_publish
on_subscribe
Received message 'device-config' on topic '/devices/test-device-noauthbind/config' with Qos 1
on_subscribe
Received message 'gateway-config' on topic '/devices/test-device-RS256/config' with Qos 1
Detaching: /devices/test-device-noauthbind/detach
Finished.

Publishing messages demo
Publishing: 15 messages
Creating JWT using RS256 from private key file resources/rsa_private.pem
Attaching: /devices/test-device-noauthbind/attach
Waiting for device to attach.
Starting HUB at: 1541200710.73
('on_connect', 'Connection Accepted.')
on_publish
Publishing message 1/15: 'test-registry-1541200612/test-device-RS256-test-device-noauthbind-payload-1' to /devices/test-device-noauthbind/state
...
on_publish
Publishing message 15/15: 'test-registry-1541200612/test-device-RS256-test-device-noauthbind-payload-15' to /devices/test-device-noauthbind/state
Detaching: /devices/test-device-noauthbind/detach
on_publish
Finished.

You can read the state messages for your device at this URL:
    https://console.cloud.google.com/iot/locations/us-central1/registries/test-registry-1541200612/devices/test-device-noauthbind?project=noun-verb-1234

Device unbound: {}
Delete device
Delete device
Delete registry

The following code shows the full source of the demo script:

Python

import csv
import datetime
import io
import logging
import os
import time

from google.cloud import pubsub

# import manager  # TODO(class) when feature exits beta, remove borrowed defs
import hub

logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.CRITICAL)

cloud_region = 'us-central1'
device_id_template = 'test-device-{}'
gateway_id_template = 'test-gateway-{}'
topic_id = 'test-device-events-{}'.format(int(time.time()))

ca_cert_path = 'resources/roots.pem'
log_path = 'config_log.csv'
rsa_cert_path = 'resources/rsa_cert.pem'
rsa_private_path = 'resources/rsa_private.pem'

if ('GCLOUD_PROJECT' not in os.environ or
        'GOOGLE_APPLICATION_CREDENTIALS' not in os.environ):
    print(
      'You must set GCLOUD_PROJECT and GOOGLE_APPLICATION_CREDENTIALS')
    quit()

project_id = os.environ['GCLOUD_PROJECT']
service_account_json = os.environ['GOOGLE_APPLICATION_CREDENTIALS']

pubsub_topic = 'projects/{}/topics/{}'.format(project_id, topic_id)
registry_id = 'test-registry-{}'.format(int(time.time()))

base_url = 'https://console.cloud.google.com/iot/locations/{}'.format(
        cloud_region)
edit_template = '{}/registries/{}?project={}'.format(
        base_url, '{}', '{}')

device_url_template = '{}/registries/{}/devices/{}?project={}'.format(
        base_url, '{}', '{}', '{}')

mqtt_bridge_hostname = 'mqtt.googleapis.com'
mqtt_bridge_port = 8883

num_messages = 15
jwt_exp_time = 20
listen_time = 30


def create_iot_topic(project, topic_name):
    """Creates a PubSub Topic and grants access to Cloud IoT Core."""
    pubsub_client = pubsub.PublisherClient()
    topic_path = pubsub_client.topic_path(project, topic_name)

    topic = pubsub_client.create_topic(topic_path)
    policy = pubsub_client.get_iam_policy(topic_path)

    policy.bindings.add(
        role='roles/pubsub.publisher',
        members=['serviceAccount:cloud-iot@system.gserviceaccount.com'])

    pubsub_client.set_iam_policy(topic_path, policy)

    return topic


def create_registry(
        service_account_json, project_id, cloud_region, pubsub_topic,
        registry_id):
    """Creates a registry and returns the API response."""
    client = hub.get_client(service_account_json)
    registry_parent = 'projects/{}/locations/{}'.format(
            project_id,
            cloud_region)
    body = {
        'eventNotificationConfigs': [{
            'pubsubTopicName': pubsub_topic
        }],
        'id': registry_id
    }
    request = client.projects().locations().registries().create(
        parent=registry_parent, body=body)

    response = request.execute()
    print('Created registry')
    return response


def delete_registry(
       service_account_json, project_id, cloud_region, registry_id):
    """Deletes the specified registry."""
    print('Delete registry')
    client = hub.get_client(service_account_json)
    registry_name = 'projects/{}/locations/{}/registries/{}'.format(
            project_id, cloud_region, registry_id)

    registries = client.projects().locations().registries()
    return registries.delete(name=registry_name).execute()


def create_device(
        service_account_json, project_id, cloud_region, registry_id,
        device_id, certificate_file):
    """Create a new device with an RSA X.509 certificate credential."""
    registry_name = 'projects/{}/locations/{}/registries/{}'.format(
            project_id, cloud_region, registry_id)

    with io.open(certificate_file) as f:
        certificate = f.read()

    client = hub.get_client(service_account_json)
    device_template = {
        'id': device_id,
        'credentials': [{
            'publicKey': {
                'format': 'RSA_X509_PEM',
                'key': certificate
            }
        }]
    }

    devices = client.projects().locations().registries().devices()
    return devices.create(parent=registry_name, body=device_template).execute()


def delete_device(
        service_account_json, project_id, cloud_region, registry_id,
        device_id):
    """Delete the device with the given id."""
    print('Delete device')
    client = hub.get_client(service_account_json)
    registry_name = 'projects/{}/locations/{}/registries/{}'.format(
            project_id, cloud_region, registry_id)

    device_name = '{}/devices/{}'.format(registry_name, device_id)

    devices = client.projects().locations().registries().devices()
    return devices.delete(name=device_name).execute()


if __name__ == '__main__':
    print("Running demo")

    gateway_id = device_id_template.format('RS256')
    device_id = device_id_template.format('noauthbind')

    print('Creating registry: {}'.format(registry_id))
    create_registry(
            service_account_json, project_id, cloud_region, pubsub_topic,
            registry_id)

    print('Creating gateway: {}'.format(gateway_id))
    hub.create_gateway(
            service_account_json, project_id, cloud_region, registry_id,
            None, gateway_id, rsa_cert_path, 'RS256')

    print('Creating device to bind: {}'.format(device_id))
    hub.create_device(
            service_account_json, project_id, cloud_region, registry_id,
            device_id)

    print('Binding device')
    hub.bind_device_to_gateway(
            service_account_json, project_id, cloud_region, registry_id,
            device_id, gateway_id)

    print('Listening for messages for {} seconds'.format(listen_time))
    print('Try setting configuration in: ')
    print('\t{}'.format(edit_template.format(registry_id, project_id)))
    try:
        input("Press enter to continue")
    except SyntaxError:
        pass

    def log_callback(client):
        def log_on_message(unused_client, unused_userdata, message):
            if not os.path.exists(log_path):
                with open(log_path, 'w') as csvfile:
                    logwriter = csv.writer(csvfile, dialect='excel')
                    logwriter.writerow(['time', 'topic', 'data'])

            with open(log_path, 'a') as csvfile:
                logwriter = csv.writer(csvfile, dialect='excel')
                logwriter.writerow([
                        datetime.datetime.now().isoformat(),
                        message.topic,
                        message.payload])

        client.on_message = log_on_message


    hub.listen_for_config_messages(
                service_account_json, project_id, cloud_region, registry_id,
                device_id, gateway_id, num_messages, rsa_private_path, 'RS256',
                ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port,
                jwt_exp_time, listen_time, log_callback)

    print('Publishing messages demo')
    print('Publishing: {} messages'.format(num_messages))
    hub.send_data_from_bound_device(
                service_account_json, project_id, cloud_region, registry_id,
                device_id, gateway_id, num_messages, rsa_private_path, 'RS256',
                ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port,
                jwt_exp_time, "Hello from hub_demo.py")

    print('You can read the state messages for your device at this URL:')
    print('\t{}'.format(device_url_template.format(
            registry_id, device_id, project_id)))
    try:
        input('Press enter to continue after reading the messages.')
    except SyntaxError:
        pass

    # Clean up
    hub.unbind_device_from_gateway(
            service_account_json, project_id, cloud_region, registry_id,
            device_id, gateway_id)
    delete_device(
            service_account_json, project_id, cloud_region, registry_id,
            device_id)
    delete_device(
            service_account_json, project_id, cloud_region, registry_id,
            gateway_id)
    delete_registry(
            service_account_json, project_id, cloud_region, registry_id)

The demo script performs the following operations:

Create a demo registry

The demo first creates a temporary device registry, which is deleted when the demo finishes. This registry contains the gateway, which is a special type of device, as well as a device that is managed by the gateway. The demo uses the create registry sample code to do this.

Python

print('Creating registry: {}'.format(registry_id))
create_registry(
        service_account_json, project_id, cloud_region, pubsub_topic,
        registry_id)

The following code shows how the manager sample creates a device registry.

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# pubsub_topic = 'your-pubsub-topic'
# registry_id = 'your-registry-id'
client = iot_v1.DeviceManagerClient()
parent = f"projects/{project_id}/locations/{cloud_region}"

if not pubsub_topic.startswith("projects/"):
    pubsub_topic = "projects/{}/topics/{}".format(project_id, pubsub_topic)

body = {
    "event_notification_configs": [{"pubsub_topic_name": pubsub_topic}],
    "id": registry_id,
}

try:
    response = client.create_device_registry(
        request={"parent": parent, "device_registry": body}
    )
    print("Created registry")
    return response
except HttpError:
    print("Error, registry not created")
    raise
except AlreadyExists:
    print("Error, registry already exists")
    raise

After the registry is created, you're ready to add devices.
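
Most of the calls in this demo identify resources by their full path. As a rough illustration, the string formatting repeated across the samples could be collected into two small helpers. The names `registry_path` and `device_path` are hypothetical and exist only in this sketch; the path layout itself is the one used in the demo script.

```python
def registry_path(project_id, cloud_region, registry_id):
    """Build the full resource name of a registry (hypothetical helper)."""
    return 'projects/{}/locations/{}/registries/{}'.format(
        project_id, cloud_region, registry_id)


def device_path(project_id, cloud_region, registry_id, device_id):
    """Build the full resource name of a device or gateway (hypothetical helper)."""
    return '{}/devices/{}'.format(
        registry_path(project_id, cloud_region, registry_id), device_id)


if __name__ == '__main__':
    # Placeholder values for illustration only.
    print(device_path('my-project', 'us-central1', 'my-registry', 'my-device'))
```

Because a gateway is itself a device, the same device path format applies to both the gateway and the device bound to it.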

Create a gateway

The first device added to the registry is a special type of device called a gateway. This device can have its own configuration associated with it and can also have other devices bound to it so that it can behave as a proxy for them. After it's created, the gateway device can connect to the Cloud IoT Core protocol bridge using its associated credentials.

Python

print('Creating gateway: {}'.format(gateway_id))
hub.create_gateway(
        service_account_json, project_id, cloud_region, registry_id,
        None, gateway_id, rsa_cert_path, 'RS256')

The following code shows how the hub sample creates the special gateway device.

Python

"""Create a gateway to bind devices to."""
# Check that the gateway doesn't already exist
exists = False
client = get_client(service_account_json)
registry_path = 'projects/{}/locations/{}/registries/{}'.format(
        project_id, cloud_region, registry_id)

devices = client.projects().locations().registries().devices(
        ).list(
                parent=registry_path, fieldMask='config,gatewayConfig'
        ).execute().get('devices', [])

for device in devices:
        if device.get('id') == gateway_id:
            exists = True
        print('Device: {} : {} : {} : {}'.format(
            device.get('id'),
            device.get('numId'),
            device.get('config'),
            device.get('gatewayConfig')
            ))

# Create the gateway
registry_name = 'projects/{}/locations/{}/registries/{}'.format(
        project_id, cloud_region, registry_id)

with io.open(certificate_file) as f:
    certificate = f.read()

if algorithm == 'ES256':
    certificate_format = 'ES256_PEM'
else:
    certificate_format = 'RSA_X509_PEM'

# TODO: Auth type
device_template = {
    'id': gateway_id,
    'credentials': [{
        'publicKey': {
            'format': certificate_format,
            'key': certificate
        }
    }],
    'gatewayConfig': {
      'gatewayType': 'GATEWAY',
      'gatewayAuthMethod': 'ASSOCIATION_ONLY'
    }
}
devices = client.projects().locations().registries().devices()

if not exists:
    res = devices.create(
            parent=registry_name, body=device_template).execute()
    print('Created gateway {}'.format(res))
else:
    print('Gateway exists, skipping')
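
The request body is the part of this step that distinguishes a gateway from an ordinary device. The following sketch isolates that body so it can be inspected without calling the API. The helper names `public_key_format` and `build_gateway_body` are assumptions made for this illustration and are not part of the hub sample; the field names and values are copied from the code above.

```python
def public_key_format(algorithm):
    """Map the algorithm name to the key format string used in the sample."""
    return 'ES256_PEM' if algorithm == 'ES256' else 'RSA_X509_PEM'


def build_gateway_body(gateway_id, certificate, algorithm):
    """Return the device body for a gateway (hypothetical helper)."""
    return {
        'id': gateway_id,
        'credentials': [{
            'publicKey': {
                'format': public_key_format(algorithm),
                'key': certificate,
            }
        }],
        # These two fields mark the device as a gateway.
        'gatewayConfig': {
            'gatewayType': 'GATEWAY',
            'gatewayAuthMethod': 'ASSOCIATION_ONLY',
        },
    }


if __name__ == '__main__':
    # The certificate text here is a placeholder, not a real key.
    print(build_gateway_body('my-gateway', 'PLACEHOLDER-CERT', 'RS256'))
```

Keeping the body construction separate from the `devices.create` call is only one possible design; it makes the gateway-specific fields easy to check in isolation.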

Create a device to bind

Before you can bind a device to a gateway, you must create the device in the same registry as the gateway. The demo uses the hub sample to create a device.

Python

print('Creating device to bind: {}'.format(device_id))
hub.create_device(
        service_account_json, project_id, cloud_region, registry_id,
        device_id)

The following code shows how the hub sample creates the device for binding.

Python

# Check that the device doesn't already exist
exists = False

client = get_client(service_account_json)
registry_path = 'projects/{}/locations/{}/registries/{}'.format(
        project_id, cloud_region, registry_id)

devices = client.projects().locations().registries().devices(
        ).list(
                parent=registry_path, fieldMask='config,gatewayConfig'
        ).execute().get('devices', [])

for device in devices:
        if device.get('id') == device_id:
            exists = True

# Create the device
registry_name = 'projects/{}/locations/{}/registries/{}'.format(
        project_id, cloud_region, registry_id)

device_template = {
    'id': device_id,
    'gatewayConfig': {
      'gatewayType': 'NON_GATEWAY',
      'gatewayAuthMethod': 'ASSOCIATION_ONLY'
    }
}
devices = client.projects().locations().registries().devices()

if not exists:
    res = devices.create(
            parent=registry_name, body=device_template).execute()
    print('Created Device {}'.format(res))
else:
    print('Device exists, skipping')

Bind the device to the gateway

With the device and gateway in the demo registry, you are ready to bind the device to the gateway. Binding allows the gateway to attach and detach the device over its own connection to the Cloud IoT Core protocol bridge. The demo app uses the hub sample to do this as follows.

Python

print('Binding device')
hub.bind_device_to_gateway(
        service_account_json, project_id, cloud_region, registry_id,
        device_id, gateway_id)

The following code shows how the hub sample binds the device to the gateway.

Python

client = get_client(service_account_json)

create_device(
        service_account_json, project_id, cloud_region, registry_id,
        device_id)

registry_name = 'projects/{}/locations/{}/registries/{}'.format(
        project_id, cloud_region, registry_id)
bind_request = {
    'deviceId': device_id,
    'gatewayId': gateway_id
}
client.projects().locations().registries().bindDeviceToGateway(
        parent=registry_name, body=bind_request).execute()
print('Device Bound!')

Listen for configuration messages

After the device is bound to the gateway, the gateway can connect to the Cloud IoT Core protocol bridge, attach the device, and then receive configuration messages for that device. The demo uses a helper function in the hub sample to do this.

Python

print('Listening for messages for {} seconds'.format(listen_time))
print('Try setting configuration in: ')
print('\t{}'.format(edit_template.format(registry_id, project_id)))
try:
    input("Press enter to continue")
except SyntaxError:
    pass

def log_callback(client):
    def log_on_message(unused_client, unused_userdata, message):
        if not os.path.exists(log_path):
            with open(log_path, 'w') as csvfile:
                logwriter = csv.writer(csvfile, dialect='excel')
                logwriter.writerow(['time', 'topic', 'data'])

        with open(log_path, 'a') as csvfile:
            logwriter = csv.writer(csvfile, dialect='excel')
            logwriter.writerow([
                    datetime.datetime.now().isoformat(),
                    message.topic,
                    message.payload])

    client.on_message = log_on_message


hub.listen_for_config_messages(
            service_account_json, project_id, cloud_region, registry_id,
            device_id, gateway_id, num_messages, rsa_private_path, 'RS256',
            ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port,
            jwt_exp_time, listen_time, log_callback)
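
The logging callback above can be exercised without an MQTT connection. The sketch below is a variation on it, not the demo's own code: `make_csv_logger` is a hypothetical name, the message object is a stand-in built with `SimpleNamespace`, and the payload is decoded because Paho delivers payloads as bytes under Python 3.

```python
import csv
import datetime
import os
import tempfile
from types import SimpleNamespace


def make_csv_logger(log_path):
    """Return an on_message-style callback that appends rows to log_path."""
    def log_on_message(unused_client, unused_userdata, message):
        write_header = not os.path.exists(log_path)
        # newline='' lets the csv module control line endings itself.
        with open(log_path, 'a', newline='') as csvfile:
            logwriter = csv.writer(csvfile, dialect='excel')
            if write_header:
                logwriter.writerow(['time', 'topic', 'data'])
            payload = message.payload
            if isinstance(payload, bytes):
                payload = payload.decode('utf-8', errors='replace')
            logwriter.writerow([
                datetime.datetime.now().isoformat(), message.topic, payload])
    return log_on_message


if __name__ == '__main__':
    path = os.path.join(tempfile.mkdtemp(), 'config_log.csv')
    on_message = make_csv_logger(path)
    # Stand-in for the message object an MQTT client would pass in.
    fake = SimpleNamespace(
        topic='/devices/my-device/config', payload=b'device-config')
    on_message(None, None, fake)
    with open(path, newline='') as f:
        print(f.read())
```

The header row is written only when the file does not exist yet, so repeated runs keep appending to the same log.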

In the hub sample, the Paho MQTT client is used to connect to the protocol bridge as follows.

Python

global minimum_backoff_time

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_mqtt_client(
    project_id, cloud_region, registry_id, gateway_id,
    private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
    mqtt_bridge_port)

attach_device(client, device_id)
print('Waiting for device to attach.')
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = '/devices/{}/config'.format(device_id)
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = '/devices/{}/config'.format(gateway_id)
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = '/devices/{}/errors'.format(gateway_id)
client.subscribe(error_topic, qos=0)

# Listen for config messages for roughly `duration` seconds.
for i in range(1, duration):
    client.loop()

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print('Exceeded maximum backoff time. Giving up.')
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print('Refreshing token after {}s'.format(seconds_since_issue))
        jwt_iat = datetime.datetime.utcnow()
        client = get_mqtt_client(
            project_id, cloud_region, registry_id, gateway_id,
            private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
            mqtt_bridge_port)

    time.sleep(1)

detach_device(client, device_id)

print('Finished.')

Note that the sample pauses before starting this step so that you can navigate to the developer console and set configuration data for your gateway or bound device. You can then verify that the latest configuration is being received.
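
The gateway works with several MQTT topics, some named after the bound device and some after the gateway itself. As a quick reference, the sketch below gathers the topic strings that appear in the sample code and demo output into one place. The function name `gateway_topics` and the dictionary keys are hypothetical, chosen only for this illustration.

```python
def gateway_topics(gateway_id, device_id):
    """Return the MQTT topics used in the demo (hypothetical helper)."""
    return {
        # Published by the gateway to attach or detach a bound device.
        'attach': '/devices/{}/attach'.format(device_id),
        'detach': '/devices/{}/detach'.format(device_id),
        # Configuration and state for the bound device.
        'device_config': '/devices/{}/config'.format(device_id),
        'device_state': '/devices/{}/state'.format(device_id),
        # Configuration, state, and errors for the gateway itself.
        'gateway_config': '/devices/{}/config'.format(gateway_id),
        'gateway_state': '/devices/{}/state'.format(gateway_id),
        'gateway_errors': '/devices/{}/errors'.format(gateway_id),
    }


if __name__ == '__main__':
    topics = gateway_topics('test-device-RS256', 'test-device-noauthbind')
    for name, topic in sorted(topics.items()):
        print('{}: {}'.format(name, topic))
```

In the sample, the config topics are subscribed to with QoS 1, while the errors topic is subscribed to with QoS 0.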

Send state data

After demonstrating how to receive configuration messages on behalf of a gateway and its bound devices, the demo app sends state data on behalf of the associated device. To do this, the demo uses a helper function from the hub sample app.

Python

print('Publishing messages demo')
print('Publishing: {} messages'.format(num_messages))
hub.send_data_from_bound_device(
            service_account_json, project_id, cloud_region, registry_id,
            device_id, gateway_id, num_messages, rsa_private_path, 'RS256',
            ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port,
            jwt_exp_time, "Hello from hub_demo.py")

print('You can read the state messages for your device at this URL:')
print('\t{}'.format(device_url_template.format(
        registry_id, device_id, project_id)))
try:
    input('Press enter to continue after reading the messages.')
except SyntaxError:
    pass

The following code shows how the hub sample publishes state data on behalf of the bound device.

Python

global minimum_backoff_time

# Publish device events and gateway state.
device_topic = '/devices/{}/{}'.format(device_id, 'state')
gateway_topic = '/devices/{}/{}'.format(gateway_id, 'state')

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_mqtt_client(
    project_id, cloud_region, registry_id, gateway_id,
    private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
    mqtt_bridge_port)

attach_device(client, device_id)
print('Waiting for device to attach.')
time.sleep(5)

# Publish state to gateway topic
gateway_state = 'Starting HUB at: {}'.format(time.time())
print(gateway_state)
client.publish(gateway_topic, gateway_state, qos=1)

# Publish num_messages messages to the MQTT bridge
for i in range(1, num_messages + 1):
    client.loop()

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print('Exceeded maximum backoff time. Giving up.')
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    payload = '{}/{}-{}-payload-{}'.format(
            registry_id, gateway_id, device_id, i)

    print('Publishing message {}/{}: \'{}\' to {}'.format(
            i, num_messages, payload, device_topic))
    client.publish(
            device_topic, '{} : {}'.format(device_id, payload), qos=1)

    seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print('Refreshing token after {}s'.format(seconds_since_issue))
        jwt_iat = datetime.datetime.utcnow()
        client = get_mqtt_client(
            project_id, cloud_region, registry_id, gateway_id,
            private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
            mqtt_bridge_port)

    time.sleep(5)

detach_device(client, device_id)

print('Finished.')
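
Both loops in the hub sample reconnect with exponential backoff plus random jitter when `should_backoff` is set. The sketch below reproduces only the delay arithmetic as a generator, so the sequence can be inspected without a network connection. The name `backoff_delays` and the default values of 1 and 32 seconds are assumptions for this illustration; the values of `minimum_backoff_time` and `MAXIMUM_BACKOFF_TIME` are not shown on this page.

```python
import random


def backoff_delays(initial=1, maximum=32, rng=random):
    """Yield reconnect delays in seconds until the cap is exceeded.

    The default values are assumptions made for this sketch.
    """
    minimum_backoff_time = initial
    while minimum_backoff_time <= maximum:
        # Base delay plus up to one second of random jitter.
        yield minimum_backoff_time + rng.randint(0, 1000) / 1000.0
        minimum_backoff_time *= 2


if __name__ == '__main__':
    for attempt, delay in enumerate(backoff_delays(), start=1):
        print('attempt {}: wait {:.3f}s'.format(attempt, delay))
```

The jitter spreads out reconnect attempts so that many gateways recovering from the same outage do not all retry at the same instant.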

The demo pauses after transmitting state data on behalf of the bound device so that you can navigate to the developer console before the demo registry and devices are removed in the next step.
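
The loops in the hub sample also rebuild the MQTT client once the JWT is older than its expiry window, which the demo script sets to 20 minutes through `jwt_exp_time`. The check can be sketched as a small pure function. The name `token_needs_refresh` is hypothetical, and this sketch deliberately uses `total_seconds()` where the sample reads the `seconds` attribute, because `seconds` holds only the sub-day part of a `timedelta`.

```python
import datetime


def token_needs_refresh(issued_at, now, jwt_exp_mins):
    """Return True once more than jwt_exp_mins minutes have elapsed."""
    elapsed = (now - issued_at).total_seconds()
    return elapsed > 60 * jwt_exp_mins


if __name__ == '__main__':
    issued = datetime.datetime(2018, 11, 2, 23, 0, 0)
    early = issued + datetime.timedelta(minutes=19)
    late = issued + datetime.timedelta(minutes=21)
    print(token_needs_refresh(issued, early, 20))  # False
    print(token_needs_refresh(issued, late, 20))   # True
```

For loops that run for less than a day the two approaches give the same answer, so the difference only matters for long-lived connections.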

Remove the resources used in the demo

When the demo finishes, it deletes the devices and registry that were allocated at the beginning.

Python

# Clean up
hub.unbind_device_from_gateway(
        service_account_json, project_id, cloud_region, registry_id,
        device_id, gateway_id)
delete_device(
        service_account_json, project_id, cloud_region, registry_id,
        device_id)
delete_device(
        service_account_json, project_id, cloud_region, registry_id,
        gateway_id)
delete_registry(
        service_account_json, project_id, cloud_region, registry_id)

The device is unbound from the gateway before it is deleted.

Python

client = get_client(service_account_json)

registry_name = 'projects/{}/locations/{}/registries/{}'.format(
        project_id, cloud_region, registry_id)
bind_request = {
    'deviceId': device_id,
    'gatewayId': gateway_id
}

res = client.projects().locations().registries().unbindDeviceFromGateway(
    parent=registry_name, body=bind_request).execute()
print('Device unbound: {}'.format(res))

A helper function from the manager sample is used to delete both the bound device and the gateway.

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
print("Delete device")
client = iot_v1.DeviceManagerClient()

device_path = client.device_path(project_id, cloud_region, registry_id, device_id)

return client.delete_device(request={"name": device_path})

Finally, a helper function from the manager sample is used to delete the registry.

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
print("Delete registry")

client = iot_v1.DeviceManagerClient()
registry_path = client.registry_path(project_id, cloud_region, registry_id)

try:
    client.delete_device_registry(request={"name": registry_path})
    print("Deleted registry")
    return "Registry deleted"
except HttpError:
    print("Error, registry not deleted")
    raise

At this point you have seen the gateway functionality work end-to-end.