Restez organisé à l'aide des collections
Enregistrez et classez les contenus selon vos préférences.
Le modèle MQTT vers Pub/Sub est un pipeline de streaming qui lit les messages d'un sujet MQTT et les écrit dans Pub/Sub.
Il inclut les paramètres facultatifs username et password au cas où l'authentification serait requise par le serveur MQTT.
Si le pipeline ne reçoit aucun message du sujet MQTT pendant plus de 90 minutes, une erreur StackOverflowError se produit.
Pour contourner ce problème, vous pouvez modifier le nombre de nœuds de calcul toutes les 90 minutes.
Pour en savoir plus sur la modification du nombre de nœuds de calcul sans arrêter votre job, consultez la section Mise à jour des options de job en cours.
Conditions requises pour ce pipeline
Le nom du sujet de sortie Pub/Sub doit déjà exister.
L'adresse IP de l'hôte MQTT doit exister et présenter la configuration réseau appropriée pour que les machines de nœud de calcul puissent atteindre l'hôte MQTT.
Le sujet MQTT à partir duquel les données sont extraites doit avoir un nom.
Paramètres de modèle
Paramètres obligatoires
inputTopic : Nom du sujet MQTT à partir duquel les données sont lues. (Exemple : sujet).
outputTopic : Nom du sujet Pub/Sub de sortie dans lequel les données sont écrites. (par exemple, projects/votre-id-projet/topics/nom-de-votre-sujet).
username : Nom d'utilisateur à utiliser pour l'authentification sur le serveur MQTT. (Exemple: exemplenomdutilisateur).
password : Mot de passe associé au nom d'utilisateur fourni. (Exemple: exemplemotdepasse).
Paramètres facultatifs
brokerServer : Adresse IP ou hôte du serveur de courtage MQTT. (par exemple, tcp://host:1883).
Exécuter le modèle
Console
Accédez à la page Dataflow Créer un job à partir d'un modèle.
Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :
Remplacez YOUR_PROJECT_ID par l'ID du projet.
Remplacez par le nom de la région Dataflow. Exemple : us-central1.
Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Remplacez INPUT_TOPIC par le nom du sujet d'entrée sur le serveur MQTT. Exemple : testtopic.
Remplacez MQTT_SERVER par les adresses du serveur MQTT. Par exemple : tcp://10.128.0.62:1883
Remplacez OUTPUT_TOPIC par le nom du sujet de sortie Pub/Sub. Exemple : projects/myproject/topics/testoutput.
Remplacez USERNAME par le nom d'utilisateur pour le serveur MQTT. Exemple : testuser.
Remplacez PASSWORD par le mot de passe correspondant au nom d'utilisateur associé au serveur MQTT.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.
Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :
Remplacez YOUR_PROJECT_ID par l'ID du projet.
Remplacez par le nom de la région Dataflow. Exemple : us-central1.
Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Remplacez INPUT_TOPIC par le nom du sujet d'entrée sur le serveur MQTT. Exemple : testtopic.
Remplacez MQTT_SERVER par les adresses du serveur MQTT. Par exemple : tcp://10.128.0.62:1883
Remplacez OUTPUT_TOPIC par le nom du sujet de sortie Pub/Sub. Exemple : projects/myproject/topics/testoutput.
Remplacez USERNAME par le nom d'utilisateur pour le serveur MQTT. Exemple : testuser.
Remplacez PASSWORD par le mot de passe correspondant au nom d'utilisateur associé au serveur MQTT.
Code source du modèle
Java
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.mqtt.MqttIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
/**
* Dataflow template which reads data from Mqtt Topic and writes it to Cloud PubSub.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/mqtt-to-pubsub/README_Mqtt_to_PubSub.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Mqtt_to_PubSub",
category = TemplateCategory.STREAMING,
displayName = "MQTT to Pubsub",
description =
"The MQTT to Pub/Sub template is a streaming pipeline that reads messages from an MQTT topic and writes them to Pub/Sub. "
+ "It includes the optional parameters <code>username</code> and <code>password</code> in case authentication is required by the MQTT server.",
optionsClass = MqttToPubsub.MqttToPubsubOptions.class,
flexContainerName = "mqtt-to-pubsub",
contactInformation = "https://cloud.google.com/support",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/mqtt-to-pubsub",
preview = true,
requirements = {
"The Pub/Sub output topic name must exist.",
"The MQTT host IP must exist and have the proper network configuration for worker machines to reach the MQTT host.",
"The MQTT topic that data is extracted from must have a name."
},
streaming = true,
supportsAtLeastOnce = true)
public class MqttToPubsub {
/**
* Runs a pipeline which reads data from Mqtt topic and writes it to Cloud PubSub.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
MqttToPubsubOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(MqttToPubsubOptions.class);
run(options);
}
public static void validate(MqttToPubsubOptions options) {
if (options != null) {
if ((options.getUsername() != null && !options.getUsername().isEmpty())
&& (options.getPassword() == null || options.getPassword().isBlank())) {
throw new IllegalArgumentException(
"While username is provided, password is required for authentication");
}
}
}
public static PipelineResult run(MqttToPubsubOptions options) {
validate(options);
Pipeline pipeline = Pipeline.create(options);
MqttIO.Read mqttIo;
if (!options.getUsername().isEmpty() || !options.getPassword().isBlank()) {
mqttIo =
MqttIO.read()
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create(
options.getBrokerServer(), options.getInputTopic())
.withUsername(options.getUsername())
.withPassword(options.getPassword()));
} else {
mqttIo =
MqttIO.read()
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create(
options.getBrokerServer(), options.getInputTopic()));
}
return pipeline
.apply("ReadFromMqttTopic", mqttIo)
.apply(ParDo.of(new ByteToStringTransform()))
.apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()))
.getPipeline()
.run();
}
static class ByteToStringTransform extends DoFn<byte[], String> {
@ProcessElement
public void processElement(@Element byte[] word, OutputReceiver<String> out) {
out.output(new String(word, StandardCharsets.UTF_8));
}
}
/**
* The {@link MqttToPubsubOptions} interface provides the custom execution options passed by the
* executor at the command-line.
*/
public interface MqttToPubsubOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 1,
optional = true,
regexes = {"[,\\/:a-zA-Z0-9._-]+"},
description = "MQTT Broker IP",
helpText = "The MQTT broker server IP or host.",
example = "tcp://host:1883")
@Validation.Required
String getBrokerServer();
void setBrokerServer(String brokerServer);
@TemplateParameter.Text(
order = 2,
optional = false,
regexes = {"[\\/a-zA-Z0-9._-]+"},
description = "MQTT topic(s) to read the input from",
helpText = "The name of the MQTT topic that data is read from.",
example = "topic")
@Validation.Required
String getInputTopic();
void setInputTopic(String inputTopics);
@TemplateParameter.PubsubTopic(
order = 3,
description = "Output Pub/Sub topic",
helpText = "The name of the output Pub/Sub topic that data is written to.",
example = "projects/your-project-id/topics/your-topic-name")
@Validation.Required
String getOutputTopic();
void setOutputTopic(String outputTopic);
@TemplateParameter.Text(
order = 4,
description = "MQTT Username",
helpText = "The username to use for authentication on the MQTT server.",
example = "sampleusername")
String getUsername();
void setUsername(String username);
@TemplateParameter.Password(
order = 5,
description = "MQTT Password",
helpText = "The password associated with the provided username.",
example = "samplepassword")
String getPassword();
void setPassword(String password);
}
}
Sauf indication contraire, le contenu de cette page est régi par une licence Creative Commons Attribution 4.0, et les échantillons de code sont régis par une licence Apache 2.0. Pour en savoir plus, consultez les Règles du site Google Developers. Java est une marque déposée d'Oracle et/ou de ses sociétés affiliées.
Dernière mise à jour le 2024/06/26 (UTC).
[{
"type": "thumb-down",
"id": "hardToUnderstand",
"label":"Hard to understand"
},{
"type": "thumb-down",
"id": "incorrectInformationOrSampleCode",
"label":"Incorrect information or sample code"
},{
"type": "thumb-down",
"id": "missingTheInformationSamplesINeed",
"label":"Missing the information/samples I need"
},{
"type": "thumb-down",
"id": "translationIssue",
"label":"Problème de traduction"
},{
"type": "thumb-down",
"id": "otherDown",
"label":"Autre"
}]
[{
"type": "thumb-up",
"id": "easyToUnderstand",
"label":"Facile à comprendre"
},{
"type": "thumb-up",
"id": "solvedMyProblem",
"label":"J'ai pu résoudre mon problème"
},{
"type": "thumb-up",
"id": "otherUp",
"label":"Autre"
}]