static MqttCallback mCallback;
static long MINUTES_PER_HOUR = 60;
/** Create a Cloud IoT Core JWT for the given project id, signed with the given RSA key. */
private static String createJwtRsa(String projectId, String privateKeyFile)
throws NoSuchAlgorithmException, IOException, InvalidKeySpecException {
Instant now = Instant.now();
// Create a JWT to authenticate this device. The device will be disconnected after the token
// expires, and will have to reconnect with a new token. The audience field should always be set
// to the GCP project id.
JwtBuilder jwtBuilder =
Jwts.builder()
.setIssuedAt(Date.from(now))
.setExpiration(Date.from(now.plusSeconds(20 * 60)))
.setAudience(projectId);
byte[] keyBytes = Files.readAllBytes(Paths.get(privateKeyFile));
PKCS8EncodedKeySpec spec = new PKCS8EncodedKeySpec(keyBytes);
KeyFactory kf = KeyFactory.getInstance("RSA");
return jwtBuilder.signWith(SignatureAlgorithm.RS256, kf.generatePrivate(spec)).compact();
}
/** Create a Cloud IoT Core JWT for the given project id, signed with the given ES key. */
private static String createJwtEs(String projectId, String privateKeyFile)
throws NoSuchAlgorithmException, IOException, InvalidKeySpecException {
Instant now = Instant.now();
// Create a JWT to authenticate this device. The device will be disconnected after the token
// expires, and will have to reconnect with a new token. The audience field should always be set
// to the GCP project id.
JwtBuilder jwtBuilder =
Jwts.builder()
.setIssuedAt(Date.from(now))
.setExpiration(Date.from(now.plusSeconds(20 * 60)))
.setAudience(projectId);
byte[] keyBytes = Files.readAllBytes(Paths.get(privateKeyFile));
PKCS8EncodedKeySpec spec = new PKCS8EncodedKeySpec(keyBytes);
KeyFactory kf = KeyFactory.getInstance("EC");
return jwtBuilder.signWith(SignatureAlgorithm.ES256, kf.generatePrivate(spec)).compact();
}
/** Connects the gateway to the MQTT bridge. */
protected static MqttClient startMqtt(
String mqttBridgeHostname,
int mqttBridgePort,
String projectId,
String cloudRegion,
String registryId,
String gatewayId,
String privateKeyFile,
String algorithm)
throws NoSuchAlgorithmException, IOException, MqttException, InterruptedException,
InvalidKeySpecException {
// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
String.format("ssl://%s:%s", mqttBridgeHostname, mqttBridgePort);
// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
String.format(
"projects/%s/locations/%s/registries/%s/devices/%s",
projectId, cloudRegion, registryId, gatewayId);
MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explictly set this. If you don't set MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
Properties sslProps = new Properties();
sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
connectOptions.setSSLProperties(sslProps);
// With Google Cloud IoT Core, the username field is ignored, however it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");
if ("RS256".equals(algorithm)) {
connectOptions.setPassword(createJwtRsa(projectId, privateKeyFile).toCharArray());
} else if ("ES256".equals(algorithm)) {
connectOptions.setPassword(createJwtEs(projectId, privateKeyFile).toCharArray());
} else {
throw new IllegalArgumentException(
"Invalid algorithm " + algorithm + ". Should be one of 'RS256' or 'ES256'.");
}
System.out.println(String.format("%s", mqttClientId));
// Create a client, and connect to the Google MQTT bridge.
try (MqttClient client =
new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence())) {
// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;
long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;
while (totalRetryTimeMs < maxConnectRetryTimeElapsedMillis && !client.isConnected()) {
try {
client.connect(connectOptions);
} catch (MqttException e) {
int reason = e.getReasonCode();
// If the connection is lost or if the server cannot be connected, allow retries, but with
// exponential backoff.
System.out.println("An error occurred: " + e.getMessage());
if (reason == MqttException.REASON_CODE_CONNECTION_LOST
|| reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
Thread.sleep(retryIntervalMs);
totalRetryTimeMs += retryIntervalMs;
retryIntervalMs *= intervalMultiplier;
if (retryIntervalMs > maxConnectIntervalMillis) {
retryIntervalMs = maxConnectIntervalMillis;
}
} else {
throw e;
}
}
}
attachCallback(client, gatewayId);
// The topic gateways receive error updates on. QoS must be 0.
String errorTopic = String.format("/devices/%s/errors", gatewayId);
System.out.println(String.format("Listening on %s", errorTopic));
client.subscribe(errorTopic, 0);
return client;
}
}
protected static void sendDataFromDevice(
MqttClient client, String deviceId, String messageType, String data)
throws MqttException, UnsupportedEncodingException {
if (!"events".equals(messageType) && !"state".equals(messageType)) {
System.err.println("Invalid message type, must ether be 'state' or events'");
return;
}
final String dataTopic = String.format("/devices/%s/%s", deviceId, messageType);
MqttMessage message = new MqttMessage(data.getBytes(StandardCharsets.UTF_8.name()));
message.setQos(1);
client.publish(dataTopic, message);
System.out.println("Data sent");
}
/** Sends data on behalf of a bound device using the Gateway. */
protected static void sendDataFromBoundDevice(
String mqttBridgeHostname,
short mqttBridgePort,
String projectId,
String cloudRegion,
String registryName,
String gatewayId,
String privateKeyFile,
String algorithm,
String deviceId,
String messageType,
String telemetryData)
throws MqttException, IOException, InvalidKeySpecException, InterruptedException,
NoSuchAlgorithmException {
MqttClient client =
startMqtt(
mqttBridgeHostname,
mqttBridgePort,
projectId,
cloudRegion,
registryName,
gatewayId,
privateKeyFile,
algorithm);
attachDeviceToGateway(client, deviceId);
sendDataFromDevice(client, deviceId, messageType, telemetryData);
detachDeviceFromGateway(client, deviceId);
}
protected static void listenForConfigMessages(
String mqttBridgeHostname,
short mqttBridgePort,
String projectId,
String cloudRegion,
String registryName,
String gatewayId,
String privateKeyFile,
String algorithm,
String deviceId)
throws MqttException, IOException, InvalidKeySpecException, InterruptedException,
NoSuchAlgorithmException {
// Connect the Gateway
MqttClient client =
startMqtt(
mqttBridgeHostname,
mqttBridgePort,
projectId,
cloudRegion,
registryName,
gatewayId,
privateKeyFile,
algorithm);
// Connect the bound device and listen for configuration messages.
attachDeviceToGateway(client, deviceId);
attachCallback(client, deviceId);
detachDeviceFromGateway(client, deviceId);
}
protected static void attachDeviceToGateway(MqttClient client, String deviceId)
throws MqttException, UnsupportedEncodingException {
final String attachTopic = String.format("/devices/%s/attach", deviceId);
System.out.println(String.format("Attaching: %s", attachTopic));
String attachPayload = "{}";
MqttMessage message = new MqttMessage(attachPayload.getBytes(StandardCharsets.UTF_8.name()));
message.setQos(1);
client.publish(attachTopic, message);
}
/** Detaches a bound device from the Gateway. */
protected static void detachDeviceFromGateway(MqttClient client, String deviceId)
throws MqttException, UnsupportedEncodingException {
final String detachTopic = String.format("/devices/%s/detach", deviceId);
System.out.println(String.format("Detaching: %s", detachTopic));
String attachPayload = "{}";
MqttMessage message = new MqttMessage(attachPayload.getBytes(StandardCharsets.UTF_8.name()));
message.setQos(1);
client.publish(detachTopic, message);
}
protected static void mqttDeviceDemo(MqttExampleOptions options)
throws NoSuchAlgorithmException, IOException, InvalidKeySpecException, MqttException,
InterruptedException {
// Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
// connections are accepted. For server authentication, the JVM's root certificates
// are used.
final String mqttServerAddress =
String.format("ssl://%s:%s", options.mqttBridgeHostname, options.mqttBridgePort);
// Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
// Google Cloud IoT Core, it must be in the format below.
final String mqttClientId =
String.format(
"projects/%s/locations/%s/registries/%s/devices/%s",
options.projectId, options.cloudRegion, options.registryId, options.deviceId);
MqttConnectOptions connectOptions = new MqttConnectOptions();
// Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
// explictly set this. If you don't set MQTT version, the server will immediately close its
// connection to your device.
connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
Properties sslProps = new Properties();
sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
connectOptions.setSSLProperties(sslProps);
// With Google Cloud IoT Core, the username field is ignored, however it must be set for the
// Paho client library to send the password field. The password field is used to transmit a JWT
// to authorize the device.
connectOptions.setUserName("unused");
Instant iat = Instant.now();
if ("RS256".equals(options.algorithm)) {
connectOptions.setPassword(
createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
} else if ("ES256".equals(options.algorithm)) {
connectOptions.setPassword(
createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
} else {
throw new IllegalArgumentException(
"Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
}
// Create a client, and connect to the Google MQTT bridge.
MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());
// Both connect and publish operations may fail. If they do, allow retries but with an
// exponential backoff time period.
long initialConnectIntervalMillis = 500L;
long maxConnectIntervalMillis = 6000L;
long maxConnectRetryTimeElapsedMillis = 900000L;
float intervalMultiplier = 1.5f;
long retryIntervalMs = initialConnectIntervalMillis;
long totalRetryTimeMs = 0;
while (totalRetryTimeMs < maxConnectRetryTimeElapsedMillis && !client.isConnected()) {
try {
client.connect(connectOptions);
} catch (MqttException e) {
int reason = e.getReasonCode();
// If the connection is lost or if the server cannot be connected, allow retries, but with
// exponential backoff.
System.out.println("An error occurred: " + e.getMessage());
if (reason == MqttException.REASON_CODE_CONNECTION_LOST
|| reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
Thread.sleep(retryIntervalMs);
totalRetryTimeMs += retryIntervalMs;
retryIntervalMs *= intervalMultiplier;
if (retryIntervalMs > maxConnectIntervalMillis) {
retryIntervalMs = maxConnectIntervalMillis;
}
} else {
throw e;
}
}
}
attachCallback(client, options.deviceId);
// Publish to the events or state topic based on the flag.
String subTopic = "event".equals(options.messageType) ? "events" : options.messageType;
// The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
// required to be in the format below. Note that this is not the same as the device registry's
// Cloud Pub/Sub topic.
String mqttTopic = String.format("/devices/%s/%s", options.deviceId, subTopic);
// Publish numMessages messages to the MQTT bridge, at a rate of 1 per second.
for (int i = 1; i <= options.numMessages; ++i) {
String payload = String.format("%s/%s-payload-%d", options.registryId, options.deviceId, i);
System.out.format(
"Publishing %s message %d/%d: '%s'%n",
options.messageType, i, options.numMessages, payload);
// Refresh the connection credentials before the JWT expires.
long secsSinceRefresh = (Instant.now().toEpochMilli() - iat.toEpochMilli()) / 1000;
if (secsSinceRefresh > (options.tokenExpMins * MINUTES_PER_HOUR)) {
System.out.format("\tRefreshing token after: %d seconds%n", secsSinceRefresh);
iat = Instant.now();
if ("RS256".equals(options.algorithm)) {
connectOptions.setPassword(
createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
} else if ("ES256".equals(options.algorithm)) {
connectOptions.setPassword(
createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
} else {
throw new IllegalArgumentException(
"Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
}
client.disconnect();
client.connect(connectOptions);
attachCallback(client, options.deviceId);
}
// Publish "payload" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
// also supports qos=0 for at most once delivery.
MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8.name()));
message.setQos(1);
client.publish(mqttTopic, message);
if ("event".equals(options.messageType)) {
// Send telemetry events every second
Thread.sleep(1000);
} else {
// Note: Update Device state less frequently than with telemetry events
Thread.sleep(5000);
}
}
// Wait for commands to arrive for about two minutes.
for (int i = 1; i <= options.waitTime; ++i) {
System.out.print('.');
Thread.sleep(1000);
}
System.out.println("");
// Disconnect the client if still connected, and finish the run.
if (client.isConnected()) {
client.disconnect();
}
System.out.println("Finished loop successfully. Goodbye!");
client.close();
}
/** Attaches the callback used when configuration changes occur. */
protected static void attachCallback(MqttClient client, String deviceId)
throws MqttException, UnsupportedEncodingException {
mCallback =
new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
// Do nothing...
}
@Override
public void messageArrived(String topic, MqttMessage message) {
try {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8.name());
System.out.println("Payload : " + payload);
// TODO: Insert your parsing / handling of the configuration message here.
//
} catch (UnsupportedEncodingException uee) {
System.err.println(uee);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Do nothing;
}
};
String commandTopic = String.format("/devices/%s/commands/#", deviceId);
System.out.println(String.format("Listening on %s", commandTopic));
String configTopic = String.format("/devices/%s/config", deviceId);
System.out.println(String.format("Listening on %s", configTopic));
client.subscribe(configTopic, 1);
client.subscribe(commandTopic, 1);
client.setCallback(mCallback);
}