Google Cloud IoT Core は 2023 年 8 月 16 日に廃止されます。詳細については、担当の Google Cloud アカウント チームにお問い合わせください。

MQTT ブリッジ経由でのパブリッシュ

このセクションでは、デバイスが MQTT ブリッジを使用して Cloud IoT Core と通信する方法について説明します。HTTP と MQTT の一般的な情報については、プロトコルをご覧ください。

このセクションで説明する各メソッドの詳細については、API ドキュメントを参照してください。 MQTT 関連のサンプルもご覧ください。

MQTT ブリッジを経由してパブリッシュするには:

  1. デバイスに MQTT クライアントをインストールします。

  2. デバイスに MQTT サーバー証明書をダウンロードします

  3. Cloud IoT Core に対してデバイスの認証を行うように MQTT クライアントを構成します。

  4. mqtt.googleapis.com または長期サポート ドメインを経由して TLS handshake を開始します。

  5. テレメトリー イベントをパブリッシュするか、デバイスの状態を設定します。

MQTT サーバー

Cloud IoT Core は、ポート mqtt.googleapis.com:8883 をリッスンするマネージド ブローカーを実行して、MQTT プロトコルをサポートします。ポート 8883 は、安全な MQTT 接続のために IANA によって予約されている標準 TCP ポートです。このポートへの接続には、Eclipse Paho などのオープンソース クライアントでサポートされている TLS トランスポートを使用する必要があります。

ポート 8883 がファイアウォールによってブロックされている場合は、ポート 443(mqtt.googleapis.com:443)を使用することもできます。

Quality of Service(QoS)

MQTT 仕様では、次の 3 つの Quality of Service(QoS)レベルについて説明しています。

  • QoS 0(最大で 1 回配信)
  • QoS 1(最低でも 1 回配信)
  • QoS 2(正確に 1 回のみ配信)

Cloud IoT Core は QoS 2 をサポートしていません。QoS 2 メッセージをパブリッシュすると接続が閉じられます。QoS 2 で事前定義されたトピックにサブスクライブすると、QoS レベルが QoS 1 にダウングレードされます。

Cloud IoT Core では、QoS 0 と 1 は次のように機能します。

  • QoS 1 の PUBLISH メッセージは、Cloud Pub/Sub に正常に送信された後、PUBACK メッセージによって確認応答されます。
  • QoS 0 の PUBLISH メッセージは PUBACK レスポンスを必要とせず、メッセージ配信パスにジッターがある場合(たとえば、Cloud Pub/Sub が一時的に利用できない場合)に破棄されます。
  • Cloud IoT Core の MQTT ブリッジは、配信不能なメッセージを再試行するために少量のバッファを保持します。バッファがいっぱいになった場合、QoS が 1 のメッセージは破棄され、PUBACK メッセージはクライアントに送信されません。クライアントは、メッセージを再送信する必要があります。

デバイス構成の場合、QoS レベルは次のとおりです。

  • QoS が 0 の場合、特定の構成バージョンがデバイスに一度だけパブリッシュされます。デバイスが構成を受信しない場合は、再度登録する必要があります。したがって、構成が頻繁に(数秒または数分で)更新され、更新のたびにデバイスが受信する必要がない場合、QoS 0 が便利です。
  • QoS が 1 の場合、デバイスが PUBACK で承認するまで、最新の構成の更新が再試行されます。古い構成が確認される前に新しい構成が push された場合、古い構成は再配信されません。代わりに、新しいメールが配信(および再配信)されます。このレベルはデバイス構成で最も安全なモードであり、最終的にデバイスが最新の構成を取得することが保証されます。

MQTT サーバー証明書をダウンロードする

TLS トランスポートを使用するには、デバイスが Cloud IoT Core と通信していることを、なりすましではなく Cloud IoT Core と通信していることを確認する必要があります。次の証明書パッケージが検証をサポートしています。

  • mqtt.googleapis.com 用の完全な Google ルート CA 認証パッケージ(128 KB)。

    • このパッケージは、Cloud IoT Core を含む Google のプロダクトやサービスと通信するための信頼チェーンを確立します。
    • 完全なルート CA 認証パッケージを持つデバイスは、MQTT サーバーと直接通信します。
    • このパッケージは定期的に更新されます。
  • mqtt.2030.ltsapis.goog 用の Google の最小ルート CA セット(1 KB 未満)。最小ルート CA セットには、プライマリ証明書とバックアップ証明書が含まれています。

    • このセットは、マイクロコントローラなどのメモリの制約があるデバイスを対象とし、Cloud IoT Core とのみ通信する信頼チェーンを確立します。
    • 最小ルート CA セットを持つデバイスは、長期サポート ドメインを介して Cloud IoT Core と通信します。
    • このセットは 2030 年まで固定です(プライマリ証明書とバックアップ証明書は変更されません)。セキュリティを強化するため、Google Trust Services は予告なしにプライマリ証明書とバックアップ証明書をいつでも切り替えることができます。

Google ルート CA 証明書をデバイスにダウンロードしたら、デバイスを認証する MQTT クライアントを構成し、MQTT サーバーに接続して MQTT ブリッジを介して通信を行うことができます。

MQTT クライアントを構成する

MQTT クライアントは、MQTT ブリッジに接続することによってデバイスを認証します。デバイスを認証するように MQTT クライアントを構成するには:

  1. MQTT クライアント ID を完全なデバイス パスに設定します。
    projects/PROJECT_ID/locations/REGION/registries/REGISTRY_ID/devices/DEVICE_ID
  2. MQTT クライアントを MQTT サーバー証明書に関連付けます。
  3. MQTT ホスト名を mqtt.googleapis.com または長期サポート ドメインに設定します(最小ルート CA セットを使用した場合)。
  4. ユーザー名を指定します。MQTT ブリッジはユーザー名フィールドを無視しますが、一部の MQTT クライアント ライブラリは、username フィールドを指定しない限りパスワード フィールドを送信しません。最良の結果を得るには、unusedignored などの任意のユーザー名を指定します。
  5. パスワードを設定する。パスワード フィールドには JWT を含める必要があります。

次のサンプルは、デバイスを認証するように MQTT クライアントを構成する方法を示しています。

C++

クライアント ID の構成とデバイスの認証の手順は、以下のハイライト表示のとおりです。
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java


// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
    String.format("ssl://%s:%s", mqttBridgeHostname, mqttBridgePort);

// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
    String.format(
        "projects/%s/locations/%s/registries/%s/devices/%s",
        projectId, cloudRegion, registryId, gatewayId);

MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explictly set this. If you don't set MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

Properties sslProps = new Properties();
sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
connectOptions.setSSLProperties(sslProps);

// With Google Cloud IoT Core, the username field is ignored, however it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");

if ("RS256".equals(algorithm)) {
  connectOptions.setPassword(createJwtRsa(projectId, privateKeyFile).toCharArray());
} else if ("ES256".equals(algorithm)) {
  connectOptions.setPassword(createJwtEs(projectId, privateKeyFile).toCharArray());
} else {
  throw new IllegalArgumentException(
      "Invalid algorithm " + algorithm + ". Should be one of 'RS256' or 'ES256'.");
}

System.out.println(String.format("%s", mqttClientId));

// Create a client, and connect to the Google MQTT bridge.
try (MqttClient client =
    new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence())) {
  // Both connect and publish operations may fail. If they do, allow retries but with an
  // exponential backoff time period.
  long initialConnectIntervalMillis = 500L;
  long maxConnectIntervalMillis = 6000L;
  long maxConnectRetryTimeElapsedMillis = 900000L;
  float intervalMultiplier = 1.5f;

  long retryIntervalMs = initialConnectIntervalMillis;
  long totalRetryTimeMs = 0;

  while (totalRetryTimeMs < maxConnectRetryTimeElapsedMillis && !client.isConnected()) {
    try {
      client.connect(connectOptions);
    } catch (MqttException e) {
      int reason = e.getReasonCode();

      // If the connection is lost or if the server cannot be connected, allow retries, but with
      // exponential backoff.
      System.out.println("An error occurred: " + e.getMessage());
      if (reason == MqttException.REASON_CODE_CONNECTION_LOST
          || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
        System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
        Thread.sleep(retryIntervalMs);
        totalRetryTimeMs += retryIntervalMs;
        retryIntervalMs *= intervalMultiplier;
        if (retryIntervalMs > maxConnectIntervalMillis) {
          retryIntervalMs = maxConnectIntervalMillis;
        }
      } else {
        throw e;
      }
    }
  }

  attachCallback(client, gatewayId);

  // The topic gateways receive error updates on. QoS must be 0.
  String errorTopic = String.format("/devices/%s/errors", gatewayId);
  System.out.println(String.format("Listening on %s", errorTopic));

  client.subscribe(errorTopic, 0);

  return client;

Node.js


// const deviceId = `myDevice`;
// const registryId = `myRegistry`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const messageType = `events`;
// const numMessages = 5;

// The mqttClientId is a unique string that identifies this device. For Google
// Cloud IoT Core, it must be in the format below.
const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${deviceId}`;

// With Google Cloud IoT Core, the username field is ignored, however it must be
// non-empty. The password field is used to transmit a JWT to authorize the
// device. The "mqtts" protocol causes the library to connect using SSL, which
// is required for Cloud IoT Core.
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
const iatTime = parseInt(Date.now() / 1000);
const client = mqtt.connect(connectionArgs);

// Subscribe to the /devices/{device-id}/config topic to receive config updates.
// Config updates are recommended to use QoS 1 (at least once delivery)
client.subscribe(`/devices/${deviceId}/config`, {qos: 1});

// Subscribe to the /devices/{device-id}/commands/# topic to receive all
// commands or to the /devices/{device-id}/commands/<subfolder> to just receive
// messages published to a specific commands folder; we recommend you use
// QoS 0 (at most once delivery)
client.subscribe(`/devices/${deviceId}/commands/#`, {qos: 0});

// The MQTT topic that this device will publish data to. The MQTT topic name is
// required to be in the format below. The topic name must end in 'state' to
// publish state and 'events' to publish telemetry. Note that this is not the
// same as the device registry's Cloud Pub/Sub topic.
const mqttTopic = `/devices/${deviceId}/${messageType}`;

client.on('connect', success => {
  console.log('connect');
  if (!success) {
    console.log('Client not connected...');
  } else if (!publishChainInProgress) {
    publishAsync(mqttTopic, client, iatTime, 1, numMessages, connectionArgs);
  }
});

client.on('close', () => {
  console.log('close');
  shouldBackoff = true;
});

client.on('error', err => {
  console.log('error', err);
});

client.on('message', (topic, message) => {
  let messageStr = 'Message received: ';
  if (topic === `/devices/${deviceId}/config`) {
    messageStr = 'Config message received: ';
  } else if (topic.startsWith(`/devices/${deviceId}/commands`)) {
    messageStr = 'Command message received: ';
  }

  messageStr += Buffer.from(message, 'base64').toString('ascii');
  console.log(messageStr);
});

client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

// Once all of the messages have been published, the connection to Google Cloud
// IoT will be closed and the process will exit. See the publishAsync method.

Python

このサンプルでは、Python 用の Google API クライアント ライブラリを使用します。
def error_str(rc):
    """Convert a Paho error to a human readable string."""
    return f"{rc}: {mqtt.error_string(rc)}"

def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    print("on_connect", mqtt.connack_string(rc))

    # After a successful connect, reset backoff time and stop backing off.
    global should_backoff
    global minimum_backoff_time
    should_backoff = False
    minimum_backoff_time = 1

def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback for when a device disconnects."""
    print("on_disconnect", error_str(rc))

    # Since a disconnect occurred, the next loop iteration will wait with
    # exponential backoff.
    global should_backoff
    should_backoff = True

def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback when a message is sent to the broker."""
    print("on_publish")

def on_message(unused_client, unused_userdata, message):
    """Callback when the device receives a message on a subscription."""
    payload = str(message.payload.decode("utf-8"))
    print(
        "Received message '{}' on topic '{}' with Qos {}".format(
            payload, message.topic, str(message.qos)
        )
    )

def get_client(
    project_id,
    cloud_region,
    registry_id,
    device_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
):
    """Create our MQTT client. The client_id is a unique string that identifies
    this device. For Google Cloud IoT Core, it must be in the format below."""
    client_id = "projects/{}/locations/{}/registries/{}/devices/{}".format(
        project_id, cloud_region, registry_id, device_id
    )
    print(f"Device client_id is '{client_id}'")

    client = mqtt.Client(client_id=client_id)

    # With Google Cloud IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    client.username_pw_set(
        username="unused", password=create_jwt(project_id, private_key_file, algorithm)
    )

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. In this example, the
    # callbacks just print to standard out.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the Google MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # This is the topic that the device will receive configuration updates on.
    mqtt_config_topic = f"/devices/{device_id}/config"

    # Subscribe to the config topic.
    client.subscribe(mqtt_config_topic, qos=1)

    # The topic that the device will receive commands on.
    mqtt_command_topic = f"/devices/{device_id}/commands/#"

    # Subscribe to the commands topic, QoS 1 enables message acknowledgement.
    print(f"Subscribing to {mqtt_command_topic}")
    client.subscribe(mqtt_command_topic, qos=0)

    return client

長期 MQTT ドメインを使用する

長期サポート(LTS)ドメインでは、1 つの TLS 構成を長期間使用できます。一度 MQTT クライアントを設定してから、LTS ドメインを介してメッセージをパブリッシュするように MQTT クライアントを構成し、サポートされている時間枠の間、MQTT ブリッジを介して継続的に通信を行うことができます。

現在アクティブな LTS ドメインは mqtt.2030.ltsapis.goog です。この LTS ドメインは 2030 年までサポートされます。

LTS ドメインを使用するには:

  1. LTS ドメインを介してメッセージをパブリッシュするように MQTT クライアントを構成します。

    1. Cloud IoT Core に対してデバイスの認証を行うように MQTT クライアントを構成します。
    2. デバイスを構成する際に、最小ルート CA セットのプライマリ証明書とバックアップ証明書を MQTT クライアントに関連付けます。
  2. mqtt.2030.ltsapis.goog を介してポート 8883 または 443 で TLS handshake を開始します。少なくとも次の TLS 機能を使用してください。

    • TLS 1.2
    • 証明書鍵とハッシュ アルゴリズムとして SHA-256 を使用する P-256
    • P-256 と暗号スイートの非圧縮ポイントを使用した TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256
    • Server Name Indication(SNI)
    • DNS over TCP または DNS over DNS

LTS ドメインに送信されるメッセージを含む、MQTT トラフィックの保護の詳細については、デバイスのセキュリティに関する推奨事項をご覧ください。

テレメトリー イベントをパブリッシュする

デバイスが MQTT クライアントを使用して構成され、MQTT ブリッジに接続されると、次の形式で PUBLISH メッセージを MQTT トピックに対して発行することで、テレメトリー イベントをパブリッシュできます。

/devices/DEVICE_ID/events

デバイス ID は、MQTT クライアント ID で指定されたデバイスの文字列 ID です。デバイス ID では大文字と小文字が区別されます。

この MQTT トピックにパブリッシュされたメッセージは、対応するレジストリのデフォルトのテレメトリー トピックに転送されます。デフォルトのテレメトリー トピックは、レジストリ リソースeventNotificationConfigs[i].pubsubTopicName フィールドで指定された Cloud Pub/Sub トピックです。デフォルトの Pub/Sub トピックが存在しない場合、公開されたテレメトリー データは失われます。他の Cloud Pub/Sub トピックにメッセージをパブリッシュするには、他の Cloud Pub/Sub トピックにテレメトリー イベントを公開するをご覧ください。

転送されたメッセージ データ フィールドには、デバイスによってパブリッシュされたメッセージのコピーが含まれ、Cloud Pub/Sub トピックの各メッセージに次のメッセージ属性が追加されます。

属性 説明
deviceId デバイス レジストリに対するユーザー定義の文字列 ID(thing1 など)。デバイス ID はレジストリ内で一意である必要があります。
deviceNumId サーバーが生成したデバイスの数値 ID。デバイスを作成すると、Cloud IoT Core によってデバイスの数値 ID が自動的に生成されます。グローバルに一意であり、編集することはできません。
deviceRegistryLocation デバイス レジストリの Google Cloud Platform リージョン(例: us-central1)。
deviceRegistryId デバイス レジストリに対するユーザー定義の文字列 ID(registry1 など)。
projectId レジストリとデバイスを所有する Cloud プロジェクトの文字列 ID。
subFolder このサブフォルダは、イベントのカテゴリや分類として使用できます。MQTT クライアントの場合、サブフォルダは DEVICE_ID/events の後にあるサブトピックで、直接コピーされます。たとえば、クライアントが MQTT トピック /devices/DEVICE_ID/events/alerts にパブリッシュする場合、サブフォルダは文字列 alerts です。

次のサンプルは、MQTT 接続を介して PUBLISH メッセージを送信する方法を示しています。

C++

このサンプルでは、Python 用の Google API クライアント ライブラリを使用します。
int Publish(char* payload, int payload_size) {
  int rc = -1;
  MQTTClient client = {0};
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token = {0};

  MQTTClient_create(&client, opts.address, opts.clientid,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 60;
  conn_opts.cleansession = 1;
  conn_opts.username = k_username;
  conn_opts.password = CreateJwt(opts.keypath, opts.projectid, opts.algorithm);
  MQTTClient_SSLOptions sslopts = MQTTClient_SSLOptions_initializer;

  sslopts.trustStore = opts.rootpath;
  sslopts.privateKey = opts.keypath;
  conn_opts.ssl = &sslopts;

  unsigned long retry_interval_ms = kInitialConnectIntervalMillis;
  unsigned long total_retry_time_ms = 0;
  while ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
    if (rc == 3) {  // connection refused: server unavailable
      usleep(retry_interval_ms * 1000);
      total_retry_time_ms += retry_interval_ms;
      if (total_retry_time_ms >= kMaxConnectRetryTimeElapsedMillis) {
        printf("Failed to connect, maximum retry time exceeded.");
        exit(EXIT_FAILURE);
      }
      retry_interval_ms *= kIntervalMultiplier;
      if (retry_interval_ms > kMaxConnectIntervalMillis) {
        retry_interval_ms = kMaxConnectIntervalMillis;
      }
    } else {
      printf("Failed to connect, return code %d\n", rc);
      exit(EXIT_FAILURE);
    }
  }

  pubmsg.payload = payload;
  pubmsg.payloadlen = payload_size;
  pubmsg.qos = kQos;
  pubmsg.retained = 0;
  MQTTClient_publishMessage(client, opts.topic, &pubmsg, &token);
  printf(
      "Waiting for up to %lu seconds for publication of %s\n"
      "on topic %s for client with ClientID: %s\n",
      (kTimeout / 1000), opts.payload, opts.topic, opts.clientid);
  rc = MQTTClient_waitForCompletion(client, token, kTimeout);
  printf("Message with delivery token %d delivered\n", token);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  return rc;
}

Java

// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while (totalRetryTimeMs < maxConnectRetryTimeElapsedMillis && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, options.deviceId);

// Publish to the events or state topic based on the flag.
String subTopic = "event".equals(options.messageType) ? "events" : options.messageType;

// The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
// required to be in the format below. Note that this is not the same as the device registry's
// Cloud Pub/Sub topic.
String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);

// Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
for (int i = 1; i <= options.numMessages; ++i) {
  String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
  System.out.format(
      "Publishing %s message %d/%d: '%s'%n",
      options.messageType, i, options.numMessages, payload);

  // Refresh the connection credentials before the JWT expires.
  long secsSinceRefresh = (Instant.now().toEpochMilli() - iat.toEpochMilli()) / 1000;
  if (secsSinceRefresh > (options.tokenExpMins * MINUTES_PER_HOUR)) {
    System.out.format("\tRefreshing token after: %d seconds%n", secsSinceRefresh);
    iat = Instant.now();
    if ("RS256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
    } else if ("ES256".equals(options.algorithm)) {
      connectOptions.setPassword(
          createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
    } else {
      throw new IllegalArgumentException(
          "Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
    }
    client.disconnect();
    client.connect(connectOptions);
    attachCallback(client, options.deviceId);
  }

  // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
  // also supports qos=0 for at most once delivery.
  MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8.name()));
  message.setQos(1);
  client.publish(mqttTopic, message);

  if ("event".equals(options.messageType)) {
    // Send telemetry events every second
    Thread.sleep(1000);
  } else {
    // Note: Update Device state less frequently than with telemetry events
    Thread.sleep(5000);
  }
}

// Wait for commands to arrive for about two minutes.
for (int i = 1; i <= options.waitTime; ++i) {
  System.out.print('.');
  Thread.sleep(1000);
}
System.out.println("");

// Disconnect the client if still connected, and finish the run.
if (client.isConnected()) {
  client.disconnect();
}

System.out.println("Finished loop successfully. Goodbye!");
client.close();

Node.js

const publishAsync = (
  mqttTopic,
  client,
  iatTime,
  messagesSent,
  numMessages,
  connectionArgs
) => {
  // If we have published enough messages or backed off too many times, stop.
  if (messagesSent > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) {
    if (backoffTime >= MAXIMUM_BACKOFF_TIME) {
      console.log('Backoff time is too high. Giving up.');
    }
    console.log('Closing connection to MQTT. Goodbye!');
    client.end();
    publishChainInProgress = false;
    return;
  }

  // Publish and schedule the next publish.
  publishChainInProgress = true;
  let publishDelayMs = 0;
  if (shouldBackoff) {
    publishDelayMs = 1000 * (backoffTime + Math.random());
    backoffTime *= 2;
    console.log(`Backing off for ${publishDelayMs}ms before publishing.`);
  }

  setTimeout(() => {
    const payload = `${argv.registryId}/${argv.deviceId}-payload-${messagesSent}`;

    // Publish "payload" to the MQTT topic. qos=1 means at least once delivery.
    // Cloud IoT Core also supports qos=0 for at most once delivery.
    console.log('Publishing message:', payload);
    client.publish(mqttTopic, payload, {qos: 1}, err => {
      if (!err) {
        shouldBackoff = false;
        backoffTime = MINIMUM_BACKOFF_TIME;
      }
    });

    const schedulePublishDelayMs = argv.messageType === 'events' ? 1000 : 2000;
    setTimeout(() => {
      const secsFromIssue = parseInt(Date.now() / 1000) - iatTime;
      if (secsFromIssue > argv.tokenExpMins * 60) {
        iatTime = parseInt(Date.now() / 1000);
        console.log(`\tRefreshing token after ${secsFromIssue} seconds.`);

        client.end();
        connectionArgs.password = createJwt(
          argv.projectId,
          argv.privateKeyFile,
          argv.algorithm
        );
        connectionArgs.protocolId = 'MQTT';
        connectionArgs.protocolVersion = 4;
        connectionArgs.clean = true;
        client = mqtt.connect(connectionArgs);

        client.on('connect', success => {
          console.log('connect');
          if (!success) {
            console.log('Client not connected...');
          } else if (!publishChainInProgress) {
            publishAsync(
              mqttTopic,
              client,
              iatTime,
              messagesSent,
              numMessages,
              connectionArgs
            );
          }
        });

        client.on('close', () => {
          console.log('close');
          shouldBackoff = true;
        });

        client.on('error', err => {
          console.log('error', err);
        });

        client.on('message', (topic, message) => {
          console.log(
            'message received: ',
            Buffer.from(message, 'base64').toString('ascii')
          );
        });

        client.on('packetsend', () => {
          // Note: logging packet send is very verbose
        });
      }
      publishAsync(
        mqttTopic,
        client,
        iatTime,
        messagesSent + 1,
        numMessages,
        connectionArgs
      );
    }, schedulePublishDelayMs);
  }, publishDelayMs);
};

Python

このサンプルでは、Python 用の Google API クライアント ライブラリを使用します。
global minimum_backoff_time
global MAXIMUM_BACKOFF_TIME

# Publish to the events or state topic based on the flag.
sub_topic = "events" if args.message_type == "event" else "state"

mqtt_topic = f"/devices/{args.device_id}/{sub_topic}"

jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
jwt_exp_mins = args.jwt_expires_minutes
client = get_client(
    args.project_id,
    args.cloud_region,
    args.registry_id,
    args.device_id,
    args.private_key_file,
    args.algorithm,
    args.ca_certs,
    args.mqtt_bridge_hostname,
    args.mqtt_bridge_port,
)

# Publish num_messages messages to the MQTT bridge once per second.
for i in range(1, args.num_messages + 1):
    # Process network events.
    client.loop()

    # Wait if backoff is required.
    if should_backoff:
        # If backoff time is too large, give up.
        if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
            print("Exceeded maximum backoff time. Giving up.")
            break

        # Otherwise, wait and connect again.
        delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
        print(f"Waiting for {delay} before reconnecting.")
        time.sleep(delay)
        minimum_backoff_time *= 2
        client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

    payload = f"{args.registry_id}/{args.device_id}-payload-{i}"
    print(f"Publishing message {i}/{args.num_messages}: '{payload}'")
    seconds_since_issue = (datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat).seconds
    if seconds_since_issue > 60 * jwt_exp_mins:
        print(f"Refreshing token after {seconds_since_issue}s")
        jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
        client.loop()
        client.disconnect()
        client = get_client(
            args.project_id,
            args.cloud_region,
            args.registry_id,
            args.device_id,
            args.private_key_file,
            args.algorithm,
            args.ca_certs,
            args.mqtt_bridge_hostname,
            args.mqtt_bridge_port,
        )
    # Publish "payload" to the MQTT topic. qos=1 means at least once
    # delivery. Cloud IoT Core also supports qos=0 for at most once
    # delivery.
    client.publish(mqtt_topic, payload, qos=1)

    # Send events every second. State should not be updated as often
    for i in range(0, 60):
        time.sleep(1)
        client.loop()

追加の Cloud Pub/Sub トピックにテレメトリー イベントをパブリッシュする

デバイスは、追加の Cloud Pub/Sub トピックにデータをパブリッシュできます。デフォルトでは、/devices/DEVICE_ID/events にパブリッシュされた MQTT メッセージは、対応するレジストリのデフォルトのテレメトリー トピックに転送されます。MQTT トピックにサブフォルダを指定することで、データを追加の Cloud Pub/Sub トピックに転送できます。このサブフォルダは、/devices/DEVICE_ID/events の後のサブトピックです。

サブフォルダにパブリッシュされたメッセージは、同じ名前の Cloud Pub/Sub トピックに転送されます。対応するレジストリは、Cloud Pub/Sub トピックが構成されている必要があります。構成されていない場合、メッセージはデフォルトの Cloud Pub/Sub トピックに転送されます。

次の場合、メッセージは追加の Cloud Pub/Sub トピックではなく、デフォルトの Cloud Pub/Sub トピックに転送されます。

  • MQTT トピックでサブフォルダが指定されていない
  • サブフォルダが MQTT トピックで指定されているが、デバイス レジストリに一致する Pub/Sub トピックがない

たとえば、デバイスが MQTT トピック /devices/DEVICE_ID/events/alerts にパブリッシュする場合、サブフォルダは文字列 alerts です。eventNotificationConfigs[i].subfolderMatches フィールドと eventNotificationConfigs[i].pubsubTopicName フィールドの両方が alerts に設定されている場合、メッセージは追加の Cloud Pub/Sub トピックに転送されます。設定されていない場合、メッセージはデフォルトの Cloud Pub/Sub トピックに転送されます。

デバイスの状態を設定する

接続されたデバイスは、次の MQTT トピックに PUBLISH メッセージを発行してデバイスの状態を報告できます。

/devices/DEVICE_ID/state

状態メッセージを分類して取得するには、デバイスの状態に関するトピックを使用してレジストリを構成します。デバイス状態のトピックは、StateNotificationConfig.pubsubTopicName フィールドで指定された Cloud Pub/Sub トピックです。レジストリがデバイス状態のトピックで構成されている場合、これらのメッセージは、一致する Cloud Pub/Sub トピックにベスト エフォート方式で転送されます。

状態メッセージの取得について詳しくは、デバイスの状態の取得をご覧ください。

MQTT トラフィックを制限する

Cloud IoT Core によって、過剰な負荷を発生させるプロジェクトが制限されます。デバイスが失敗したオペレーションを待機することなく再試行すると、同じ Google Cloud プロジェクト内のすべてのデバイスに影響する上限をトリガーできます。

再試行する場合は、導入されたジッターを伴う切り捨て型指数バックオフ アルゴリズムを実装することを強くおすすめします。

キープアライブ

クライアントから最初の MQTT CONNECT メッセージを送信する場合は、オプションの「keep-alive」値を指定できます。この値は、PUBLISH メッセージなどのメッセージを送信することをブローカーがクライアントに期待する時間間隔(秒単位で測定)です。その間隔の間に、クライアントからブローカーにメッセージが送信されない場合、ブローカーは自動的に接続を終了します。指定した keep-alive 値は 1.5 で乗算されるため、10 分の keep-alive は実際には 15 分間隔になります。

詳しくは、MQTT 仕様をご覧ください。

クライアントの設定

Cloud IoT Core には、独自のデフォルトの keep-alive 値はありません。keep-alive の間隔を指定する場合は、クライアントで設定する必要があります。

最適な結果を得るには、クライアントの keep-alive 間隔を 60 秒以上に設定してください。CPythonNode.jsJava の Paho MQTT ライブラリなど、多くのオープンソースのクライアント ライブラリでは、60 秒がデフォルトで使用されます。

アイドル時間の上限

keep-alive 間隔とは別に、Cloud IoT Core には独自の 20 分間のアイドル時間上限があります。この上限に基づき、クライアントが 20 分間メッセージを送信しない場合、keep-alive 間隔がより長く設定されていても、クライアント接続は自動的に終了します。keep-alive 値が指定されていない場合でも、デフォルトのアイドル タイムアウトである 20 分は有効です。

トラブルシューティング

接続に問題がある場合は、トラブルシューティングをご覧ください。