Google Cloud IoT Core は 2023 年 8 月 16 日に廃止されます。詳細については、担当の Google Cloud アカウント チームにお問い合わせください。

MQTT ブリッジとともにゲートウェイを使用する

このページでは、ゲートウェイが MQTT ブリッジを使用して、Cloud IoT Core と通信し、バインドされたデバイスに代わってテレメトリー イベントをパブリッシュする方法について説明します。始める前に、Cloud IoT Core での MQTT ブリッジの使用に関する一般的な情報について、MQTT ブリッジの使用をご確認ください。

エンドツーエンドのデモを試す

MQTT ブリッジとともにゲートウェイを使用する

  1. ゲートウェイを作成して構成したら、MQTT ブリッジを介して Cloud IoT Core に接続します。
  2. まだ作成していない場合は、デバイスを作成します。
  3. 省略可: デバイスをゲートウェイにバインドします

    MQTT ブリッジを使用する場合、独自の JWT を生成できない場合にのみ、デバイスをバインドする必要があります。

  4. 省略可: デバイス オペレーションが成功したかどうかについてのフィードバックを得るには、システムエラーのトピックに登録します。

  5. デバイスをゲートウェイに接続します

  6. ゲートウェイを使用して、デバイスの代わりにテレメトリー、デバイスの状態、構成メッセージをリレーします。エンドツーエンドのデモを使って、実際にやってみてください

ゲートウェイ メッセージ

ゲートウェイは、MQTT ブリッジを介して Cloud IoT Core に接続した後、3 つのタイプのメッセージを送受信できます。

  • 制御メッセージ: デバイスをゲートウェイに接続するか、ゲートウェイから接続解除します。これらのメッセージは、ゲートウェイと Cloud IoT Core の間で送信されます。Cloud IoT Core は、ゲートウェイからの制御メッセージのみを受け入れます。別の種類のデバイスがコントロール メッセージを送信しようとすると、Cloud IoT Core は接続を終了します。
  • ゲートウェイやデバイスからのメッセージ: デバイスに代わってゲートウェイによってリレーされるか、ゲートウェイ自体から直接送信されます。
  • システム エラー メッセージ: デバイスの代わりにゲートウェイを MQTT システムのエラー トピックに登録すると、デバイスでエラーが発生するたびに、Cloud IoT Core によってゲートウェイにエラー メッセージが送信されます。

ゲートウェイにデバイスを接続する

ゲートウェイが Cloud IoT Core とのデバイス通信をプロキシできるようにするには、MQTT ブリッジを介して QoS 1 /devices/{device_ID_to_attach}/attach制御メッセージをゲートウェイにパブリッシュします。

デバイスの JWT を使用してデバイスを認証するようにゲートウェイを構成している場合、接続メッセージのペイロードには JSON 形式のトークン({ "authorization" : "{JWT_token}" })を含める必要があります。それ以外の場合、Cloud IoT Core はデバイスのゲートウェイとの関連付けをチェックして、デバイスの認証を行います。

成功のレスポンス

デバイスが承認されると、Cloud IoT Core は接続メッセージへのレスポンスとして、ゲートウェイに PUBACK メッセージを送信します。ゲートウェイは、PUBACK メッセージを受信すると、デバイスの代わりに Cloud IoT Core トピック(テレメトリーや構成メッセージなど)をパブリッシュし、登録できます。

ゲートウェイが接続メッセージを送信したときにデバイスがすでに接続されている場合、Cloud IoT Core は PUBACK メッセージで応答します。

ゲートウェイからデバイスを切断する

ゲートウェイからデバイスを切断するには、ゲートウェイで MQTT ブリッジを介して QoS 1 /devices/{device_ID}/detach 制御メッセージをパブリッシュします。メッセージの送信時にデバイスが接続されていない場合、Cloud IoT Core は切断の制御メッセージを無視し、PUBACK メッセージを送信します。

トラブルシューティング

デバイスでエラーが発生した場合に通知を受け取るには、QoS レベル 0 を使用してゲートウェイを MQTT /devices/{gateway_ID}/errors トピックに登録します。

Go

デバイスを errors トピックに登録する手順は、以下のハイライト表示のとおりです。

import (
	"fmt"
	"io"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// subscribeGatewayToDeviceTopic creates a gateway client that subscribes to a topic of a bound device.
// Currently supported topics include: "config", "state", "commands", "errors"
func subscribeGatewayToDeviceTopic(w io.Writer, projectID string, region string, registryID string, gatewayID string, deviceID string, privateKeyPath string, algorithm string, clientDuration int, topic string) error {

	const (
		mqttBrokerURL   = "tls://mqtt.googleapis.com:8883"
		protocolVersion = 4 // corresponds to MQTT 3.1.1
	)

	// onConnect defines the on connect handler which resets backoff variables.
	var onConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
		fmt.Fprintf(w, "Client connected: %t\n", client.IsConnected())
	}

	// onMessage defines the message handler for the mqtt client.
	var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
		fmt.Fprintf(w, "Topic: %s\n", msg.Topic())
		fmt.Fprintf(w, "Message: %s\n", msg.Payload())
	}

	// onDisconnect defines the connection lost handler for the mqtt client.
	var onDisconnect mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
		fmt.Println("Client disconnected")
	}

	jwt, _ := createJWT(projectID, privateKeyPath, algorithm, 60)
	clientID := fmt.Sprintf("projects/%s/locations/%s/registries/%s/devices/%s", projectID, region, registryID, gatewayID)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(mqttBrokerURL)
	opts.SetClientID(clientID)
	opts.SetUsername("unused")
	opts.SetPassword(jwt)
	opts.SetProtocolVersion(protocolVersion)
	opts.SetOnConnectHandler(onConnect)
	opts.SetDefaultPublishHandler(onMessage)
	opts.SetConnectionLostHandler(onDisconnect)

	// Create and connect a client using the above options.
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, "Failed to connect client")
		return token.Error()
	}

	if err := attachDevice(deviceID, client, ""); err != nil {
		fmt.Fprintf(w, "AttachDevice error: %v\n", err)
		return err
	}

	// Sleep for 5 seconds to allow attachDevice message to propagate.
	time.Sleep(5 * time.Second)

	// Subscribe to the config topic of the current gateway and a device bound to the gateway.
	gatewayTopic := fmt.Sprintf("/devices/%s/%s", gatewayID, topic)
	if token := client.Subscribe(gatewayTopic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	deviceTopic := fmt.Sprintf("/devices/%s/%s", deviceID, topic)
	if token := client.Subscribe(deviceTopic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	time.Sleep(time.Duration(clientDuration) * time.Second)

	if err := detachDevice(deviceID, client, ""); err != nil {
		fmt.Fprintf(w, "DetachDevice error: %v\n", err)
		return err
	}

	if token := client.Unsubscribe(gatewayTopic, deviceTopic); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, token.Error())
		return token.Error()
	}

	client.Disconnect(10)
	return nil
}

Java

デバイスを errors トピックに登録する手順は、以下のハイライト表示のとおりです。

// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
    String.format("ssl://%s:%s", mqttBridgeHostname, mqttBridgePort);

// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
    String.format(
        "projects/%s/locations/%s/registries/%s/devices/%s",
        projectId, cloudRegion, registryId, gatewayId);

MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explictly set this. If you don't set MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

Properties sslProps = new Properties();
sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
connectOptions.setSSLProperties(sslProps);

// With Google Cloud IoT Core, the username field is ignored, however it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");

if ("RS256".equals(algorithm)) {
  connectOptions.setPassword(createJwtRsa(projectId, privateKeyFile).toCharArray());
} else if ("ES256".equals(algorithm)) {
  connectOptions.setPassword(createJwtEs(projectId, privateKeyFile).toCharArray());
} else {
  throw new IllegalArgumentException(
      "Invalid algorithm " + algorithm + ". Should be one of 'RS256' or 'ES256'.");
}

System.out.println(String.format("%s", mqttClientId));

// Create a client, and connect to the Google MQTT bridge.
try (MqttClient client =
    new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence())) {
  // Both connect and publish operations may fail. If they do, allow retries but with an
  // exponential backoff time period.
  long initialConnectIntervalMillis = 500L;
  long maxConnectIntervalMillis = 6000L;
  long maxConnectRetryTimeElapsedMillis = 900000L;
  float intervalMultiplier = 1.5f;

  long retryIntervalMs = initialConnectIntervalMillis;
  long totalRetryTimeMs = 0;

  while (totalRetryTimeMs < maxConnectRetryTimeElapsedMillis && !client.isConnected()) {
    try {
      client.connect(connectOptions);
    } catch (MqttException e) {
      int reason = e.getReasonCode();

      // If the connection is lost or if the server cannot be connected, allow retries, but with
      // exponential backoff.
      System.out.println("An error occurred: " + e.getMessage());
      if (reason == MqttException.REASON_CODE_CONNECTION_LOST
          || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
        System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
        Thread.sleep(retryIntervalMs);
        totalRetryTimeMs += retryIntervalMs;
        retryIntervalMs *= intervalMultiplier;
        if (retryIntervalMs > maxConnectIntervalMillis) {
          retryIntervalMs = maxConnectIntervalMillis;
        }
      } else {
        throw e;
      }
    }
  }

  attachCallback(client, gatewayId);

  // The topic gateways receive error updates on. QoS must be 0.
  String errorTopic = String.format("/devices/%s/errors", gatewayId);
  System.out.println(String.format("Listening on %s", errorTopic));

  client.subscribe(errorTopic, 0);

  return client;

Node.js

例として、デバイスを config トピックに登録する手順を以下にハイライト表示しています。errors トピックに登録するには、/devices/${gateway_ID}/errors を指定します。
// const deviceId = `myDevice`;
// const gatewayId = `mygateway`;
// const registryId = `myRegistry`;
// const projectId = `my-project-123`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const clientDuration = 60000;

const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${gatewayId}`;
console.log(mqttClientId);
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  qos: 1,
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
const client = mqtt.connect(connectionArgs);

client.on('connect', success => {
  if (!success) {
    console.log('Client not connected...');
  } else {
    setTimeout(() => {
      // Subscribe to gateway error topic.
      client.subscribe(`/devices/${gatewayId}/errors`, {qos: 0});

      attachDevice(deviceId, client);

      setTimeout(() => {
        console.log('Closing connection to MQTT. Goodbye!');
        client.end(true);
      }, clientDuration); // Safely detach device and close connection.
    }, 5000);
  }
});

client.on('close', () => {
  console.log('Connection closed');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  const decodedMessage = Buffer.from(message, 'base64').toString('ascii');

  console.log(`message received on error topic ${topic}: ${decodedMessage}`);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

Python

デバイスを errors トピックに登録する手順は、以下のハイライト表示のとおりです。
global minimum_backoff_time

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
)

attach_device(client, device_id, "")
print("Waiting for device to attach.")
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = f"/devices/{device_id}/config"
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = f"/devices/{gateway_id}/config"
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = f"/devices/{gateway_id}/errors"
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
    client.loop()
    if cb is not None:
        cb(client)

    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print(f"Refreshing token after {seconds_since_issue}s")
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    time.sleep(1)

detach_device(client, device_id)

print("Finished.")

Cloud IoT Core は、ベスト エフォート方式で、QoS 0 上の配信で、ゲートウェイ エラーを送信します。ゲートウェイが /devices/{gateway_ID}/errors に登録されていない場合、Cloud IoT Core は失敗イベントをログに記録しますが、PUBACK メッセージを送信しません。

MQTT エラーの構造は次のとおりです。

string error_type;  // A string description of the error type.
string device_id;   // The ID of the device that caused the error.
string description; // A description of the error.

エラー メッセージが MQTT メッセージによってトリガーされた場合、次の情報も同様に添付されます。

string message_type;  // The string MQTT message type.
string topic;  // The MQTT topic if applicable, otherwise it is empty.
int packet_id;  // The packet ID of the MQTT message if applicable, otherwise it is zero.

エラーコードとエラー処理

エラーコード 説明 推奨される対応
GATEWAY_ATTACHMENT_ERROR ゲートウェイの接続リクエストが失敗しました。 問題を解決してから再試行します。
GATEWAY_DEVICE_NOT_FOUND ゲートウェイは、受信メッセージを処理する接続済みデバイスを見つけられませんでした。 問題を解決してから再試行します。
GATEWAY_INVALID_MQTT_TOPIC ゲートウェイは、指定された MQTT トピックを解析できませんでした。トピックは、形式が正しくないか、無効なデバイス ID または名前を含んでいました。 問題を解決してから再試行します。
GATEWAY_UNEXPECTED_PACKET_ID ゲートウェイはパケット ID に基づいてメッセージを処理できませんでした。たとえば、PUBACK にはパケット ID が含まれているものの、レスポンスを待機しているものがありません。 問題を解決してから再試行します。
GATEWAY_UNEXPECTED_MESSAGE_TYPE ゲートウェイが、サポートされていない PUBREL や PUBREC などの予期しないメッセージを受信しました。 問題を解決してから再試行します。
GATEWAY_DETACHMENT_DEVICE_ERROR デバイスのエラーにより、ゲートウェイがデバイスの接続を解除しました。 問題を解決してから再試行します。
不明 不明なエラーです。 指数バックオフを使用して再試行します。

詳細については、メインのエラー メッセージのドキュメントと MQTT バージョン 3.1.1 の仕様をご覧ください。