Mit Sammlungen den Überblick behalten
Sie können Inhalte basierend auf Ihren Einstellungen speichern und kategorisieren.
Die Vorlage "MQTT für Pub/Sub" ist eine Streamingpipeline, die Nachrichten aus einem MQTT-Thema liest und in Pub/Sub schreibt.
Sie enthält die optionalen Parameter username und password, falls der MQTT-Server eine Authentifizierung benötigt.
Wenn die Pipeline länger als 90 Minuten keine Nachricht vom MQTT-Thema empfängt, tritt ein StackOverflowError auf.
Als Behelfslösung können Sie die Anzahl der Worker alle 90 Minuten ändern.
Weitere Informationen zum Ändern der Anzahl der Worker, ohne den Job zu beenden, finden Sie unter Aktualisierung der Option des laufenden Jobs.
Pipelineanforderungen
Der Name des Pub/Sub-Ausgabethemas muss vorhanden sein.
Die MQTT-Host-IP-Adresse muss vorhanden sein und die erforderliche Netzwerkkonfiguration haben, damit Worker-Maschinen den MQTT-Host erreichen können.
Das MQTT-Thema, aus dem Daten extrahiert werden, muss einen Namen haben.
Vorlagenparameter
Parameter
Beschreibung
brokerServer
Die IP-Adresse oder der Host des MQTT-Broker-Servers. Beispiel: tcp://10.0.0.1:1883.
inputTopic
Der Name des MQTT-Themas, aus dem Daten gelesen werden.
outputTopic
Der Name des Pub/Sub-Ausgabethemas, in das Daten geschrieben werden.
username
(Optional) Der Nutzername für die Authentifizierung auf dem MQTT-Server.
password
(Optional) Das Passwort, das dem angegebenen Nutzernamen zugeordnet ist.
Führen Sie die Vorlage aus.
Console
Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
In diesem Beispiel müssen Sie die folgenden Werte ersetzen:
Ersetzen Sie YOUR_PROJECT_ID durch Ihre Projekt-ID.
Ersetzen Sie durch den Namen der Dataflow-Region. Beispiel: us-central1
Ersetzen Sie JOB_NAME durch einen Jobnamen Ihrer Wahl. Der Jobname ist nur gültig, wenn er dem regulären Ausdruck [a-z]([-a-z0-9]{0,38}[a-z0-9])? entspricht.
Ersetzen Sie INPUT_TOPIC durch den Namen des MQTT-Servereingabethemas. Beispiel: testtopic
Ersetzen Sie MQTT_SERVER durch die MQTT-Serveradressen. Beispiel: tcp://10.128.0.62:1883
Ersetzen Sie OUTPUT_TOPIC durch den Namen des Pub/Sub-Ausgabe-Themas. Beispiel: projects/myproject/topics/testoutput
Ersetzen Sie USERNAME durch den Nutzernamen für den MQTT-Server. Beispiel: testuser
Ersetzen Sie PASSWORD durch das Passwort für den Nutzernamen, der mit dem MQTT-Server verwendet wird.
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.
In diesem Beispiel müssen Sie die folgenden Werte ersetzen:
Ersetzen Sie YOUR_PROJECT_ID durch Ihre Projekt-ID.
Ersetzen Sie durch den Namen der Dataflow-Region. Beispiel: us-central1
Ersetzen Sie JOB_NAME durch einen Jobnamen Ihrer Wahl. Der Jobname ist nur gültig, wenn er dem regulären Ausdruck [a-z]([-a-z0-9]{0,38}[a-z0-9])? entspricht.
Ersetzen Sie INPUT_TOPIC durch den Namen des MQTT-Servereingabethemas. Beispiel: testtopic
Ersetzen Sie MQTT_SERVER durch die MQTT-Serveradressen. Beispiel: tcp://10.128.0.62:1883
Ersetzen Sie OUTPUT_TOPIC durch den Namen des Pub/Sub-Ausgabe-Themas. Beispiel: projects/myproject/topics/testoutput
Ersetzen Sie USERNAME durch den Nutzernamen für den MQTT-Server. Beispiel: testuser
Ersetzen Sie PASSWORD durch das Passwort für den Nutzernamen, der mit dem MQTT-Server verwendet wird.
Quellcode der Vorlage
Java
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.mqtt.MqttIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
/**
* Dataflow template which reads data from Mqtt Topic and writes it to Cloud PubSub.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/mqtt-to-pubsub/README_Mqtt_to_PubSub.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Mqtt_to_PubSub",
category = TemplateCategory.STREAMING,
displayName = "MQTT to Pubsub",
description =
"The MQTT to Pub/Sub template is a streaming pipeline that reads messages from an MQTT topic and writes them to Pub/Sub. "
+ "It includes the optional parameters <code>username</code> and <code>password</code> in case authentication is required by the MQTT server.",
optionsClass = MqttToPubsub.MqttToPubsubOptions.class,
flexContainerName = "mqtt-to-pubsub",
contactInformation = "https://cloud.google.com/support",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/mqtt-to-pubsub",
preview = true,
requirements = {
"The Pub/Sub output topic name must exist.",
"The MQTT host IP must exist and have the proper network configuration for worker machines to reach the MQTT host.",
"The MQTT topic that data is extracted from must have a name."
},
streaming = true,
supportsAtLeastOnce = true)
public class MqttToPubsub {
/**
* Runs a pipeline which reads data from Mqtt topic and writes it to Cloud PubSub.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
MqttToPubsubOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(MqttToPubsubOptions.class);
run(options);
}
public static void validate(MqttToPubsubOptions options) {
if (options != null) {
if ((options.getUsername() != null && !options.getUsername().isEmpty())
&& (options.getPassword() == null || options.getPassword().isBlank())) {
throw new IllegalArgumentException(
"While username is provided, password is required for authentication");
}
}
}
public static PipelineResult run(MqttToPubsubOptions options) {
validate(options);
Pipeline pipeline = Pipeline.create(options);
MqttIO.Read mqttIo;
if (!options.getUsername().isEmpty() || !options.getPassword().isBlank()) {
mqttIo =
MqttIO.read()
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create(
options.getBrokerServer(), options.getInputTopic())
.withUsername(options.getUsername())
.withPassword(options.getPassword()));
} else {
mqttIo =
MqttIO.read()
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create(
options.getBrokerServer(), options.getInputTopic()));
}
return pipeline
.apply("ReadFromMqttTopic", mqttIo)
.apply(ParDo.of(new ByteToStringTransform()))
.apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()))
.getPipeline()
.run();
}
static class ByteToStringTransform extends DoFn<byte[], String> {
@ProcessElement
public void processElement(@Element byte[] word, OutputReceiver<String> out) {
out.output(new String(word, StandardCharsets.UTF_8));
}
}
/**
* The {@link MqttToPubsubOptions} interface provides the custom execution options passed by the
* executor at the command-line.
*/
public interface MqttToPubsubOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 1,
optional = true,
regexes = {"[,\\/:a-zA-Z0-9._-]+"},
description = "MQTT Broker IP",
helpText = "Server IP for MQTT broker",
example = "tcp://host:1883")
@Validation.Required
String getBrokerServer();
void setBrokerServer(String brokerServer);
@TemplateParameter.Text(
order = 2,
optional = false,
regexes = {"[\\/a-zA-Z0-9._-]+"},
description = "MQTT topic(s) to read the input from",
helpText = "MQTT topic(s) to read the input from.",
example = "topic")
@Validation.Required
String getInputTopic();
void setInputTopic(String inputTopics);
@TemplateParameter.PubsubTopic(
order = 3,
description = "Output Pub/Sub topic",
helpText =
"The name of the topic to which data should published, in the format of 'projects/your-project-id/topics/your-topic-name'",
example = "projects/your-project-id/topics/your-topic-name")
@Validation.Required
String getOutputTopic();
void setOutputTopic(String outputTopic);
@TemplateParameter.Text(
order = 4,
description = "MQTT Username",
helpText = "MQTT username for authentication with MQTT server",
example = "sampleusername")
String getUsername();
void setUsername(String username);
@TemplateParameter.Password(
order = 5,
description = "MQTT Password",
helpText = "Password for username provided for authentication with MQTT server",
example = "samplepassword")
String getPassword();
void setPassword(String password);
}
}