バインドされたデバイスに代わってデータを送信します。
コードサンプル
Go
詳細については、Cloud IoT Core Go の API のリファレンス ドキュメントをご覧ください。
Cloud IoT Core 認証するには、アプリケーションのデフォルト認証情報を設定します。 詳細については、ローカル開発環境の認証の設定をご覧ください。
import (
"errors"
"fmt"
"io"
"math/rand"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// sendDataFromBoundDevice starts a gateway client that sends data on behalf of a bound device.
func sendDataFromBoundDevice(w io.Writer, projectID string, region string, registryID string, gatewayID string, deviceID string, privateKeyPath string, algorithm string, numMessages int, payload string) error {
const (
mqttBrokerURL = "tls://mqtt.googleapis.com:8883"
protocolVersion = 4 // corresponds to MQTT 3.1.1
minimumBackoffTime = 1 // initial backoff time in seconds
maximumBackoffTime = 32 // maximum backoff time in seconds
)
var backoffTime = minimumBackoffTime
var shouldBackoff = false
// onConnect defines the on connect handler which resets backoff variables.
var onConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Fprintf(w, "Client connected: %t\n", client.IsConnected())
shouldBackoff = false
backoffTime = minimumBackoffTime
}
// onMessage defines the message handler for the mqtt client.
var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Fprintf(w, "Topic: %s\n", msg.Topic())
fmt.Fprintf(w, "Message: %s\n", msg.Payload())
}
// onDisconnect defines the connection lost handler for the mqtt client.
var onDisconnect mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Fprintln(w, "Client disconnected")
shouldBackoff = true
}
jwt, _ := createJWT(projectID, privateKeyPath, algorithm, 60)
clientID := fmt.Sprintf("projects/%s/locations/%s/registries/%s/devices/%s", projectID, region, registryID, gatewayID)
opts := mqtt.NewClientOptions()
opts.AddBroker(mqttBrokerURL)
opts.SetClientID(clientID)
opts.SetUsername("unused")
opts.SetPassword(jwt)
opts.SetProtocolVersion(protocolVersion)
opts.SetOnConnectHandler(onConnect)
opts.SetDefaultPublishHandler(onMessage)
opts.SetConnectionLostHandler(onDisconnect)
// Create and connect a client using the above options.
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
fmt.Fprintln(w, "Failed to connect client")
return token.Error()
}
if err := attachDevice(deviceID, client, ""); err != nil {
fmt.Fprintf(w, "Failed to attach device %s\n", err)
return err
}
// Sleep for 5 seconds to allow attachDevice message to propagate.
time.Sleep(5 * time.Second)
gatewayStateTopic := fmt.Sprintf("/devices/%s/state", gatewayID)
deviceStateTopic := fmt.Sprintf("/devices/%s/state", deviceID)
gatewayInitPayload := fmt.Sprintf("Starting gateway at time: %d", time.Now().Unix())
if token := client.Publish(gatewayStateTopic, 1, false, gatewayInitPayload); token.Wait() && token.Error() != nil {
fmt.Fprintln(w, "Failed to publish initial gateway payload")
return token.Error()
}
for i := 1; i <= numMessages; i++ {
if shouldBackoff {
if backoffTime > maximumBackoffTime {
fmt.Fprintln(w, "Exceeded max backoff time.")
return errors.New("exceeded maximum backoff time, exiting")
}
waitTime := backoffTime + rand.Intn(1000)/1000.0
time.Sleep(time.Duration(waitTime) * time.Second)
backoffTime *= 2
client = mqtt.NewClient(opts)
}
deviceStatePayload := fmt.Sprintf("%s, #%d", payload, i)
fmt.Fprintf(w, "Publishing message: %s to %s\n", deviceStatePayload, deviceStateTopic)
if token := client.Publish(deviceStateTopic, 1, false, payload); token.Wait() && token.Error() != nil {
fmt.Fprintln(w, "Failed to publish payload to device state topic")
return token.Error()
}
// Sleep for a bit between messages to simulate real world device state publishing.
time.Sleep(5 * time.Second)
}
detachDevice(deviceID, client, "")
client.Disconnect(20)
return nil
}
Node.js
詳細については、Cloud IoT Core Node.js の API のリファレンス ドキュメントをご覧ください。
Cloud IoT Core 認証するには、アプリケーションのデフォルト認証情報を設定します。 詳細については、ローカル開発環境の認証の設定をご覧ください。
// const deviceId = `myDevice`;
// const gatewayId = `mygateway`;
// const registryId = `myRegistry`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
// const mqttBridgeHostname = `mqtt.googleapis.com`;
// const mqttBridgePort = 8883;
// const numMessages = 5;
// const tokenExpMins = 60;
const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${gatewayId}`;
console.log(`MQTT client id: ${mqttClientId}`);
const connectionArgs = {
host: mqttBridgeHostname,
port: mqttBridgePort,
clientId: mqttClientId,
username: 'unused',
password: createJwt(projectId, privateKeyFile, algorithm),
protocol: 'mqtts',
qos: 1,
secureProtocol: 'TLSv1_2_method',
ca: [readFileSync(serverCertFile)],
};
// Create a client, and connect to the Google MQTT bridge.
const iatTime = parseInt(Date.now() / 1000);
const client = mqtt.connect(connectionArgs);
client.on('connect', success => {
if (!success) {
console.log('Client not connected...');
} else if (!publishChainInProgress) {
console.log('Client connected: Attaching device');
attachDevice(deviceId, client);
setTimeout(() => {
console.log('Client connected: Gateway is ready to relay');
publishAsyncGateway(
client,
iatTime,
tokenExpMins,
0,
numMessages,
registryId,
deviceId,
gatewayId,
connectionArgs,
projectId,
privateKeyFile,
algorithm
);
}, 5000);
}
});
client.on('close', () => {
console.log('Connection closed');
shouldBackoff = true;
});
client.on('error', err => {
console.log('error', err);
});
client.on('message', (topic, message) => {
console.log(
'message received: ',
Buffer.from(message, 'base64').toString('ascii')
);
});
client.on('packetsend', () => {
// Note: logging packet send is very verbose
});
次のステップ
他の Google Cloud プロダクトに関連するコードサンプルの検索およびフィルタ検索を行うには、Google Cloud のサンプルをご覧ください。