MQTT 閘道示範指令碼

這個頁面提供的 Python 指令碼會示範閘道的運作方式。指令碼會使用 Cloud IoT Core 範例來建立示範登錄檔、閘道和裝置。接著,裝置就會與閘道建立繫結、監聽設定訊息,並代替裝置傳送狀態資料。最後,指令碼會自動解除裝置繫結、刪除裝置和閘道,以及清除登錄檔。

功能與特色

此指令碼會示範下列閘道功能:

  • 建立閘道
  • 建立要繫結至閘道的裝置
  • 將裝置繫結至閘道
  • 監聽設定訊息
  • 傳送狀態資料
  • 將裝置與閘道解除繫結
  • 刪除裝置後再刪除閘道

目標

執行此指令碼後,您將瞭解可執行以下內容的程式碼:

  • 建立登錄檔
  • 建立要繫結的閘道和裝置
  • 將閘道繫結至裝置
  • 監聽設定訊息

定價

在這個示範中傳輸的資料量超出免費用量級別。詳情請參閱定價

事前準備

本教學課程假設您已熟悉 Python,並且已查看閘道 overview

在執行這個範例之前,您需要取得專案 ID、gcloud 指令列工具,並確認已啟用計費功能:

  1. 建立新的 Cloud Console 專案,或從 Google Cloud Console 擷取現有專案的專案 ID:

    前往「Projects」(專案) 頁面

  2. 安裝並初始化 Google Cloud SDK:

    下載 SDK

  3. 請確認您已為專案啟用計費功能。

    瞭解如何啟用計費功能

下載指令碼

下載示範指令碼並設定目前的目錄:

git clone https://github.com/GoogleCloudPlatform/python-docs-samples
cd python-docs-samples/iot/api-client/mqtt_example

安裝依附元件

iot/api-client/mqtt_example 目錄中,安裝執行範例所需的依附元件:

pip install -r requirements.txt --user

如要進一步瞭解如何設定 Python 開發環境,例如在系統上安裝 pip,請參閱 Python 開發環境設定指南

建立憑證

執行範例前,請先完成以下步驟:

  1. 建立名為 gateway-demo服務帳戶,然後按一下 [Create] (建立)
  2. 選取 [Project Editor] (專案編輯者) 角色。
  3. 如要略過選用的「服務帳戶權限」畫面,請按一下 [繼續]
  4. 點選 [Create key] (建立金鑰)
  5. 在「Create key」(建立金鑰) 窗格的「Key type」(金鑰類型) 下方,選取 [JSON]
  6. 按一下 [建立]。
  7. 請在 iot/api-client/mqtt_example 目錄中儲存這個金鑰,然後命名為 service_account.json
  8. Google 的 CA 根憑證下載至與範例檔案相同的目錄。您也可以用 --ca_certs 旗標設定憑證的位置。

在指令碼中新增專案 ID 和憑證

  1. 在指令碼中,將 GOOGLE_CLOUD_PROJECT 環境變數設為您的專案 ID。
  2. GOOGLE_APPLICATION_CREDENTIALS 環境變數設為 service_account.json

在本機執行指令碼

在專案子目錄 python-docs-samples/iot/api-client/mqtt_example 中,叫用下列指令以執行指令碼:

python gateway_demo.py

指令碼執行時,會將輸出內容寫入終端機。

gateway_demo.py 逐步操作說明

本節說明指令碼每個步驟中發生的情況。

首先,這個指令碼會建立臨時裝置登錄檔,其中包含閘道和使用閘道與 Cloud IoT Core 進行通訊的裝置:

Python

print("Creating registry: {}".format(registry_id))
manager.create_registry(
    service_account_json, project_id, cloud_region, pubsub_topic, registry_id
)

指令碼會呼叫註冊管理範例中的下列程式碼:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# pubsub_topic = 'your-pubsub-topic'
# registry_id = 'your-registry-id'
client = iot_v1.DeviceManagerClient()
parent = f"projects/{project_id}/locations/{cloud_region}"

if not pubsub_topic.startswith("projects/"):
    pubsub_topic = "projects/{}/topics/{}".format(project_id, pubsub_topic)

body = {
    "event_notification_configs": [{"pubsub_topic_name": pubsub_topic}],
    "id": registry_id,
}

try:
    response = client.create_device_registry(
        request={"parent": parent, "device_registry": body}
    )
    print("Created registry")
    return response
except HttpError:
    print("Error, registry not created")
    raise
except AlreadyExists:
    print("Error, registry already exists")
    raise

接著,指令碼會建立閘道,並將其新增至登錄檔:

Python

print("Creating gateway: {}".format(gateway_id))
manager.create_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    None,
    gateway_id,
    rsa_cert_path,
    "RS256",
)

指令碼會呼叫裝置管理範例中的下列程式碼:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
# certificate_file = 'path/to/certificate.pem'
# algorithm = 'ES256'
# Check that the gateway doesn't already exist
exists = False
client = iot_v1.DeviceManagerClient()

parent = client.registry_path(project_id, cloud_region, registry_id)
devices = list(client.list_devices(request={"parent": parent}))

for device in devices:
    if device.id == gateway_id:
        exists = True
    print(
        "Device: {} : {} : {} : {}".format(
            device.id, device.num_id, device.config, device.gateway_config
        )
    )

with io.open(certificate_file) as f:
    certificate = f.read()

if algorithm == "ES256":
    certificate_format = iot_v1.PublicKeyFormat.ES256_PEM
else:
    certificate_format = iot_v1.PublicKeyFormat.RSA_X509_PEM

# TODO: Auth type
device_template = {
    "id": gateway_id,
    "credentials": [
        {"public_key": {"format": certificate_format, "key": certificate}}
    ],
    "gateway_config": {
        "gateway_type": iot_v1.GatewayType.GATEWAY,
        "gateway_auth_method": iot_v1.GatewayAuthMethod.ASSOCIATION_ONLY,
    },
}

if not exists:
    res = client.create_device(
        request={"parent": parent, "device": device_template}
    )
    print("Created Gateway {}".format(res))
else:
    print("Gateway exists, skipping")

現在,指令碼會在與閘道相同的登錄檔中建立裝置:

Python

print("Creating device to bind: {}".format(device_id))
manager.create_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)

指令碼會呼叫裝置範例範例中的下列程式碼:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'

# Check that the device doesn't already exist
client = iot_v1.DeviceManagerClient()

exists = False

parent = client.registry_path(project_id, cloud_region, registry_id)

devices = list(client.list_devices(request={"parent": parent}))

for device in devices:
    if device.id == device_id:
        exists = True

# Create the device
device_template = {
    "id": device_id,
    "gateway_config": {
        "gateway_type": iot_v1.GatewayType.NON_GATEWAY,
        "gateway_auth_method": iot_v1.GatewayAuthMethod.ASSOCIATION_ONLY,
    },
}

if not exists:
    res = client.create_device(
        request={"parent": parent, "device": device_template}
    )
    print("Created Device {}".format(res))
else:
    print("Device exists, skipping")

裝置和閘道在示範登錄檔中後,指令碼就能將裝置繫結至閘道。繫結裝置可讓閘道在與 Cloud IoT Core 通訊協定橋接器的連接上卸離和卸離。

Python

print("Binding device")
manager.bind_device_to_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
)

指令碼會呼叫裝置範例範例中的下列程式碼:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
client = iot_v1.DeviceManagerClient()

create_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)

parent = client.registry_path(project_id, cloud_region, registry_id)

res = client.bind_device_to_gateway(
    request={"parent": parent, "gateway_id": gateway_id, "device_id": device_id}
)

print("Device Bound! {}".format(res))

裝置與閘道繫結後,系統會暫停指令碼,以便您為閘道或繫結裝置設定配置資料。

如何設定配置資料:

  1. 前往 Google Cloud Console
  2. 設定閘道或繫結裝置的設定資料。
  3. 在閘道開始監聽前,先確認閘道和裝置接收到最新設定。

完成設定設定檔並繼續指令碼後,閘道會連線至 Cloud IoT Core 通訊協定橋接器,附加裝置,然後接收裝置的設定訊息。

如要聽取設定訊息,指令碼會從 MQTT 範例呼叫輔助函式,如下所示:

Python

print("Listening for messages for {} seconds".format(listen_time))
print("Try setting configuration in: ")
print("\t{}".format(edit_template.format(registry_id, project_id)))
try:
    input("Press enter to continue")
except SyntaxError:
    pass

def log_callback(client):
    def log_on_message(unused_client, unused_userdata, message):
        if not os.path.exists(log_path):
            with open(log_path, "w") as csvfile:
                logwriter = csv.writer(csvfile, dialect="excel")
                logwriter.writerow(["time", "topic", "data"])

        with open(log_path, "a") as csvfile:
            logwriter = csv.writer(csvfile, dialect="excel")
            logwriter.writerow(
                [
                    datetime.datetime.now().isoformat(),
                    message.topic,
                    message.payload,
                ]
            )

    client.on_message = log_on_message

cloudiot_mqtt_example.listen_for_messages(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
    num_messages,
    rsa_private_path,
    "RS256",
    ca_cert_path,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
    jwt_exp_time,
    listen_time,
    log_callback,
)

下列指令碼會呼叫 MQTT 範例中的下列程式碼:

Python

global minimum_backoff_time

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = "/devices/{}/config".format(device_id)
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = "/devices/{}/config".format(gateway_id)
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = "/devices/{}/errors".format(gateway_id)
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
    client.loop()
    if cb is not None:
        cb(client)

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.utcnow()
        client.loop()
        client.disconnect()
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(1)

detach_device(client, device_id)

print("Finished.")

指令碼執行時,系統會將設定訊息寫入記錄,進而將設定傳輸至附加的裝置。

示範您可以代表閘道及其繫結裝置接收設定訊息的方式,代表代表繫結的裝置傳送狀態資料。為此,此指令碼會從 MQTT 範例呼叫輔助函式:

Python

print("Publishing messages demo")
print("Publishing: {} messages".format(num_messages))
cloudiot_mqtt_example.send_data_from_bound_device(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
    num_messages,
    rsa_private_path,
    "RS256",
    ca_cert_path,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
    jwt_exp_time,
    "Hello from gateway_demo.py",
)

print("You can read the state messages for your device at this URL:")
print("\t{}".format(device_url_template).format(registry_id, device_id, project_id))
try:
    input("Press enter to continue after reading the messages.")
except SyntaxError:
    pass

下列指令碼會呼叫 MQTT 範例中的下列程式碼:

Python

global minimum_backoff_time

# Publish device events and gateway state.
device_topic = "/devices/{}/{}".format(device_id, "state")
gateway_topic = "/devices/{}/{}".format(gateway_id, "state")

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# Publish state to gateway topic
gateway_state = "Starting gateway at: {}".format(time.time())
print(gateway_state)
client.publish(gateway_topic, gateway_state)

# Publish num_messages messages to the MQTT bridge
for i in range(1, num_messages + 1):
    client.loop()

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    payload = "{}/{}-{}-payload-{}".format(registry_id, gateway_id, device_id, i)

    print(
        "Publishing message {}/{}: '{}' to {}".format(
            i, num_messages, payload, device_topic
        )
    )
    client.publish(device_topic, "{} : {}".format(device_id, payload))

    seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s").format(seconds_since_issue)
        jwt_iat = datetime.datetime.utcnow()
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(5)

detach_device(client, device_id)

print("Finished.")

指令碼會代表繫結的裝置傳輸狀態資料後暫停,可供您前往 Cloud Console 查看下一個步驟,在示範登錄檔、閘道和裝置之前移除資料。

指令碼執行完畢後,系統會釋放您在閘道上分配的閘道、裝置和登錄檔。

Python

# Clean up
manager.unbind_device_from_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
)
manager.delete_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)
manager.delete_device(
    service_account_json, project_id, cloud_region, registry_id, gateway_id
)
manager.delete_registry(service_account_json, project_id, cloud_region, registry_id)

在刪除裝置之前,裝置會與閘道解除繫結:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
client = iot_v1.DeviceManagerClient()

parent = client.registry_path(project_id, cloud_region, registry_id)

res = client.unbind_device_from_gateway(
    request={"parent": parent, "gateway_id": gateway_id, "device_id": device_id}
)

print("Device unbound: {}".format(res))

指令碼會從裝置管理員範例中呼叫輔助函式,藉此刪除繫結的裝置和閘道:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
print("Delete device")
client = iot_v1.DeviceManagerClient()

device_path = client.device_path(project_id, cloud_region, registry_id, device_id)

return client.delete_device(request={"name": device_path})

最後,指令碼會從登錄檔管理範例中呼叫一個輔助函式,以便刪除登錄檔:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
print("Delete registry")

client = iot_v1.DeviceManagerClient()
registry_path = client.registry_path(project_id, cloud_region, registry_id)

try:
    client.delete_device_registry(request={"name": registry_path})
    print("Deleted registry")
    return "Registry deleted"
except HttpError:
    print("Error, registry not deleted")
    raise