MQTT 브리지를 통해 게이트웨이 사용

이 페이지에서는 게이트웨이가 MQTT 브리지를 사용해서 Cloud IoT Core와 통신하고 결합된 기기를 대신하여 원격 분석 이벤트를 게시하는 방법을 설명합니다. 시작하기 전에 Cloud IoT Core에서 MQTT 브리지를 사용하는 방법에 대한 일반적인 내용은 MQTT 브리지 사용을 참조하세요.

엔드 투 엔드 데모 사용해 보기

MQTT 브리지를 통해 게이트웨이 사용

  1. 게이트웨이를 만들고 구성한 다음에는 MQTT 브리지를 통해 게이트웨이를 Cloud IoT Core에 연결합니다.
  2. 아직 기기를 만들지 않았으면 기기를 만듭니다.
  3. 선택사항: 기기를 게이트웨이에 결합합니다.

    MQTT 브리지를 사용하는 경우에는 자체 JWT를 생성할 수 없는 경우에만 기기를 결합하면 됩니다.

  4. 선택사항: 시스템 오류 주제를 구독하여 기기 작업의 성공 여부에 대한 피드백을 확인합니다.

  5. 게이트웨이에 기기를 연결합니다.

  6. 게이트웨이를 사용하여 기기 대신 원격 분석, 기기 상태, 구성 메시지를 중계합니다. 방법을 알아보려면 엔드 투 엔드 데모를 사용해보세요.

게이트웨이 메시지

게이트웨이는 MQTT 브리지를 통해 Cloud IoT Core에 연결된 후 세 가지 유형의 메시지를 송수신할 수 있습니다.

  • 제어 메시지: 기기를 게이트웨이에 연결하거나 게이트웨이에서 기기를 분리합니다. 이러한 메시지는 게이트웨이와 Cloud IoT Core 사이에 전송됩니다. Cloud IoT Core는 게이트웨이의 제어 메시지만 수락합니다. 다른 유형의 기기가 제어 메시지를 전송하려고 시도하면 Cloud IoT Core가 연결을 종료합니다.
  • 게이트웨이 및 기기의 메시지: 기기 대신 게이트웨이에서 중계되거나 게이트웨이 자체에서 직접 전송될 수 있습니다.
  • 시스템 오류 메시지: 게이트웨이가 기기 대신 MQTT 시스템 오류 주제에 구독된 경우 기기에 오류가 발생할 때마다 Cloud IoT Core가 게이트웨이에 오류 메시지를 전송합니다.

게이트웨이에 기기 연결

Cloud IoT Core로 게이트웨이의 기기 통신 프록시를 사용 설정하려면 게이트웨이가 MQTT 브리지를 통해 QoS 1 /devices/{device_ID_to_attach}/attach 제어 메시지를 게시하도록 합니다.

기기 JWT를 사용해서 기기를 인증하도록 게이트웨이를 구성한 경우 연결 메시지의 페이로드에 JSON 형식의 토큰({ "authorization" : "{JWT_token}" })이 포함되어야 합니다. 그렇지 않으면 Cloud IoT Core가 게이트웨이와 연결을 확인하여 기기를 인증합니다.

성공 응답

기기가 승인되면 Cloud IoT Core가 연결 메시지에 대한 응답으로 PUBACK 메시지를 게이트웨이로 전송합니다. 게이트웨이가 PUBACK 메시지를 수신한 다음에는 기기 대신 원격 분석 또는 구성 메시지와 같은 Cloud IoT Core 주제를 게시 및 구독할 수 있습니다.

게이트웨이가 연결 메시지를 전송할 때 기기가 이미 연결되어 있으면 Cloud IoT Core가 PUBACK 메시지를 전송합니다.

게이트웨이에서 기기 분리

게이트웨이에서 기기를 분리하려면 게이트웨이가 MQTT 브리지를 통해 QoS 1 /devices/{device_ID}/detach 제어 메시지를 게시하도록 합니다. 메시지가 전송될 때 기기가 연결되지 않았으면 Cloud IoT Core가 분리 제어 메시지를 무시하고 PUBACK 메시지를 전송합니다.

문제 해결

기기에 오류가 발생할 때 알림을 받으려면 QoS 수준 0을 사용해서 게이트웨이를 MQTT /devices/{gateway_ID}/errors 주제에 등록합니다.

C#

errors 주제를 포함하여 기기가 게이트웨이의 모든 주제를 구독하는 단계는 아래에 강조표시되어 있습니다.
public static object ListenForConfigMessages(string projectId, string cloudRegion,
    string registryId, string deviceId, string gatewayId, int numMessages,
    string privateKeyFile, string algorithm, string caCerts, string mqttBridgeHostname,
    int mqttBridgePort, int jwtExpiresMinutes, int duration)
{
    var clientId = $"projects/{projectId}/locations/{cloudRegion}/registries/{registryId}" +
        $"/devices/{gatewayId}";
    var jwtIatTime = SystemClock.Instance.GetCurrentInstant();
    // Create a duration
    Duration durationExp = Duration.FromMinutes(jwtExpiresMinutes);
    var jwtExpTime = SystemClock.Instance.GetCurrentInstant().Plus(durationExp);
    var pass = "";
    if (algorithm == "RS256")
    {
        pass = CloudIotMqttExample.CreateJwtRsa(projectId, privateKeyFile);
    }
    else if (algorithm == "ES256")
    {
        Console.WriteLine("Currently, we do not support this algorithm.");
        return 0;
    }

    // Use gateway to connect server
    var mqttClient = GetClient(
      projectId,
      cloudRegion,
      registryId,
      gatewayId,
      caCerts,
      mqttBridgeHostname,
      mqttBridgePort);

    double initialConnectIntervalMillis = 0.5;
    double maxConnectIntervalMillis = 6;
    double maxConnectRetryTimeElapsedMillis = 900;
    double intervalMultiplier = 1.5;

    double retryIntervalMs = initialConnectIntervalMillis;
    double totalRetryTimeMs = 0;

    // Both connect and publish operations may fail. If they do,
    // allow retries but with an exponential backoff time period.
    while (!mqttClient.IsConnected &&
        totalRetryTimeMs < maxConnectRetryTimeElapsedMillis)
    {
        try
        {
            // Connect to the Google MQTT bridge.
            mqttClient.Connect(clientId, "unused", pass);
        }
        catch (AggregateException aggExceps)
        {
            printExceptions(aggExceps);
            Console.WriteLine("Retrying in " + retryIntervalMs
                + " seconds.");

            System.Threading.Thread.Sleep((int)retryIntervalMs);
            totalRetryTimeMs += retryIntervalMs;
            retryIntervalMs *= intervalMultiplier;
            if (retryIntervalMs > maxConnectIntervalMillis)
            {
                retryIntervalMs = maxConnectIntervalMillis;
            }
        }
    }

    SetupMqttTopics(mqttClient, gatewayId);
    AttachDevice(mqttClient, deviceId, "{}");

    // Wait for about a minute for config messages.
    Console.WriteLine("Listening...");
    for (int i = 0; i < duration; ++i)
    {
        Console.Write(".");
        var secSinceIssue = SystemClock.Instance.GetCurrentInstant().Minus(jwtIatTime);
        if (secSinceIssue.TotalSeconds > (60 * jwtExpiresMinutes))
        {
            Console.WriteLine("Refreshing token after {0}s", secSinceIssue);
            jwtIatTime = SystemClock.Instance.GetCurrentInstant();
            // refresh token and reconnect.
            pass = CloudIotMqttExample.CreateJwtRsa(projectId, privateKeyFile);
            mqttClient = GetClient(
                        projectId,
                        cloudRegion,
                        registryId,
                        gatewayId,
                        caCerts,
                        mqttBridgeHostname,
                        mqttBridgePort);
        }
        System.Threading.Thread.Sleep(1000);
    }

    DetachDevice(mqttClient, deviceId);
    // wait for the device get detached.
    System.Threading.Thread.Sleep(2000);
    mqttClient.Disconnect();
    Console.WriteLine("Finished.");
    return 0;
}

Go

기기가 errors 주제를 구독하는 단계는 아래에 강조표시되어 있습니다.

import (
	"fmt"
	"io"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// subscribeGatewayToDeviceTopic creates a gateway client that subscribes to a topic of a bound device.
// Currently supported topics include: "config", "state", "commands", "errors"
func subscribeGatewayToDeviceTopic(w io.Writer, projectID string, region string, registryID string, gatewayID string, deviceID string, privateKeyPath string, algorithm string, clientDuration int, topic string) error {

	const (
		mqttBrokerURL   = "tls://mqtt.googleapis.com:8883"
		protocolVersion = 4 // corresponds to MQTT 3.1.1
	)

	// onConnect defines the on connect handler which resets backoff variables.
	var onConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
		fmt.Fprintf(w, "Client connected: %t\n", client.IsConnected())
	}

	// onMessage defines the message handler for the mqtt client.
	var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
		fmt.Fprintf(w, "Topic: %s\n", msg.Topic())
		fmt.Fprintf(w, "Message: %s\n", msg.Payload())
	}

	// onDisconnect defines the connection lost handler for the mqtt client.
	var onDisconnect mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
		fmt.Println("Client disconnected")
	}

	jwt, _ := createJWT(projectID, privateKeyPath, algorithm, 60)
	clientID := fmt.Sprintf("projects/%s/locations/%s/registries/%s/devices/%s", projectID, region, registryID, gatewayID)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(mqttBrokerURL)
	opts.SetClientID(clientID)
	opts.SetUsername("unused")
	opts.SetPassword(jwt)
	opts.SetProtocolVersion(protocolVersion)
	opts.SetOnConnectHandler(onConnect)
	opts.SetDefaultPublishHandler(onMessage)
	opts.SetConnectionLostHandler(onDisconnect)

	// Create and connect a client using the above options.
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, "Failed to connect client")
		return token.Error()
	}

	if err := attachDevice(deviceID, client, ""); err != nil {
		fmt.Fprintf(w, "AttachDevice error: %v\n", err)
		return err
	}

	// Sleep for 5 seconds to allow attachDevice message to propagate.
	time.Sleep(5 * time.Second)

	// Subscribe to the config topic of the current gateway and a device bound to the gateway.
	gatewayTopic := fmt.Sprintf("/devices/%s/%s", gatewayID, topic)
	if token := client.Subscribe(gatewayTopic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	deviceTopic := fmt.Sprintf("/devices/%s/%s", deviceID, topic)
	if token := client.Subscribe(deviceTopic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	time.Sleep(time.Duration(clientDuration) * time.Second)

	if err := detachDevice(deviceID, client, ""); err != nil {
		fmt.Fprintf(w, "DetachDevice error: %v\n", err)
		return err
	}

	if token := client.Unsubscribe(gatewayTopic, deviceTopic); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	client.Disconnect(10)
	return nil
}

자바

기기가 errors 주제를 구독하는 단계는 아래에 강조표시되어 있습니다.

// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
    String.format("ssl://%s:%s", mqttBridgeHostname, mqttBridgePort);

// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
    String.format(
        "projects/%s/locations/%s/registries/%s/devices/%s",
        projectId, cloudRegion, registryId, gatewayId);

MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explictly set this. If you don't set MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

Properties sslProps = new Properties();
sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
connectOptions.setSSLProperties(sslProps);

// With Google Cloud IoT Core, the username field is ignored, however it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");

if ("RS256".equals(algorithm)) {
  connectOptions.setPassword(createJwtRsa(projectId, privateKeyFile).toCharArray());
} else if ("ES256".equals(algorithm)) {
  connectOptions.setPassword(createJwtEs(projectId, privateKeyFile).toCharArray());
} else {
  throw new IllegalArgumentException(
      "Invalid algorithm " + algorithm + ". Should be one of 'RS256' or 'ES256'.");
}

System.out.println(String.format("%s", mqttClientId));

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while ((totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, gatewayId);

// The topic gateways receive error updates on. QoS must be 0.
String errorTopic = String.format("/devices/%s/errors", gatewayId);
System.out.println(String.format("Listening on %s", errorTopic));

client.subscribe(errorTopic, 0);

return client;

Node.js

예를 들어 기기가 config 주제를 구독하는 단계는 아래에 강조표시되어 있습니다. errors 주제를 구독하려면 /devices/${gateway_ID}/errors를 지정합니다.
// const deviceId = `myDevice`;
// const gatewayId = `mygateway`;
// const registryId = `myRegistry`;
// const projectId = `my-project-123`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const clientDuration = 60000;

const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${gatewayId}`;
console.log(mqttClientId);
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  qos: 1,
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
const client = mqtt.connect(connectionArgs);

client.on('connect', success => {
  if (!success) {
    console.log('Client not connected...');
  } else {
    setTimeout(() => {
      // Subscribe to gateway error topic.
      client.subscribe(`/devices/${gatewayId}/errors`, {qos: 0});

      attachDevice(deviceId, client);

      setTimeout(() => {
        console.log('Closing connection to MQTT. Goodbye!');
        client.end(true);
      }, clientDuration); // Safely detach device and close connection.
    }, 5000);
  }
});

client.on('close', () => {
  console.log('Connection closed');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  const decodedMessage = Buffer.from(message, 'base64').toString('ascii');

  console.log(`message received on error topic ${topic}: ${decodedMessage}`);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

Python

기기가 errors 주제를 구독하는 단계는 아래에 강조표시되어 있습니다.
global minimum_backoff_time

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = "/devices/{}/config".format(device_id)
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = "/devices/{}/config".format(gateway_id)
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = "/devices/{}/errors".format(gateway_id)
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
    client.loop()
    if cb is not None:
        cb(client)

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print("Refreshing token after {}s".format(seconds_since_issue))
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(1)

detach_device(client, device_id)

print("Finished.")

Cloud IoT Core가 QoS 0으로 전송되는 게이트웨이 오류를 최상의 방식으로 전송합니다. 게이트웨이가 /devices/{gateway_ID}/errors를 구독하지 않으면 Cloud IoT Core에 실패 이벤트가 로깅되더라도 PUBACK 메시지를 전송하지 않습니다.

MQTT 오류의 구조는 다음과 같습니다.

string error_type;  // A string description of the error type.
string device_id;   // The ID of the device that caused the error.
string description; // A description of the error.

MQTT 메시지로 오류 메시지가 트리거된 경우 다음 정보도 함께 연결됩니다.

string message_type;  // The string MQTT message type.
string topic;  // The MQTT topic if applicable, otherwise it is empty.
int packet_id;  // The packet ID of the MQTT message if applicable, otherwise it is zero.

오류 코드 및 오류 처리

오류 코드 설명 권장 작업
GATEWAY_ATTACHMENT_ERROR 게이트웨이 연결 요청이 실패했습니다. 문제를 해결하지 않은 상태로 다시 시도하지 마세요.
GATEWAY_DEVICE_NOT_FOUND 게이트웨이가 수신 메시지 처리를 위해 연결된 기기를 찾을 수 없습니다. 문제를 해결하지 않은 상태로 다시 시도하지 마세요.
GATEWAY_INVALID_MQTT_TOPIC 게이트웨이가 지정된 MQTT 주제를 파싱할 수 없습니다. 형식이 잘못되었거나 포함된 기기 ID 또는 이름이 잘못되었습니다. 문제를 해결하지 않은 상태로 다시 시도하지 마세요.
GATEWAY_UNEXPECTED_PACKET_ID 게이트웨이가 패킷 ID를 기준으로 메시지를 처리할 수 없습니다. 예를 들어 PUBACK에 패킷 ID가 있을 수 있지만 응답을 대기 중인 항목이 없습니다. 문제를 해결하지 않은 상태로 다시 시도하지 마세요.
GATEWAY_UNEXPECTED_MESSAGE_TYPE 지원되지 않는 PUBREL, PUBREC 등의 예기치 않은 메시지가 게이트웨이에 수신되었습니다. 문제를 해결하지 않은 상태로 다시 시도하지 마세요.
GATEWAY_DETACHMENT_DEVICE_ERROR 기기 오류로 인해 게이트웨이가 기기를 분리했습니다. 문제를 해결하지 않은 상태로 다시 시도하지 마세요.
UNKNOWN 알 수 없는 오류입니다. 지수 백오프를 사용하여 다시 시도하세요.

자세한 내용은 오류 메시지 문서 및 MQTT 버전 3.1.1 사양을 참조하세요.