このページでは、ゲートウェイが関連するデバイスにデータを送信して構成メッセージを中継する仕組みについて説明します。
ハブのデモアプリは、構成メッセージをリッスンし、デバイスに代わって状態情報を送信して、ゲートウェイがエンドツーエンドでどのように機能するかを示します。このアプリは、既存のマネージャーのサンプルやローカル関数を使用してデバイスを管理して、ハブのサンプルを使用してバインドされたデバイスを管理し、そのデバイスに代わって MQTT プロトコル ブリッジに接続できます。
デモが実行されると、次のような出力が表示されます。
Running demo
Creating registry: test-registry-1541200612
Created registry
Creating gateway: test-device-RS256
Created gateway {u'gatewayConfig': {u'gatewayType': u'GATEWAY', u'gatewayAuthMethod': u'ASSOCIATION_ONLY'} ... }
Creating device to bind: test-device-noauthbind
Created Device {u'numId': u'2552202179450953', u'config': {u'version': u'1', u'cloudUpdateTime': u'2018-11-02T23:16:58.198419Z'}, u'id': u'test-device-noauthbind', u'gatewayConfig': {u'gatewayType': u'NON_GATEWAY', u'gatewayAuthMethod': u'ASSOCIATION_ONLY'} ...}
Binding device
Device Bound!
Listening for messages for 30 seconds
Try setting configuration in:
https://console.cloud.google.com/iot/locations/us-central1/registries/test-registry-1541200612?project=noun-verb-123
Press enter to continue
Creating JWT using RS256 from private key file resources/rsa_private.pem
Attaching: /devices/test-device-noauthbind/attach
Waiting for device to attach.
('on_connect', 'Connection Accepted.')
on_publish
on_subscribe
Received message 'device-config' on topic '/devices/test-device-noauthbind/config' with Qos 1
on_subscribe
Received message 'gateway-config' on topic '/devices/test-device-RS256/config' with Qos 1
Detaching: /devices/test-device-noauthbind/detach
Finished.
Publishing messages demo
Publishing: 15 messages
Creating JWT using RS256 from private key file resources/rsa_private.pem
Attaching: /devices/test-device-noauthbind/attach
Waiting for device to attach.
Starting HUB at: 1541200710.73
('on_connect', 'Connection Accepted.')
on_publish
Publishing message 1/15: 'test-registry-1541200612/test-device-RS256-test-device-noauthbind-payload-1' to /devices/test-device-noauthbind/state
...
on_publish
Publishing message 15/15: 'test-registry-1541200612/test-device-RS256-test-device-noauthbind-payload-15' to /devices/test-device-noauthbind/state
Detaching: /devices/test-device-noauthbind/detach
on_publish
Finished.
You can read the state messages for your device at this URL:
https://console.cloud.google.com/iot/locations/us-central1/registries/test-registry-1541200612/devices/test-device-noauthbind?project=noun-verb-1234
Device unbound: {}
Delete device
Delete device
Delete registry
デモスクリプトの完全なソースコードを次に示します。
Python
import csv
import datetime
import io
import logging
import os
import time
from google.cloud import pubsub
# import manager # TODO(class) when feature exits beta, remove borrowed defs
import hub
logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.CRITICAL)
cloud_region = 'us-central1'
device_id_template = 'test-device-{}'
gateway_id_template = 'test-gateway-{}'
topic_id = 'test-device-events-{}'.format(int(time.time()))
ca_cert_path = 'resources/roots.pem'
log_path = 'config_log.csv'
rsa_cert_path = 'resources/rsa_cert.pem'
rsa_private_path = 'resources/rsa_private.pem'
if ('GCLOUD_PROJECT' not in os.environ or
'GOOGLE_APPLICATION_CREDENTIALS' not in os.environ):
print(
'You must set GCLOUD_PROJECT and GOOGLE_APPLICATION_CREDENTIALS')
quit()
project_id = os.environ['GCLOUD_PROJECT']
service_account_json = os.environ['GOOGLE_APPLICATION_CREDENTIALS']
pubsub_topic = 'projects/{}/topics/{}'.format(project_id, topic_id)
registry_id = 'test-registry-{}'.format(int(time.time()))
base_url = 'https://console.cloud.google.com/iot/locations/{}'.format(
cloud_region)
edit_template = '{}/registries/{}?project={}'.format(
base_url, '{}', '{}')
device_url_template = '{}/registries/{}/devices/{}?project={}'.format(
base_url, '{}', '{}', '{}')
mqtt_bridge_hostname = 'mqtt.googleapis.com'
mqtt_bridge_port = 8883
num_messages = 15
jwt_exp_time = 20
listen_time = 30
def create_iot_topic(project, topic_name):
"""Creates a PubSub Topic and grants access to Cloud IoT Core."""
pubsub_client = pubsub.PublisherClient()
topic_path = pubsub_client.topic_path(project, topic_name)
topic = pubsub_client.create_topic(topic_path)
policy = pubsub_client.get_iam_policy(topic_path)
policy.bindings.add(
role='roles/pubsub.publisher',
members=['serviceAccount:cloud-iot@system.gserviceaccount.com'])
pubsub_client.set_iam_policy(topic_path, policy)
return topic
def create_registry(
service_account_json, project_id, cloud_region, pubsub_topic,
registry_id):
""" Creates a registry and returns the result. Returns an empty result if
the registry already exists."""
client = hub.get_client(service_account_json)
registry_parent = 'projects/{}/locations/{}'.format(
project_id,
cloud_region)
body = {
'eventNotificationConfigs': [{
'pubsubTopicName': pubsub_topic
}],
'id': registry_id
}
request = client.projects().locations().registries().create(
parent=registry_parent, body=body)
response = request.execute()
print('Created registry')
return response
def delete_registry(
service_account_json, project_id, cloud_region, registry_id):
"""Deletes the specified registry."""
print('Delete registry')
client = hub.get_client(service_account_json)
registry_name = 'projects/{}/locations/{}/registries/{}'.format(
project_id, cloud_region, registry_id)
registries = client.projects().locations().registries()
return registries.delete(name=registry_name).execute()
def create_device(
service_account_json, project_id, cloud_region, registry_id,
device_id, certificate_file):
"""Create a new device without authentication."""
registry_name = 'projects/{}/locations/{}/registries/{}'.format(
project_id, cloud_region, registry_id)
with io.open(certificate_file) as f:
certificate = f.read()
client = hub.get_client(service_account_json)
device_template = {
'id': device_id,
'credentials': [{
'publicKey': {
'format': 'RSA_X509_PEM',
'key': certificate
}
}]
}
devices = client.projects().locations().registries().devices()
return devices.create(parent=registry_name, body=device_template).execute()
def delete_device(
service_account_json, project_id, cloud_region, registry_id,
device_id):
"""Delete the device with the given id."""
print('Delete device')
client = hub.get_client(service_account_json)
registry_name = 'projects/{}/locations/{}/registries/{}'.format(
project_id, cloud_region, registry_id)
device_name = '{}/devices/{}'.format(registry_name, device_id)
devices = client.projects().locations().registries().devices()
return devices.delete(name=device_name).execute()
if __name__ == '__main__':
print("Running demo")
gateway_id = device_id_template.format('RS256')
device_id = device_id_template.format('noauthbind')
print('Creating registry: {}'.format(registry_id))
create_registry(
service_account_json, project_id, cloud_region, pubsub_topic,
registry_id)
print('Creating gateway: {}'.format(gateway_id))
hub.create_gateway(
service_account_json, project_id, cloud_region, registry_id,
None, gateway_id, rsa_cert_path, 'RS256')
print('Creating device to bind: {}'.format(device_id))
hub.create_device(
service_account_json, project_id, cloud_region, registry_id,
device_id)
print('Binding device')
hub.bind_device_to_gateway(
service_account_json, project_id, cloud_region, registry_id,
device_id, gateway_id)
print('Listening for messages for {} seconds'.format(listen_time))
print('Try setting configuration in: ')
print('\t{}'.format(edit_template.format(registry_id, project_id)))
try:
input("Press enter to continue")
except SyntaxError:
pass
def log_callback (client):
def log_on_message(unused_client, unused_userdata, message):
if not os.path.exists(log_path):
with open(log_path, 'w') as csvfile:
logwriter = csv.writer(csvfile, dialect='excel')
logwriter.writerow(['time', 'topic', 'data'])
with open(log_path, 'a') as csvfile:
logwriter = csv.writer(csvfile, dialect='excel')
logwriter.writerow([
datetime.datetime.now().isoformat(),
message.topic,
message.payload])
client.on_message = log_on_message
hub.listen_for_config_messages(
service_account_json, project_id, cloud_region, registry_id,
device_id, gateway_id, num_messages, rsa_private_path, 'RS256',
ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port,
jwt_exp_time, listen_time, log_callback)
print('Publishing messages demo')
print('Publishing: {} messages'.format(num_messages))
hub.send_data_from_bound_device(
service_account_json, project_id, cloud_region, registry_id,
device_id, gateway_id, num_messages, rsa_private_path, 'RS256',
ca_cert_path, mqtt_bridge_hostname, mqtt_bridge_port,
jwt_exp_time, "Hello from hub_demo.py")
print('You can read the state messages for your device at this URL:')
print('\t{}'.format(device_url_template).format(
registry_id, device_id, project_id))
try:
input('Press enter to continue after reading the messages.')
except SyntaxError:
pass
# Clean up
hub.unbind_device_from_gateway(
service_account_json, project_id, cloud_region, registry_id,
device_id, gateway_id)
delete_device(
service_account_json, project_id, cloud_region, registry_id,
device_id)
delete_device(
service_account_json, project_id, cloud_region, registry_id,
gateway_id)
delete_registry(
service_account_json, project_id, cloud_region, registry_id)
このデモスクリプトは、次のオペレーションを行います。
デモレジストリの作成
デモでは、まず一時的なデバイス レジストリを作成します。このレジストリは、デモが終了すると削除されます。このレジストリには、特別な種類のデバイスであるゲートウェイと、そのゲートウェイによって管理されるデバイスが含まれます。デモレジストリは、レジストリ サンプルコードの作成を使用して作成します。
次のコードでは、マネージャーのサンプルでデバイス レジストリを作成する方法を示します。
レジストリが作成されたら、デバイスを追加できます。
ゲートウェイの作成
レジストリに最初に追加したデバイスは、ゲートウェイと呼ばれる特別な種類のデバイスです。このデバイスには、独自の構成を関連付けることができ、他のデバイスをバインドしてデバイスのプロキシとして動作させることもできます。ゲートウェイ デバイスを作成したら、関連付けられた認証情報を使用して Cloud IoT Core プロトコル ブリッジに接続できます。
次のコードでは、ハブサンプルが特別なゲートウェイ デバイスを作成する方法を示します。
デバイスを作成してバインドする
デバイスをゲートウェイにバインドするには、ゲートウェイ内のレジストリ内にデバイスを作成する必要があります。デモでは、ハブサンプルを使用してデバイスを作成します。
次のコードでは、ハブサンプルがバインドするためのデバイスを作成する方法を示します。
ゲートウェイにデバイスをバインドする
デモレジストリ内のデバイスとゲートウェイを使用して、デバイスをゲートウェイにバインドする準備ができています。デバイスをゲートウェイにバインドすると、ゲートウェイでは、Cloud IoT Core プロトコル ブリッジへの接続を介して、デバイスの接続と切断を行えます。デモアプリでは、次のようにハブサンプルを使用してこれを行います。
次のコードでは、ハブがデバイスをゲートウェイにバインドする方法を示します。
構成メッセージのリッスン
デバイスがゲートウェイにバインドされると、ゲートウェイでは、Cloud IoT Core プロトコル ブリッジに接続して、デバイスを接続し、そのデバイスの構成メッセージを受信できます。デモでは、これを行うためにハブサンプルのヘルパー関数を使用します。
ハブサンプルでは、次のように Paho MQTT クライアントを使用して、プロトコル ブリッジに接続します。
なお、このサンプルでは、この接続を開始する前に停止します。これにより、ゲートウェイやバインドされたデバイスをリッスンして構成データを設定し始める前にデベロッパー コンソールに移動して、最新の構成が受信されていることを確認できます。
状態データの送信
ゲートウェイとそのバインドされたデバイスの代わりに構成メッセージを受信する方法をデモで確認すると、デモアプリでは、関連するデバイスに代わって状態データを送信します。これを行うために、このデモでは、ハブのサンプルアプリのヘルパー関数を使用します。
デモは、バインドされたデバイスに代わって状態データを送信すると、一時停止します。これにより、次のステップでデモのレジストリとデバイスが削除される前に、デベロッパー コンソールに移動できます。
デモで使用したリソースの削除
デモが終了すると、最初に割り当てられたデバイスとレジストリが解放されます。
デバイスを削除する前に、ゲートウェイからバインド解除します。
マネージャー サンプルのヘルパー関数を使用して、バインドされたデバイスとゲートウェイの両方を削除します。
最後に、マネージャー サンプルのヘルパー関数を使用してレジストリを削除します。
以上、ゲートウェイ機能の動作をエンドツーエンドで確認しました。