Mantieni tutto organizzato con le raccolte
Salva e classifica i contenuti in base alle tue preferenze.
Il modello da MQTT a Pub/Sub è una pipeline di inserimento flussi che legge i messaggi da un argomento MQTT e li scrive in Pub/Sub.
Include i parametri facoltativi username e password nel caso in cui il server MQTT richieda l'autenticazione.
Se la pipeline non riceve nessun messaggio dall'argomento MQTT per più di 90 minuti, si verifica un StackOverflowError.
Come soluzione alternativa, puoi modificare il numero di worker ogni 90 minuti.
Per ulteriori informazioni sulla modifica del numero di worker senza interrompere il job,
consulta Aggiornamento delle opzioni del job in corso.
Requisiti della pipeline
Deve esistere il nome dell'argomento di output Pub/Sub.
L'IP dell'host MQTT deve esistere e avere la configurazione di rete adeguata per consentire alle macchine worker di raggiungere l'host MQTT.
L'argomento MQTT da cui vengono estratti i dati deve avere un nome.
Parametri del modello
Parametro
Descrizione
brokerServer
L'indirizzo IP o l'host del server del broker MQTT. Ad esempio, tcp://10.0.0.1:1883.
inputTopic
Il nome dell'argomento MQTT da cui vengono letti i dati.
outputTopic
Il nome dell'argomento Pub/Sub di output in cui vengono scritti i dati.
username
(Facoltativo) Il nome utente da utilizzare per l'autenticazione sul server MQTT.
password
(Facoltativo) La password associata al nome utente fornito.
In questo esempio devi sostituire i valori seguenti:
Sostituisci YOUR_PROJECT_ID con l'ID progetto.
Sostituisci con il nome della regione Dataflow. Ad esempio: us-central1.
Sostituisci JOB_NAME con un nome job a tua scelta. Per essere valido, il nome del job deve corrispondere all'espressione regolare
[a-z]([-a-z0-9]{0,38}[a-z0-9])?.
Sostituisci INPUT_TOPIC con il nome dell'argomento di input del server MQTT. Ad esempio: testtopic.
Sostituisci MQTT_SERVER con gli indirizzi del server MQTT. Ad esempio: tcp://10.128.0.62:1883
Sostituisci OUTPUT_TOPIC con il nome dell'argomento di output Pub/Sub. Ad esempio: projects/myproject/topics/testoutput.
Sostituisci USERNAME con il nome utente del server MQTT. Ad esempio: testuser.
Sostituisci PASSWORD con la password che corrisponde al nome utente utilizzato con il server MQTT.
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.
In questo esempio devi sostituire i valori seguenti:
Sostituisci YOUR_PROJECT_ID con l'ID progetto.
Sostituisci con il nome della regione Dataflow. Ad esempio: us-central1.
Sostituisci JOB_NAME con un nome job a tua scelta. Per essere valido, il nome del job deve corrispondere all'espressione regolare
[a-z]([-a-z0-9]{0,38}[a-z0-9])?.
Sostituisci INPUT_TOPIC con il nome dell'argomento di input del server MQTT. Ad esempio: testtopic.
Sostituisci MQTT_SERVER con gli indirizzi del server MQTT. Ad esempio: tcp://10.128.0.62:1883
Sostituisci OUTPUT_TOPIC con il nome dell'argomento di output Pub/Sub. Ad esempio: projects/myproject/topics/testoutput.
Sostituisci USERNAME con il nome utente del server MQTT. Ad esempio: testuser.
Sostituisci PASSWORD con la password che corrisponde al nome utente utilizzato con il server MQTT.
Codice sorgente del modello
Java
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.mqtt.MqttIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
/**
* Dataflow template which reads data from Mqtt Topic and writes it to Cloud PubSub.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/mqtt-to-pubsub/README_Mqtt_to_PubSub.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Mqtt_to_PubSub",
category = TemplateCategory.STREAMING,
displayName = "MQTT to Pubsub",
description =
"The MQTT to Pub/Sub template is a streaming pipeline that reads messages from an MQTT topic and writes them to Pub/Sub. "
+ "It includes the optional parameters <code>username</code> and <code>password</code> in case authentication is required by the MQTT server.",
optionsClass = MqttToPubsub.MqttToPubsubOptions.class,
flexContainerName = "mqtt-to-pubsub",
contactInformation = "https://cloud.google.com/support",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/mqtt-to-pubsub",
preview = true,
requirements = {
"The Pub/Sub output topic name must exist.",
"The MQTT host IP must exist and have the proper network configuration for worker machines to reach the MQTT host.",
"The MQTT topic that data is extracted from must have a name."
},
streaming = true,
supportsAtLeastOnce = true)
public class MqttToPubsub {
/**
* Runs a pipeline which reads data from Mqtt topic and writes it to Cloud PubSub.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
MqttToPubsubOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(MqttToPubsubOptions.class);
run(options);
}
public static void validate(MqttToPubsubOptions options) {
if (options != null) {
if ((options.getUsername() != null && !options.getUsername().isEmpty())
&& (options.getPassword() == null || options.getPassword().isBlank())) {
throw new IllegalArgumentException(
"While username is provided, password is required for authentication");
}
}
}
public static PipelineResult run(MqttToPubsubOptions options) {
validate(options);
Pipeline pipeline = Pipeline.create(options);
MqttIO.Read mqttIo;
if (!options.getUsername().isEmpty() || !options.getPassword().isBlank()) {
mqttIo =
MqttIO.read()
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create(
options.getBrokerServer(), options.getInputTopic())
.withUsername(options.getUsername())
.withPassword(options.getPassword()));
} else {
mqttIo =
MqttIO.read()
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create(
options.getBrokerServer(), options.getInputTopic()));
}
return pipeline
.apply("ReadFromMqttTopic", mqttIo)
.apply(ParDo.of(new ByteToStringTransform()))
.apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()))
.getPipeline()
.run();
}
static class ByteToStringTransform extends DoFn<byte[], String> {
@ProcessElement
public void processElement(@Element byte[] word, OutputReceiver<String> out) {
out.output(new String(word, StandardCharsets.UTF_8));
}
}
/**
* The {@link MqttToPubsubOptions} interface provides the custom execution options passed by the
* executor at the command-line.
*/
public interface MqttToPubsubOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 1,
optional = true,
regexes = {"[,\\/:a-zA-Z0-9._-]+"},
description = "MQTT Broker IP",
helpText = "The MQTT broker server IP or host.",
example = "tcp://host:1883")
@Validation.Required
String getBrokerServer();
void setBrokerServer(String brokerServer);
@TemplateParameter.Text(
order = 2,
optional = false,
regexes = {"[\\/a-zA-Z0-9._-]+"},
description = "MQTT topic(s) to read the input from",
helpText = "The name of the MQTT topic that data is read from.",
example = "topic")
@Validation.Required
String getInputTopic();
void setInputTopic(String inputTopics);
@TemplateParameter.PubsubTopic(
order = 3,
description = "Output Pub/Sub topic",
helpText = "The name of the output Pub/Sub topic that data is written to.",
example = "projects/your-project-id/topics/your-topic-name")
@Validation.Required
String getOutputTopic();
void setOutputTopic(String outputTopic);
@TemplateParameter.Text(
order = 4,
description = "MQTT Username",
helpText = "The username to use for authentication on the MQTT server.",
example = "sampleusername")
String getUsername();
void setUsername(String username);
@TemplateParameter.Password(
order = 5,
description = "MQTT Password",
helpText = "The password associated with the provided username.",
example = "samplepassword")
String getPassword();
void setPassword(String password);
}
}