MQTT 게이트웨이 데모 스크립트

이 페이지에서는 게이트웨이 작동 방법을 보여주는 Python 스크립트를 제공합니다. 이 스크립트는 Cloud IoT Core 샘플을 사용해서 먼저 데모 레지스트리, 게이트웨이, 기기를 만듭니다. 그런 후 기기를 게이트웨이에 결합하고, 구성 메시지를 리슨하고, 상태 데이터를 기기 대신 전송합니다. 마지막으로 기기를 결합 해제하고, 기기 및 게이트웨이를 삭제하고, 레지스트리를 지웁니다.

기능

이 스크립트는 다음 게이트웨이 기능을 보여줍니다.

  • 게이트웨이 만들기
  • 게이트웨이에 결합할 기기 만들기
  • 게이트웨이에 기기 결합
  • 구성 메시지 리슨
  • 상태 데이터 전송
  • 게이트웨이에서 기기 결합 해제
  • 기기 삭제 후 게이트웨이 삭제

목표

이 스크립트를 실행한 후에는 다음을 수행하는 코드를 이해할 수 있습니다.

  • 레지스트리 만들기
  • 결합할 게이트웨이 및 기기 만들기
  • 게이트웨이를 기기에 결합
  • 구성 메시지 리슨

가격 책정

이 데모에서 전송되는 데이터 양은 무료 사용량 등급에 포함됩니다. 자세한 내용은 가격 책정을 참조하세요.

시작하기 전에

이 데모에서는 사용자가 Python을 잘 알고 있고 게이트웨이 개요를 검토했다고 가정합니다.

이 샘플을 실행하려면 프로젝트 ID 및 Google Cloud CLI를 가져오고 결제가 사용 설정되어 있는지 확인해야 합니다.

  1. 새 Console 프로젝트를 만들거나 Google Cloud Console에서 기존 프로젝트의 프로젝트 ID를 가져옵니다.

    프로젝트 페이지로 이동

  2. Google Cloud CLI를 설치하고 초기화합니다.

    SDK 다운로드

  3. 프로젝트에 결제가 사용 설정되어 있는지 확인하세요.

    결제 사용 설정 방법 알아보기

스크립트 다운로드

데모 스크립트를 다운로드하고 현재 디렉터리를 설정합니다.

git clone https://github.com/GoogleCloudPlatform/python-docs-samples
cd python-docs-samples/iot/api-client/mqtt_example

종속 항목 설치

iot/api-client/mqtt_example 디렉터리 내에서 예시를 실행하는 데 필요한 종속 항목을 설치합니다.

pip install -r requirements.txt --user

시스템에 pip를 설치하는 것과 같이 Python 개발 환경 설정에 관한 자세한 내용은 Python 개발 환경 설정 가이드를 참조하세요.

사용자 인증 정보 만들기

예시를 실행하기 전에 다음 단계를 완료합니다.

  1. 서비스 계정을 만듭니다.

    1. 콘솔에서 서비스 계정 페이지로 이동합니다.

      서비스 계정으로 이동

    2. 프로젝트를 선택합니다.

    3. 서비스 계정 만들기를 클릭합니다.

    4. 서비스 계정 이름 필드에 이름을 입력합니다. 콘솔은 이 이름을 기반으로 서비스 계정 ID 필드를 채웁니다.

    5. 선택사항: 서비스 계정 설명 필드에 서비스 계정의 설명을 입력합니다.

    6. 만들고 계속하기를 클릭합니다.

    7. 역할 선택 필드를 클릭하고 기본 > 편집자를 선택합니다.

    8. 완료를 클릭하여 서비스 계정 만들기를 마칩니다.

      브라우저 창을 닫지 마세요. 다음 단계에서 사용합니다.

  2. 방금 만든 서비스 계정의 JSON 키를 다운로드합니다.

    1. 콘솔에서 만든 서비스 계정의 이메일 주소를 클릭합니다.
    2. 를 클릭합니다.
    3. 키 추가를 클릭한 후 새 키 만들기를 클릭합니다.
    4. 만들기를 클릭합니다. JSON 키 파일이 컴퓨터에 다운로드됩니다.

      이 키를 iot/api-client/mqtt_example 디렉터리에 저장하고 이름을 service_account.json으로 바꿉니다.

    5. 닫기를 클릭합니다.

  3. Google CA 루트 인증서를 예시 파일과 동일한 디렉터리에 다운로드합니다. 선택적으로 --ca_certs 플래그로 인증서 위치를 설정합니다.

스크립트에 프로젝트 ID 및 사용자 인증 정보 추가

  1. 스크립트에서 GOOGLE_CLOUD_PROJECT 환경 변수를 프로젝트 ID로 설정합니다.
  2. GOOGLE_APPLICATION_CREDENTIALS 환경 변수를 service_account.json으로 설정합니다.

스크립트 로컬 실행

프로젝트 하위 디렉터리 python-docs-samples/iot/api-client/mqtt_example에서 다음 명령어를 호출하여 스크립트를 실행합니다.

python gateway_demo.py

스크립트가 실행되면 터미널에 출력을 씁니다.

gateway_demo.py 연습

이 섹션에서는 스크립트의 각 단계에서 수행되는 작업을 설명합니다.

먼저 스크립트가 임시 기기 레지스트리를 만듭니다. 여기에는 게이트웨이와 이를 사용해서 Cloud IoT Core와 통신하는 기기가 모두 포함됩니다.

Python

print("Creating registry: {}".format(registry_id))
manager.create_registry(
    service_account_json, project_id, cloud_region, pubsub_topic, registry_id
)

레지스트리 관리 샘플의 다음 코드가 스크립트에서 호출됩니다.

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# pubsub_topic = 'your-pubsub-topic'
# registry_id = 'your-registry-id'
client = iot_v1.DeviceManagerClient()
parent = f"projects/{project_id}/locations/{cloud_region}"

if not pubsub_topic.startswith("projects/"):
    pubsub_topic = "projects/{}/topics/{}".format(project_id, pubsub_topic)

body = {
    "event_notification_configs": [{"pubsub_topic_name": pubsub_topic}],
    "id": registry_id,
}

try:
    response = client.create_device_registry(
        request={"parent": parent, "device_registry": body}
    )
    print("Created registry")
    return response
except HttpError:
    print("Error, registry not created")
    raise
except AlreadyExists:
    print("Error, registry already exists")
    raise

그런 후 스크립트가 게이트웨이를 만들고 이를 레지스트리에 추가합니다.

Python

print("Creating gateway: {}".format(gateway_id))
manager.create_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    None,
    gateway_id,
    rsa_cert_path,
    "RS256",
)

기기 관리 샘플의 다음 코드가 스크립트로 호출됩니다.

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
# certificate_file = 'path/to/certificate.pem'
# algorithm = 'ES256'
# Check that the gateway doesn't already exist
exists = False
client = iot_v1.DeviceManagerClient()

parent = client.registry_path(project_id, cloud_region, registry_id)
devices = list(client.list_devices(request={"parent": parent}))

for device in devices:
    if device.id == gateway_id:
        exists = True
    print(
        "Device: {} : {} : {} : {}".format(
            device.id, device.num_id, device.config, device.gateway_config
        )
    )

with io.open(certificate_file) as f:
    certificate = f.read()

if algorithm == "ES256":
    certificate_format = iot_v1.PublicKeyFormat.ES256_PEM
else:
    certificate_format = iot_v1.PublicKeyFormat.RSA_X509_PEM

# TODO: Auth type
device_template = {
    "id": gateway_id,
    "credentials": [
        {"public_key": {"format": certificate_format, "key": certificate}}
    ],
    "gateway_config": {
        "gateway_type": iot_v1.GatewayType.GATEWAY,
        "gateway_auth_method": iot_v1.GatewayAuthMethod.ASSOCIATION_ONLY,
    },
}

if not exists:
    res = client.create_device(
        request={"parent": parent, "device": device_template}
    )
    print("Created Gateway {}".format(res))
else:
    print("Gateway exists, skipping")

이제 스크립트가 게이트웨이와 동일한 레지스트리에 기기를 만듭니다.

Python

print("Creating device to bind: {}".format(device_id))
manager.create_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)

기기 관리자 샘플의 다음 코드가 스크립트로 호출됩니다.

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'

# Check that the device doesn't already exist
client = iot_v1.DeviceManagerClient()

exists = False

parent = client.registry_path(project_id, cloud_region, registry_id)

devices = list(client.list_devices(request={"parent": parent}))

for device in devices:
    if device.id == device_id:
        exists = True

# Create the device
device_template = {
    "id": device_id,
    "gateway_config": {
        "gateway_type": iot_v1.GatewayType.NON_GATEWAY,
        "gateway_auth_method": iot_v1.GatewayAuthMethod.ASSOCIATION_ONLY,
    },
}

if not exists:
    res = client.create_device(
        request={"parent": parent, "device": device_template}
    )
    print("Created Device {}".format(res))
else:
    print("Device exists, skipping")

기기 및 게이트웨이가 데모 레지스트리에 포함된 다음 스크립트가 기기를 게이트웨이에 결합할 수 있습니다. 기기를 결합하면 게이트웨이가 Cloud IoT Core 프로토콜 브리지에 대한 게이트웨이 연결을 통해 기기를 연결하거나 분리할 수 있습니다.

Python

print("Binding device")
manager.bind_device_to_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
)

기기 관리자 샘플의 다음 코드가 스크립트로 호출됩니다.

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
client = iot_v1.DeviceManagerClient()

create_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)

parent = client.registry_path(project_id, cloud_region, registry_id)

res = client.bind_device_to_gateway(
    request={"parent": parent, "gateway_id": gateway_id, "device_id": device_id}
)

print("Device Bound! {}".format(res))

기기가 게이트웨이에 결합된 후 스크립트가 일시중지되고 사용자가 게이트웨이 또는 결합된 기기에 대해 구성 데이터를 설정할 수 있습니다.

구성 데이터를 설정하려면 다음 안내를 따르세요.

  1. Google Cloud Console로 이동합니다.
  2. 게이트웨이 또는 결합된 기기에 대해 구성 데이터를 설정합니다.
  3. 게이트웨이가 리슨을 시작하기 전 게이트웨이 및 기기가 최신 구성을 수신하는지 확인합니다.

구성 데이터를 설정하고 스크립트를 계속하면 게이트웨이가 Cloud IoT Core 프로토콜 브리지에 연결하고, 기기를 연결한 후 기기에 대한 구성 메시지를 수신합니다.

구성 메시지를 리슨하기 위해 스크립트는 다음과 같이 MQTT 샘플에서 도우미 함수를 호출합니다.

Python

print("Listening for messages for {} seconds".format(listen_time))
print("Try setting configuration in: ")
print("\t{}".format(edit_template.format(registry_id, project_id)))
try:
    input("Press enter to continue")
except SyntaxError:
    pass

def log_callback(client):
    def log_on_message(unused_client, unused_userdata, message):
        if not os.path.exists(log_path):
            with open(log_path, "w") as csvfile:
                logwriter = csv.writer(csvfile, dialect="excel")
                logwriter.writerow(["time", "topic", "data"])

        with open(log_path, "a") as csvfile:
            logwriter = csv.writer(csvfile, dialect="excel")
            logwriter.writerow(
                [
                    datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
                    message.topic,
                    message.payload,
                ]
            )

    client.on_message = log_on_message

cloudiot_mqtt_example.listen_for_messages(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
    num_messages,
    rsa_private_path,
    "RS256",
    ca_cert_path,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
    jwt_exp_time,
    listen_time,
    log_callback,
)

MQTT 샘플의 다음 코드가 스크립트에서 호출됩니다.

Python

global minimum_backoff_time

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = "/devices/{}/config".format(device_id)
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = "/devices/{}/config".format(gateway_id)
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = "/devices/{}/errors".format(gateway_id)
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
    client.loop()
    if cb is not None:
        cb(client)

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(1)

detach_device(client, device_id)

print("Finished.")

스크립트가 실행되면 구성 메시지를 로그에 기록합니다. 이 로그는 연결된 기기로 구성을 전송하도록 맞춤설정될 수 있습니다.

이 스크립트는 게이트웨이 및 결합된 기기 대신 구성 메시지를 수신하는 방법을 보여준 후 결합된 기기 대신 상태 데이터를 전송합니다. 이를 위해 스크립트가 MQTT 샘플에서 도우미 함수를 호출합니다.

Python

print("Publishing messages demo")
print("Publishing: {} messages".format(num_messages))
cloudiot_mqtt_example.send_data_from_bound_device(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
    num_messages,
    rsa_private_path,
    "RS256",
    ca_cert_path,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
    jwt_exp_time,
    "Hello from gateway_demo.py",
)

print("You can read the state messages for your device at this URL:")
print("\t{}".format(device_url_template).format(registry_id, device_id, project_id))
try:
    input("Press enter to continue after reading the messages.")
except SyntaxError:
    pass

MQTT 샘플의 다음 코드가 스크립트에서 호출됩니다.

Python

global minimum_backoff_time

# Publish device events and gateway state.
device_topic = "/devices/{}/{}".format(device_id, "state")
gateway_topic = "/devices/{}/{}".format(gateway_id, "state")

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# Publish state to gateway topic
gateway_state = "Starting gateway at: {}".format(time.time())
print(gateway_state)
client.publish(gateway_topic, gateway_state)

# Publish num_messages messages to the MQTT bridge
for i in range(1, num_messages + 1):
    client.loop()

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    payload = "{}/{}-{}-payload-{}".format(registry_id, gateway_id, device_id, i)

    print(
        "Publishing message {}/{}: '{}' to {}".format(
            i, num_messages, payload, device_topic
        )
    )
    client.publish(device_topic, "{} : {}".format(device_id, payload))

    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s").format(seconds_since_issue)
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(5)

detach_device(client, device_id)

print("Finished.")

결합된 기기 대신 상태 데이터를 전송한 후 스크립트가 일시중지되므로, 다음 단계에서 데모 레지스트리, 게이트웨이, 기기가 삭제되기 전에 콘솔로 이동해서 상태 데이터를 확인할 수 있습니다.

스크립트가 완료되면 처음에 할당되었던 게이트웨이, 기기, 레지스트리가 삭제됩니다.

Python

# Clean up
manager.unbind_device_from_gateway(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
)
manager.delete_device(
    service_account_json, project_id, cloud_region, registry_id, device_id
)
manager.delete_device(
    service_account_json, project_id, cloud_region, registry_id, gateway_id
)
manager.delete_registry(service_account_json, project_id, cloud_region, registry_id)

기기는 삭제되기 전 게이트웨이에서 결합 해제됩니다.

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
# gateway_id = 'your-gateway-id'
client = iot_v1.DeviceManagerClient()

parent = client.registry_path(project_id, cloud_region, registry_id)

res = client.unbind_device_from_gateway(
    request={"parent": parent, "gateway_id": gateway_id, "device_id": device_id}
)

print("Device unbound: {}".format(res))

스크립트가 기기 관리자 샘플에서 도우미 함수를 호출하여 결합된 기기 및 게이트웨이를 모두 삭제합니다.

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
# device_id = 'your-device-id'
print("Delete device")
client = iot_v1.DeviceManagerClient()

device_path = client.device_path(project_id, cloud_region, registry_id, device_id)

return client.delete_device(request={"name": device_path})

마지막으로 기기가 레지스트리 관리 샘플에서 도우미 함수를 호출하여 레지스트리를 삭제합니다.

Python

# project_id = 'YOUR_PROJECT_ID'
# cloud_region = 'us-central1'
# registry_id = 'your-registry-id'
print("Delete registry")

client = iot_v1.DeviceManagerClient()
registry_path = client.registry_path(project_id, cloud_region, registry_id)

try:
    client.delete_device_registry(request={"name": registry_path})
    print("Deleted registry")
    return "Registry deleted"
except HttpError:
    print("Error, registry not deleted")
    raise