Envía datos en nombre de un dispositivo vinculado.
Muestra de código
C#
public static object SendDataFromBoundDevice(string projectId, string cloudRegion,
string registryId, string deviceId, string gatewayId, string privateKeyFile,
string algorithm, string caCerts, string mqttBridgeHostname, int mqttBridgePort,
int jwtExpiresMinutes, string messageType, string payload)
{
// Publish device events and gateway state.
string device_topic = $"/devices/{deviceId}/state";
string gateway_topic = $"/devices/{gatewayId}/state";
var jwtIatTime = SystemClock.Instance.GetCurrentInstant();
Duration durationExp = Duration.FromMinutes(jwtExpiresMinutes);
var jwtExpTime = SystemClock.Instance.GetCurrentInstant().Plus(durationExp);
// Use gateway to connect to server.
var password = CreateJwtRsa(projectId, privateKeyFile);
var mqttClient = GetClient(projectId, cloudRegion,
registryId, gatewayId, caCerts,
mqttBridgeHostname, mqttBridgePort);
var clientId = $"projects/{projectId}/locations/{cloudRegion}/registries/{registryId}" +
$"/devices/{gatewayId}";
mqttClient.Connect(clientId, "unused", password);
SetupMqttTopics(mqttClient, gatewayId);
AttachDevice(mqttClient, deviceId, "{}");
System.Threading.Thread.Sleep(1000);
// Publish numMsgs messages to the MQTT bridge.
SendDataFromDevice(mqttClient, deviceId, messageType, payload);
DetachDevice(mqttClient, deviceId);
mqttClient.Disconnect();
return 0;
}
Go
import (
"errors"
"fmt"
"io"
"math/rand"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// sendDataFromBoundDevice starts a gateway client that sends data on behalf of a bound device.
func sendDataFromBoundDevice(w io.Writer, projectID string, region string, registryID string, gatewayID string, deviceID string, privateKeyPath string, algorithm string, numMessages int, payload string) error {
const (
mqttBrokerURL = "tls://mqtt.googleapis.com:8883"
protocolVersion = 4 // corresponds to MQTT 3.1.1
minimumBackoffTime = 1 // initial backoff time in seconds
maximumBackoffTime = 32 // maximum backoff time in seconds
)
var backoffTime = minimumBackoffTime
var shouldBackoff = false
// onConnect defines the on connect handler which resets backoff variables.
var onConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Fprintf(w, "Client connected: %t\n", client.IsConnected())
shouldBackoff = false
backoffTime = minimumBackoffTime
}
// onMessage defines the message handler for the mqtt client.
var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Fprintf(w, "Topic: %s\n", msg.Topic())
fmt.Fprintf(w, "Message: %s\n", msg.Payload())
}
// onDisconnect defines the connection lost handler for the mqtt client.
var onDisconnect mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Fprintln(w, "Client disconnected")
shouldBackoff = true
}
jwt, _ := createJWT(projectID, privateKeyPath, algorithm, 60)
clientID := fmt.Sprintf("projects/%s/locations/%s/registries/%s/devices/%s", projectID, region, registryID, gatewayID)
opts := mqtt.NewClientOptions()
opts.AddBroker(mqttBrokerURL)
opts.SetClientID(clientID)
opts.SetUsername("unused")
opts.SetPassword(jwt)
opts.SetProtocolVersion(protocolVersion)
opts.SetOnConnectHandler(onConnect)
opts.SetDefaultPublishHandler(onMessage)
opts.SetConnectionLostHandler(onDisconnect)
// Create and connect a client using the above options.
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
fmt.Fprintln(w, "Failed to connect client")
return token.Error()
}
if err := attachDevice(deviceID, client, ""); err != nil {
fmt.Fprintf(w, "Failed to attach device %s\n", err)
return err
}
// Sleep for 5 seconds to allow attachDevice message to propagate.
time.Sleep(5 * time.Second)
gatewayStateTopic := fmt.Sprintf("/devices/%s/state", gatewayID)
deviceStateTopic := fmt.Sprintf("/devices/%s/state", deviceID)
gatewayInitPayload := fmt.Sprintf("Starting gateway at time: %d", time.Now().Unix())
if token := client.Publish(gatewayStateTopic, 1, false, gatewayInitPayload); token.Wait() && token.Error() != nil {
fmt.Fprintln(w, "Failed to publish initial gateway payload")
return token.Error()
}
for i := 1; i <= numMessages; i++ {
if shouldBackoff {
if backoffTime > maximumBackoffTime {
fmt.Fprintln(w, "Exceeded max backoff time.")
return errors.New("exceeded maximum backoff time, exiting")
}
waitTime := backoffTime + rand.Intn(1000)/1000.0
time.Sleep(time.Duration(waitTime) * time.Second)
backoffTime *= 2
client = mqtt.NewClient(opts)
}
deviceStatePayload := fmt.Sprintf("%s, #%d", payload, i)
fmt.Fprintf(w, "Publishing message: %s to %s\n", deviceStatePayload, deviceStateTopic)
if token := client.Publish(deviceStateTopic, 1, false, payload); token.Wait() && token.Error() != nil {
fmt.Fprintln(w, "Failed to publish payload to device state topic")
return token.Error()
}
// Sleep for a bit between messages to simulate real world device state publishing.
time.Sleep(5 * time.Second)
}
detachDevice(deviceID, client, "")
client.Disconnect(20)
return nil
}
Node.js
// const deviceId = `myDevice`;
// const gatewayId = `mygateway`;
// const registryId = `myRegistry`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const numMessages = 5;
// const tokenExpMins = 60;
const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${gatewayId}`;
console.log(`MQTT client id: ${mqttClientId}`);
const connectionArgs = {
host: mqttBridgeHostname,
port: mqttBridgePort,
clientId: mqttClientId,
username: 'unused',
password: createJwt(projectId, privateKeyFile, algorithm),
protocol: 'mqtts',
qos: 1,
secureProtocol: 'TLSv1_2_method',
ca: [readFileSync(serverCertFile)],
};
// Create a client, and connect to the Google MQTT bridge.
const iatTime = parseInt(Date.now() / 1000);
const client = mqtt.connect(connectionArgs);
client.on('connect', success => {
if (!success) {
console.log('Client not connected...');
} else if (!publishChainInProgress) {
console.log('Client connected: Attaching device');
attachDevice(deviceId, client);
setTimeout(() => {
console.log('Client connected: Gateway is ready to relay');
publishAsyncGateway(
client,
iatTime,
tokenExpMins,
0,
numMessages,
registryId,
deviceId,
gatewayId,
connectionArgs,
projectId,
privateKeyFile,
algorithm
);
}, 5000);
}
});
client.on('close', () => {
console.log('Connection closed');
shouldBackoff = true;
});
client.on('error', err => {
console.log('error', err);
});
client.on('message', (topic, message) => {
console.log(
'message received: ',
Buffer.from(message, 'base64').toString('ascii')
);
});
client.on('packetsend', () => {
// Note: logging packet send is very verbose
});
¿Qué sigue?
Para buscar y filtrar muestras de código en otros productos de Google Cloud, consulta el navegador de muestra de Google Cloud.