Vorlage „MQTT für Pub/Sub“

Die Vorlage „MQTT für Pub/Sub“ ist eine Streamingpipeline, die Nachrichten aus einem MQTT-Thema liest und in Pub/Sub schreibt. Sie enthält die optionalen Parameter username und password, falls der MQTT-Server eine Authentifizierung benötigt.

Wenn die Pipeline länger als 90 Minuten keine Nachricht vom MQTT-Topic empfängt, tritt StackOverflowError auf. Als Behelfslösung können Sie die Anzahl der Worker alle 90 Minuten ändern. Weitere Informationen zum Ändern der Anzahl der Worker, ohne den Job zu beenden, finden Sie unter Aktualisierung der Option des laufenden Jobs.

Pipelineanforderungen

  • Der Name des Pub/Sub-Ausgabethemas muss vorhanden sein.
  • Die MQTT-Host-IP-Adresse muss vorhanden sein und die erforderliche Netzwerkkonfiguration haben, damit Worker-Maschinen den MQTT-Host erreichen können.
  • Das MQTT-Thema, aus dem Daten extrahiert werden, muss einen Namen haben.

Vorlagenparameter

Erforderliche Parameter

  • inputTopic: Der Name des MQTT-Themas, aus dem Daten gelesen werden. Beispiel: topic.
  • outputTopic: Der Name des Pub/Sub-Ausgabethemas, in das Daten geschrieben werden. Beispiel: projects/your-project-id/topics/your-topic-name.
  • username: Der Nutzername, der für die Authentifizierung auf dem MQTT-Server verwendet werden soll. Beispiel: sampleusername.
  • password: Das Passwort, das dem angegebenen Nutzernamen zugeordnet ist. Beispiel: samplepassword.

Optionale Parameter

  • brokerServer: Die IP-Adresse oder der Host des MQTT-Brokers. Beispiel: tcp://host:1883.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option MQTT to Pub/Sub templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Mqtt_to_PubSub \
    --parameters \
brokerServer=MQTT_SERVER,\
inputTopic=INPUT_TOPIC,\
outputTopic=OUTPUT_TOPIC,\
username=USERNAME,\
password=PASSWORD
  

In diesem Beispiel müssen Sie die folgenden Werte ersetzen:

  • Ersetzen Sie YOUR_PROJECT_ID durch Ihre Projekt-ID.
  • Ersetzen Sie durch den Namen der Dataflow-Region. Beispiel: us-central1
  • Ersetzen Sie JOB_NAME durch einen Jobnamen Ihrer Wahl. Der Jobname ist nur gültig, wenn er dem regulären Ausdruck [a-z]([-a-z0-9]{0,38}[a-z0-9])? entspricht.
  • Ersetzen Sie INPUT_TOPIC durch den Namen des MQTT-Servereingabethemas. Beispiel: testtopic
  • Ersetzen Sie MQTT_SERVER durch die MQTT-Serveradressen. Beispiel: tcp://10.128.0.62:1883
  • Ersetzen Sie OUTPUT_TOPIC durch den Namen des Pub/Sub-Ausgabethemas. Beispiel: projects/myproject/topics/testoutput
  • Ersetzen Sie USERNAME durch den Nutzernamen für den MQTT-Server. Beispiel: testuser
  • Ersetzen Sie PASSWORD durch das Passwort für den Nutzernamen, der mit dem MQTT-Server verwendet wird.

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "brokerServer": "MQTT_SERVER",
          "inputTopic": "INPUT_TOPIC",
          "outputTopic": "OUTPUT_TOPIC",
          "username": "USERNAME",
          "password": "PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Mqtt_to_PubSub",
   }
}
  

In diesem Beispiel müssen Sie die folgenden Werte ersetzen:

  • Ersetzen Sie YOUR_PROJECT_ID durch Ihre Projekt-ID.
  • Ersetzen Sie durch den Namen der Dataflow-Region. Beispiel: us-central1
  • Ersetzen Sie JOB_NAME durch einen Jobnamen Ihrer Wahl. Der Jobname ist nur gültig, wenn er dem regulären Ausdruck [a-z]([-a-z0-9]{0,38}[a-z0-9])? entspricht.
  • Ersetzen Sie INPUT_TOPIC durch den Namen des MQTT-Servereingabethemas. Beispiel: testtopic
  • Ersetzen Sie MQTT_SERVER durch die MQTT-Serveradressen. Beispiel: tcp://10.128.0.62:1883
  • Ersetzen Sie OUTPUT_TOPIC durch den Namen des Pub/Sub-Ausgabethemas. Beispiel: projects/myproject/topics/testoutput
  • Ersetzen Sie USERNAME durch den Nutzernamen für den MQTT-Server. Beispiel: testuser
  • Ersetzen Sie PASSWORD durch das Passwort für den Nutzernamen, der mit dem MQTT-Server verwendet wird.
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.mqtt.MqttIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

/**
 * Dataflow template which reads data from Mqtt Topic and writes it to Cloud PubSub.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/mqtt-to-pubsub/README_Mqtt_to_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Mqtt_to_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "MQTT to Pubsub",
    description =
        "The MQTT to Pub/Sub template is a streaming pipeline that reads messages from an MQTT topic and writes them to Pub/Sub. "
            + "It includes the optional parameters <code>username</code> and <code>password</code> in case authentication is required by the MQTT server.",
    optionsClass = MqttToPubsub.MqttToPubsubOptions.class,
    flexContainerName = "mqtt-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/mqtt-to-pubsub",
    preview = true,
    requirements = {
      "The Pub/Sub output topic name must exist.",
      "The MQTT host IP must exist and have the proper network configuration for worker machines to reach the MQTT host.",
      "The MQTT topic that data is extracted from must have a name."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class MqttToPubsub {

  /**
   * Runs a pipeline which reads data from Mqtt topic and writes it to Cloud PubSub.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    MqttToPubsubOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(MqttToPubsubOptions.class);
    run(options);
  }

  public static void validate(MqttToPubsubOptions options) {
    if (options != null) {
      if ((options.getUsername() != null && !options.getUsername().isEmpty())
          && (options.getPassword() == null || options.getPassword().isBlank())) {
        throw new IllegalArgumentException(
            "While username is provided, password is required for authentication");
      }
    }
  }

  public static PipelineResult run(MqttToPubsubOptions options) {
    validate(options);
    Pipeline pipeline = Pipeline.create(options);
    MqttIO.Read<byte[]> mqttIo;
    if (!options.getUsername().isEmpty() || !options.getPassword().isBlank()) {
      mqttIo =
          MqttIO.read()
              .withConnectionConfiguration(
                  MqttIO.ConnectionConfiguration.create(
                          options.getBrokerServer(), options.getInputTopic())
                      .withUsername(options.getUsername())
                      .withPassword(options.getPassword()));
    } else {
      mqttIo =
          MqttIO.read()
              .withConnectionConfiguration(
                  MqttIO.ConnectionConfiguration.create(
                      options.getBrokerServer(), options.getInputTopic()));
    }

    return pipeline
        .apply("ReadFromMqttTopic", mqttIo)
        .apply(ParDo.of(new ByteToStringTransform()))
        .apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()))
        .getPipeline()
        .run();
  }

  static class ByteToStringTransform extends DoFn<byte[], String> {
    @ProcessElement
    public void processElement(@Element byte[] word, OutputReceiver<String> out) {
      out.output(new String(word, StandardCharsets.UTF_8));
    }
  }

  /**
   * The {@link MqttToPubsubOptions} interface provides the custom execution options passed by the
   * executor at the command-line.
   */
  public interface MqttToPubsubOptions extends PipelineOptions {
    @TemplateParameter.Text(
        order = 1,
        groupName = "Source",
        optional = true,
        regexes = {"[,\\/:a-zA-Z0-9._-]+"},
        description = "MQTT Broker IP",
        helpText = "The MQTT broker server IP or host.",
        example = "tcp://host:1883")
    @Validation.Required
    String getBrokerServer();

    void setBrokerServer(String brokerServer);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        optional = false,
        regexes = {"[\\/a-zA-Z0-9._-]+"},
        description = "MQTT topic(s) to read the input from",
        helpText = "The name of the MQTT topic that data is read from.",
        example = "topic")
    @Validation.Required
    String getInputTopic();

    void setInputTopic(String inputTopics);

    @TemplateParameter.PubsubTopic(
        order = 3,
        groupName = "Target",
        description = "Output Pub/Sub topic",
        helpText = "The name of the output Pub/Sub topic that data is written to.",
        example = "projects/your-project-id/topics/your-topic-name")
    @Validation.Required
    String getOutputTopic();

    void setOutputTopic(String outputTopic);

    @TemplateParameter.Text(
        order = 4,
        description = "MQTT Username",
        helpText = "The username to use for authentication on the MQTT server.",
        example = "sampleusername")
    String getUsername();

    void setUsername(String username);

    @TemplateParameter.Password(
        order = 5,
        description = "MQTT Password",
        helpText = "The password associated with the provided username.",
        example = "samplepassword")
    String getPassword();

    void setPassword(String password);
  }
}

Nächste Schritte