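Connect to the MQTT server.
Explore further
For detailed documentation that includes this code sample, see the following:
Code sample
Java
For more information, see the Cloud IoT Core Java API reference documentation.
To authenticate to Cloud IoT Core, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.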
// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
    String.format("ssl://%s:%s", mqttBridgeHostname, mqttBridgePort);

// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
    String.format(
        "projects/%s/locations/%s/registries/%s/devices/%s",
        projectId, cloudRegion, registryId, gatewayId);

MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explicitly set this. If you don't set the MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

Properties sslProps = new Properties();
sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
connectOptions.setSSLProperties(sslProps);

// With Google Cloud IoT Core, the username field is ignored; however, it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");

if ("RS256".equals(algorithm)) {
  connectOptions.setPassword(createJwtRsa(projectId, privateKeyFile).toCharArray());
} else if ("ES256".equals(algorithm)) {
  connectOptions.setPassword(createJwtEs(projectId, privateKeyFile).toCharArray());
} else {
  throw new IllegalArgumentException(
      "Invalid algorithm " + algorithm + ". Should be one of 'RS256' or 'ES256'.");
}
System.out.println(mqttClientId);

// Create a client, and connect to the Google MQTT bridge. The client is returned to the
// caller, so it is not opened in a try-with-resources block, which would close it on return.
MqttClient client =
    new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());
// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;

long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;

while (totalRetryTimeMs < maxConnectRetryTimeElapsedMillis && !client.isConnected()) {
  try {
    client.connect(connectOptions);
  } catch (MqttException e) {
    int reason = e.getReasonCode();

    // If the connection is lost or if the server cannot be connected, allow retries, but with
    // exponential backoff.
    System.out.println("An error occurred: " + e.getMessage());
    if (reason == MqttException.REASON_CODE_CONNECTION_LOST
        || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
      System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
      Thread.sleep(retryIntervalMs);
      totalRetryTimeMs += retryIntervalMs;
      retryIntervalMs *= intervalMultiplier;
      if (retryIntervalMs > maxConnectIntervalMillis) {
        retryIntervalMs = maxConnectIntervalMillis;
      }
    } else {
      throw e;
    }
  }
}

attachCallback(client, gatewayId);

// The topic gateways receive error updates on. QoS must be 0.
String errorTopic = String.format("/devices/%s/errors", gatewayId);
System.out.println(String.format("Listening on %s", errorTopic));
client.subscribe(errorTopic, 0);

return client;
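To make the retry timing in the sample easier to reason about, the following is a minimal, standalone sketch that computes the sleep intervals the loop would use, without touching MQTT at all. The class name `BackoffSchedule` and the method `intervals` are hypothetical names introduced only for this illustration; the constants passed in `main` are the ones from the sample (500 ms initial interval, 6000 ms cap, multiplier 1.5).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the sample: it only reproduces the interval arithmetic.
class BackoffSchedule {

  // Returns the first `attempts` sleep intervals, in milliseconds, of a capped
  // exponential backoff that starts at initialMs and never exceeds maxMs.
  static List<Long> intervals(long initialMs, long maxMs, float multiplier, int attempts) {
    List<Long> result = new ArrayList<>();
    long intervalMs = initialMs;
    for (int i = 0; i < attempts; i++) {
      result.add(intervalMs);
      // Same compound assignment as the sample: the float product is truncated back to long.
      intervalMs *= multiplier;
      if (intervalMs > maxMs) {
        intervalMs = maxMs;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Prints [500, 750, 1125, 1687, 2530, 3795, 5692, 6000, 6000]
    System.out.println(intervals(500L, 6000L, 1.5f, 9));
  }
}
```

With these values the interval reaches the 6000 ms cap on the eighth retry and stays there. The loop in the sample checks its total retry budget (900000 ms, that is, 15 minutes of accumulated sleep time) before each attempt, so most of that budget is spent retrying at the capped interval.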
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.