Send data from a bound device

Send data on behalf of a bound device.

Code sample

C#

// Connects to the MQTT bridge as the gateway, attaches the bound device,
// relays data on the device's behalf, then detaches the device and
// disconnects. Always returns 0.
//
// Note: algorithm and jwtExpiresMinutes are accepted for signature
// compatibility but are not used by this method; the JWT is created by
// CreateJwtRsa from projectId and privateKeyFile only.
public static object SendDataFromBoundDevice(string projectId, string cloudRegion,
    string registryId, string deviceId, string gatewayId, string privateKeyFile,
    string algorithm, string caCerts, string mqttBridgeHostname, int mqttBridgePort,
    int jwtExpiresMinutes, string messageType, string payload)
{
    // Use gateway to connect to server: the JWT is the MQTT password and the
    // client is built for gatewayId, not for the bound device.
    var password = CreateJwtRsa(projectId, privateKeyFile);
    var mqttClient = GetClient(projectId, cloudRegion,
        registryId, gatewayId, caCerts,
        mqttBridgeHostname, mqttBridgePort);

    // The client ID is the full resource path of the gateway.
    var clientId = $"projects/{projectId}/locations/{cloudRegion}/registries/{registryId}" +
        $"/devices/{gatewayId}";
    mqttClient.Connect(clientId, "unused", password);
    SetupMqttTopics(mqttClient, gatewayId);

    // Attach the device through the gateway connection, then pause for one
    // second before sending (presumably to let the attach take effect).
    AttachDevice(mqttClient, deviceId, "{}");
    System.Threading.Thread.Sleep(1000);

    // Send the payload to the MQTT bridge on behalf of the bound device.
    SendDataFromDevice(mqttClient, deviceId, messageType, payload);

    DetachDevice(mqttClient, deviceId);
    mqttClient.Disconnect();
    return 0;
}

Go


import (
	"errors"
	"fmt"
	"io"
	"math/rand"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// sendDataFromBoundDevice starts a gateway client that sends data on behalf of a bound device.
//
// It connects to the MQTT bridge with gatewayID as the client identity,
// attaches deviceID through that connection, publishes one initial message to
// the gateway's state topic, and then publishes numMessages numbered copies of
// payload to the device's state topic, pausing five seconds after each one.
// Progress and failures are written to w.
//
// If the connection is reported lost, the function reconnects with exponential
// backoff (plus up to one second of random jitter) and gives up once the
// backoff exceeds maximumBackoffTime. It returns the first error encountered,
// or nil after the device has been detached and the client disconnected.
func sendDataFromBoundDevice(w io.Writer, projectID string, region string, registryID string, gatewayID string, deviceID string, privateKeyPath string, algorithm string, numMessages int, payload string) error {
	const (
		mqttBrokerURL      = "tls://mqtt.googleapis.com:8883"
		protocolVersion    = 4  // corresponds to MQTT 3.1.1
		minimumBackoffTime = 1  // initial backoff time in seconds
		maximumBackoffTime = 32 // maximum backoff time in seconds
	)

	// NOTE(review): shouldBackoff and backoffTime are written by the handlers
	// below and read by the publish loop. If the MQTT library invokes handlers
	// on a different goroutine, these accesses need synchronization — confirm.
	var backoffTime = minimumBackoffTime
	var shouldBackoff = false

	// onConnect defines the on connect handler which resets backoff variables.
	var onConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
		fmt.Fprintf(w, "Client connected: %t\n", client.IsConnected())

		shouldBackoff = false
		backoffTime = minimumBackoffTime
	}

	// onMessage defines the message handler for the mqtt client.
	var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
		fmt.Fprintf(w, "Topic: %s\n", msg.Topic())
		fmt.Fprintf(w, "Message: %s\n", msg.Payload())
	}

	// onDisconnect defines the connection lost handler for the mqtt client.
	// It only flags the loss; the publish loop performs the reconnect.
	var onDisconnect mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
		fmt.Fprintln(w, "Client disconnected")
		shouldBackoff = true
	}

	// The JWT is used as the MQTT password, so a failure here must stop the
	// function rather than attempt to connect with an empty credential.
	jwt, err := createJWT(projectID, privateKeyPath, algorithm, 60)
	if err != nil {
		fmt.Fprintln(w, "Failed to create JWT")
		return fmt.Errorf("createJWT: %w", err)
	}
	clientID := fmt.Sprintf("projects/%s/locations/%s/registries/%s/devices/%s", projectID, region, registryID, gatewayID)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(mqttBrokerURL)
	opts.SetClientID(clientID)
	opts.SetUsername("unused")
	opts.SetPassword(jwt)
	opts.SetProtocolVersion(protocolVersion)
	opts.SetOnConnectHandler(onConnect)
	opts.SetDefaultPublishHandler(onMessage)
	opts.SetConnectionLostHandler(onDisconnect)

	// Create and connect a client using the above options.
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, "Failed to connect client")
		return token.Error()
	}

	if err := attachDevice(deviceID, client, ""); err != nil {
		fmt.Fprintf(w, "Failed to attach device %s\n", err)
		return err
	}

	// Sleep for 5 seconds to allow attachDevice message to propagate.
	time.Sleep(5 * time.Second)

	gatewayStateTopic := fmt.Sprintf("/devices/%s/state", gatewayID)
	deviceStateTopic := fmt.Sprintf("/devices/%s/state", deviceID)

	gatewayInitPayload := fmt.Sprintf("Starting gateway at time: %d", time.Now().Unix())
	if token := client.Publish(gatewayStateTopic, 1, false, gatewayInitPayload); token.Wait() && token.Error() != nil {
		fmt.Fprintln(w, "Failed to publish initial gateway payload")
		return token.Error()
	}

	for i := 1; i <= numMessages; i++ {
		// Reconnect before publishing if the connection was reported lost.
		// Retry until a connection succeeds or the backoff limit is exceeded.
		for shouldBackoff {
			if backoffTime > maximumBackoffTime {
				fmt.Fprintln(w, "Exceeded max backoff time.")
				return errors.New("exceeded maximum backoff time, exiting")
			}

			// Wait backoffTime seconds plus 0-999 ms of jitter. The jitter is
			// built as a Duration because integer division of rand.Intn(1000)
			// by 1000 would always yield zero.
			jitter := time.Duration(rand.Intn(1000)) * time.Millisecond
			time.Sleep(time.Duration(backoffTime)*time.Second + jitter)

			backoffTime *= 2

			// Stop the stale client before replacing it so it does not keep
			// running alongside the new one (presumably it may otherwise try
			// to reconnect with the same client ID — TODO confirm).
			client.Disconnect(0)

			// A freshly created client is not connected; connect it and keep
			// backing off if the attempt fails.
			client = mqtt.NewClient(opts)
			if token := client.Connect(); token.Wait() && token.Error() != nil {
				fmt.Fprintf(w, "Failed to reconnect client: %v\n", token.Error())
				continue
			}

			// Reset the backoff state here as well, in case the connect
			// handler has not run yet when the loop condition is re-checked.
			shouldBackoff = false
			backoffTime = minimumBackoffTime

			// Attach the device again on the new connection; attachments are
			// assumed to be per-connection — TODO confirm.
			if err := attachDevice(deviceID, client, ""); err != nil {
				fmt.Fprintf(w, "Failed to attach device %s\n", err)
				return err
			}
		}

		deviceStatePayload := fmt.Sprintf("%s, #%d", payload, i)

		fmt.Fprintf(w, "Publishing message: %s to %s\n", deviceStatePayload, deviceStateTopic)

		// Publish the numbered payload that was just logged, not the raw one.
		if token := client.Publish(deviceStateTopic, 1, false, deviceStatePayload); token.Wait() && token.Error() != nil {
			fmt.Fprintln(w, "Failed to publish payload to device state topic")
			return token.Error()
		}

		// Sleep for a bit between messages to simulate real world device state publishing.
		time.Sleep(5 * time.Second)
	}

	// Detach the device before closing the gateway connection.
	detachDevice(deviceID, client, "")

	// Disconnect, allowing up to 20 ms for in-flight work to complete.
	client.Disconnect(20)
	return nil
}

Node.js

// const deviceId = `myDevice`;
// const gatewayId = `mygateway`;
// const registryId = `myRegistry`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const numMessages = 5;
// const tokenExpMins = 60;

// The MQTT client ID is the full resource path of the gateway: the gateway,
// not the bound device, owns the connection.
const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${gatewayId}`;
console.log(`MQTT client id: ${mqttClientId}`);

// Connection options for the MQTT bridge. The username is a placeholder;
// the JWT is passed as the password.
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: 'mqtts',
  qos: 1,
  secureProtocol: 'TLSv1_2_method',
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the Google MQTT bridge.
// iatTime is the current time in whole seconds since the epoch; Math.floor
// truncates the number directly instead of round-tripping it through
// parseInt's string parsing.
const iatTime = Math.floor(Date.now() / 1000);
const client = mqtt.connect(connectionArgs);

// On connect: attach the bound device, then start the publish chain after a
// five second delay. Nothing is started if a publish chain is already in
// progress.
client.on('connect', success => {
  if (!success) {
    console.log('Client not connected...');
  } else if (!publishChainInProgress) {
    console.log('Client connected: Attaching device');
    attachDevice(deviceId, client);
    setTimeout(() => {
      console.log('Client connected: Gateway is ready to relay');
      publishAsyncGateway(
        client,
        iatTime,
        tokenExpMins,
        0,
        numMessages,
        registryId,
        deviceId,
        gatewayId,
        connectionArgs,
        projectId,
        privateKeyFile,
        algorithm
      );
    }, 5000);
  }
});

// On close: flag that the next attempt should back off. shouldBackoff is
// declared outside this fragment.
client.on('close', () => {
  console.log('Connection closed');
  shouldBackoff = true;
});

// On error: log only; no recovery is attempted here.
client.on('error', err => {
  console.log('error', err);
});

// On message: log the payload as ASCII text.
// NOTE(review): if message is already a Buffer, the 'base64' argument to
// Buffer.from has no effect — confirm the intended decoding.
client.on('message', (topic, message) => {
  console.log(
    'message received: ',
    Buffer.from(message, 'base64').toString('ascii')
  );
});

// Intentionally empty handler for the 'packetsend' event.
client.on('packetsend', () => {
  // Note: logging packet send is very verbose
});

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.