Skrip demo gerbang MQTT

Halaman ini menyediakan skrip Python yang menunjukkan cara kerja gateway. Dengan menggunakan sampel Cloud IoT Core, skrip terlebih dahulu membuat registry demo, gateway, dan perangkat. Kemudian, perangkat akan diikat ke gateway, mendengarkan pesan konfigurasi, dan mengirim data status atas nama perangkat. Terakhir, skrip otomatis melepaskan perangkat, menghapus perangkat dan gateway, serta membersihkan registry.

Fitur

Skrip ini menunjukkan fitur gateway berikut:

  • Membuat gateway
  • Membuat perangkat untuk mengikat ke gateway
  • Mengikat perangkat ke gateway
  • Mendengarkan pesan konfigurasi
  • Mengirim data status
  • Melepaskan perangkat dari gateway
  • Menghapus perangkat, lalu menghapus gateway

Tujuan

Setelah menjalankan skrip ini, Anda akan memahami kode yang melakukan hal berikut:

  • Membuat registry
  • Membuat gateway dan perangkat untuk mengikat
  • Mengikat gateway ke perangkat
  • Mendengarkan pesan konfigurasi

Harga

Jumlah data yang dikirimkan dalam demo ini berada dalam tingkat penggunaan gratis. Lihat Harga untuk informasi selengkapnya.

Sebelum memulai

Demo ini mengasumsikan bahwa Anda terbiasa dengan Python dan Anda telah meninjau ringkasan gateway.

Sebelum dapat menjalankan sampel ini, Anda perlu mendapatkan ID project, fitur command-line gcloud, dan memeriksa apakah penagihan diaktifkan:

  1. Buat project Cloud Console baru atau ambil ID project dari project yang sudah ada dari Google Cloud Console:

    Buka laman Project

  2. Instal lalu inisialisasi Google Cloud SDK:

    Unduh SDK

  3. Pastikan penagihan diaktifkan untuk project Anda.

    Pelajari Cara Mengaktifkan Penagihan

Download skripnya

Download skrip demo dan setel direktori saat ini:

git clone https://github.com/GoogleCloudPlatform/python-docs-samples
cd python-docs-samples/iot/api-client/mqtt_example

Menginstal dependensi

Dari dalam direktori iot/api-client/mqtt_example, instal dependensi yang diperlukan untuk menjalankan contoh:

pip install -r requirements.txt --user

Untuk informasi selengkapnya tentang menyiapkan lingkungan pengembangan Python, seperti memasang pip di sistem, lihat Panduan Penyiapan Lingkungan Pengembangan Python.

Buat kredensial Anda

Sebelum menjalankan contoh, selesaikan langkah-langkah berikut:

  1. Buat akun layanan bernama gateway-demo dan klik Buat.
  2. Pilih peran Editor Project.
  3. Lewati layar Izin akun layanan opsional dengan mengeklik Lanjutkan.
  4. Klik Buat kunci.
  5. Di panel Buat kunci, pada Jenis kunci, pilih JSON.
  6. Klik Buat.
  7. Simpan kunci ini di direktori iot/api-client/mqtt_example dan ganti namanya menjadi service_account.json.
  8. Download CA root certificate Google ke direktori yang sama dengan file contoh. Anda dapat memilih untuk menyetel lokasi sertifikat dengan tanda --ca_certs.

Tambahkan ID project dan kredensial ke skrip

  1. Dalam skrip, tetapkan variabel lingkungan GOOGLE_CLOUD_PROJECT ke ID project Anda.
  2. Tetapkan variabel lingkungan GOOGLE_APPLICATION_CREDENTIALS ke service_account.json.

Jalankan skrip secara lokal

Dalam subdirektori project python-docs-samples/iot/api-client/mqtt_example, jalankan skrip dengan menjalankan perintah:

python gateway_demo.py

Saat skrip berjalan, skrip akan menulis output ke terminal.

panduan gateway_demo.py

Bagian ini menjelaskan apa yang terjadi di setiap langkah skrip.

Pertama, skrip membuat registry perangkat sementara, yang akan berisi gateway dan perangkat yang menggunakan gateway untuk berkomunikasi dengan Cloud IoT Core:

Python

print("Creating registry: {}".format(registry_id))
manager.create_registry(
    service_account_json, project_id, cloud_region, pubsub_topic, registry_id
)

Kode berikut dalam contoh pengelolaan registry dipanggil oleh skrip:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# pubsub_topic = 'your-pubsub-topic'
# registry_id = 'your-registry-id'
client = iot_v1.DeviceManagerClient()
parent = f"projects/{project_id}/locations/{cloud_region}"

if not pubsub_topic.startswith("projects/"):
    pubsub_topic = "projects/{}/topics/{}".format(project_id, pubsub_topic)

body = {
    "event_notification_configs": [{"pubsub_topic_name": pubsub_topic}],
    "id": registry_id,
}

try:
    response = client.create_device_registry(
        request={"parent": parent, "device_registry": body}
    )
    print("Created registry")
    return response
except HttpError:
    print("Error, registry not created")
    raise
except AlreadyExists:
    print("Error, registry already exists")
    raise

Selanjutnya, skrip membuat gateway dan menambahkannya ke registry:

Python

print("Creating gateway: {}".format(gateway_id))
manager.create_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    None,
    gateway_id,
    rsa_cert_path,
    "RS256",
)

Kode berikut dalam contoh pengelolaan perangkat dipanggil oleh skrip:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
# certificate_file = 'path/to/certificate.pem'
# algorithm = 'ES256'
# Check that the gateway doesn't already exist
exists = False
client = iot_v1.DeviceManagerClient()

parent = client.registry_path(project_id, cloud_region, registry_id)
devices = list(client.list_devices(request={"parent": parent}))

for device in devices:
    if device.id == gateway_id:
        exists = True
    print(
        "Device: {} : {} : {} : {}".format(
            device.id, device.num_id, device.config, device.gateway_config
        )
    )

with io.open(certificate_file) as f:
    certificate = f.read()

if algorithm == "ES256":
    certificate_format = iot_v1.PublicKeyFormat.ES256_PEM
else:
    certificate_format = iot_v1.PublicKeyFormat.RSA_X509_PEM

# TODO: Auth type
device_template = {
    "id": gateway_id,
    "credentials": [
        {"public_key": {"format": certificate_format, "key": certificate}}
    ],
    "gateway_config": {
        "gateway_type": iot_v1.GatewayType.GATEWAY,
        "gateway_auth_method": iot_v1.GatewayAuthMethod.ASSOCIATION_ONLY,
    },
}

if not exists:
    res = client.create_device(
        request={"parent": parent, "device": device_template}
    )
    print("Created Gateway {}".format(res))
else:
    print("Gateway exists, skipping")

Sekarang, skrip membuat perangkat di registry yang sama dengan gateway:

Python

print("Creating device to bind: {}".format(device_id))
manager.create_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)

Kode berikut dalam sampel pengelola perangkat dipanggil oleh skrip:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'

# Check that the device doesn't already exist
client = iot_v1.DeviceManagerClient()

exists = False

parent = client.registry_path(project_id, cloud_region, registry_id)

devices = list(client.list_devices(request={"parent": parent}))

for device in devices:
    if device.id == device_id:
        exists = True

# Create the device
device_template = {
    "id": device_id,
    "gateway_config": {
        "gateway_type": iot_v1.GatewayType.NON_GATEWAY,
        "gateway_auth_method": iot_v1.GatewayAuthMethod.ASSOCIATION_ONLY,
    },
}

if not exists:
    res = client.create_device(
        request={"parent": parent, "device": device_template}
    )
    print("Created Device {}".format(res))
else:
    print("Device exists, skipping")

Setelah perangkat dan gateway berada di registry demo, skrip dapat mengikat perangkat ke gateway. Mengikat perangkat memungkinkan gateway untuk memasang dan melepaskan perangkat melalui koneksi gateway ke jembatan protokol Cloud IoT Core.

Python

print("Binding device")
manager.bind_device_to_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
)

Kode berikut dalam sampel pengelola perangkat dipanggil oleh skrip:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
client = iot_v1.DeviceManagerClient()

create_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)

parent = client.registry_path(project_id, cloud_region, registry_id)

res = client.bind_device_to_gateway(
    request={"parent": parent, "gateway_id": gateway_id, "device_id": device_id}
)

print("Device Bound! {}".format(res))

Setelah perangkat terikat ke gateway, skrip akan dijeda sehingga Anda dapat menyetel data konfigurasi untuk gateway atau perangkat yang terikat.

Untuk menyetel data konfigurasi:

  1. Buka Google Cloud Console.
  2. Setel data konfigurasi untuk gateway atau perangkat terikat.
  3. Verifikasi bahwa gateway dan perangkat menerima konfigurasi terbaru sebelum gateway mulai mendengarkan.

Setelah Anda menyetel data konfigurasi dan melanjutkan skrip, gateway akan terhubung ke jembatan protokol Cloud IoT Core, melampirkan perangkat, lalu menerima pesan konfigurasi untuk perangkat.

Untuk mendengarkan pesan konfigurasi, skrip memanggil fungsi bantuan dari sampel MQTT sebagai berikut:

Python

print("Listening for messages for {} seconds".format(listen_time))
print("Try setting configuration in: ")
print("\t{}".format(edit_template.format(registry_id, project_id)))
try:
    input("Press enter to continue")
except SyntaxError:
    pass

def log_callback(client):
    def log_on_message(unused_client, unused_userdata, message):
        if not os.path.exists(log_path):
            with open(log_path, "w") as csvfile:
                logwriter = csv.writer(csvfile, dialect="excel")
                logwriter.writerow(["time", "topic", "data"])

        with open(log_path, "a") as csvfile:
            logwriter = csv.writer(csvfile, dialect="excel")
            logwriter.writerow(
                [
                    datetime.datetime.now().isoformat(),
                    message.topic,
                    message.payload,
                ]
            )

    client.on_message = log_on_message

cloudiot_mqtt_example.listen_for_messages(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
    num_messages,
    rsa_private_path,
    "RS256",
    ca_cert_path,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
    jwt_exp_time,
    listen_time,
    log_callback,
)

Kode berikut dalam sampel MQTT dipanggil oleh skrip:

Python

global minimum_backoff_time

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = "/devices/{}/config".format(device_id)
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = "/devices/{}/config".format(gateway_id)
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = "/devices/{}/errors".format(gateway_id)
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
    client.loop()
    if cb is not None:
        cb(client)

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.utcnow()
        client.loop()
        client.disconnect()
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(1)

detach_device(client, device_id)

print("Finished.")

Saat skrip berjalan, skrip akan menulis pesan konfigurasi ke log yang dapat disesuaikan untuk mengirimkan konfigurasi ke perangkat yang terpasang.

Setelah menunjukkan bagaimana Anda dapat menerima pesan konfigurasi atas nama gateway dan perangkat yang terikat, skrip mengirimkan data status atas nama perangkat yang terikat. Untuk melakukannya, skrip memanggil fungsi bantuan dari sampel MQTT:

Python

print("Publishing messages demo")
print("Publishing: {} messages".format(num_messages))
cloudiot_mqtt_example.send_data_from_bound_device(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
    num_messages,
    rsa_private_path,
    "RS256",
    ca_cert_path,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
    jwt_exp_time,
    "Hello from gateway_demo.py",
)

print("You can read the state messages for your device at this URL:")
print("\t{}".format(device_url_template).format(registry_id, device_id, project_id))
try:
    input("Press enter to continue after reading the messages.")
except SyntaxError:
    pass

Kode berikut dalam sampel MQTT dipanggil oleh skrip:

Python

global minimum_backoff_time

# Publish device events and gateway state.
device_topic = "/devices/{}/{}".format(device_id, "state")
gateway_topic = "/devices/{}/{}".format(gateway_id, "state")

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# Publish state to gateway topic
gateway_state = "Starting gateway at: {}".format(time.time())
print(gateway_state)
client.publish(gateway_topic, gateway_state)

# Publish num_messages messages to the MQTT bridge
for i in range(1, num_messages + 1):
    client.loop()

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    payload = "{}/{}-{}-payload-{}".format(registry_id, gateway_id, device_id, i)

    print(
        "Publishing message {}/{}: '{}' to {}".format(
            i, num_messages, payload, device_topic
        )
    )
    client.publish(device_topic, "{} : {}".format(device_id, payload))

    seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s").format(seconds_since_issue)
        jwt_iat = datetime.datetime.utcnow()
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(5)

detach_device(client, device_id)

print("Finished.")

Skrip akan dijeda setelah mentransmisikan data status atas nama perangkat yang terikat sehingga Anda dapat membuka Cloud Console untuk melihat data status sebelum registry demo, gateway, dan perangkat dihapus pada langkah berikutnya.

Saat skrip selesai, skrip tersebut akan membebaskan gateway, perangkat, dan registry yang dialokasikan di awal.

Python

# Clean up
manager.unbind_device_from_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
)
manager.delete_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)
manager.delete_device(
    service_account_json, project_id, cloud_region, registry_id, gateway_id
)
manager.delete_registry(service_account_json, project_id, cloud_region, registry_id)

Perangkat dilepas dari gateway sebelum dihapus:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
client = iot_v1.DeviceManagerClient()

parent = client.registry_path(project_id, cloud_region, registry_id)

res = client.unbind_device_from_gateway(
    request={"parent": parent, "gateway_id": gateway_id, "device_id": device_id}
)

print("Device unbound: {}".format(res))

Skrip memanggil fungsi bantuan dari contoh pengelola perangkat untuk menghapus perangkat yang terikat dan gateway:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
print("Delete device")
client = iot_v1.DeviceManagerClient()

device_path = client.device_path(project_id, cloud_region, registry_id, device_id)

return client.delete_device(request={"name": device_path})

Terakhir, skrip memanggil fungsi bantuan dari contoh pengelolaan registry untuk menghapus registry:

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
print("Delete registry")

client = iot_v1.DeviceManagerClient()
registry_path = client.registry_path(project_id, cloud_region, registry_id)

try:
    client.delete_device_registry(request={"name": registry_path})
    print("Deleted registry")
    return "Registry deleted"
except HttpError:
    print("Error, registry not deleted")
    raise