Modèle MQTT vers Pub/Sub

Le modèle MQTT vers Pub/Sub est un pipeline de streaming qui lit les messages d'un sujet MQTT et les écrit dans Pub/Sub. Il inclut les paramètres facultatifs username et password au cas où l'authentification serait requise par le serveur MQTT.

Si le pipeline ne reçoit aucun message du sujet MQTT pendant plus de 90 minutes, une erreur StackOverflowError se produit. Pour contourner ce problème, vous pouvez modifier le nombre de nœuds de calcul toutes les 90 minutes. Pour en savoir plus sur la modification du nombre de nœuds de calcul sans arrêter votre job, consultez la section Mise à jour des options de job en cours.

Conditions requises pour ce pipeline

  • Le nom du sujet de sortie Pub/Sub doit déjà exister.
  • L'adresse IP de l'hôte MQTT doit exister et présenter la configuration réseau appropriée pour que les machines de nœud de calcul puissent atteindre l'hôte MQTT.
  • Le sujet MQTT à partir duquel les données sont extraites doit avoir un nom.

Paramètres de modèle

Paramètres obligatoires

  • inputTopic: nom du sujet MQTT à partir duquel les données sont lues. Exemple :topic
  • outputTopic: nom du sujet Pub/Sub de sortie dans lequel les données sont écrites. Exemple :projects/your-project-id/topics/your-topic-name
  • username: nom d'utilisateur à utiliser pour l'authentification sur le serveur MQTT. Exemple :sampleusername
  • password: mot de passe associé au nom d'utilisateur fourni. Exemple :samplepassword

Paramètres facultatifs

  • brokerServer: adresse IP ou hôte du serveur de courtage MQTT. Exemple :tcp://host:1883

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez MQTT to Pub/Sub template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Mqtt_to_PubSub \
    --parameters \
brokerServer=MQTT_SERVER,\
inputTopic=INPUT_TOPIC,\
outputTopic=OUTPUT_TOPIC,\
username=USERNAME,\
password=PASSWORD
  

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID du projet.
  • Remplacez par le nom de la région Dataflow. Exemple : us-central1.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez INPUT_TOPIC par le nom du sujet d'entrée sur le serveur MQTT. Exemple : testtopic.
  • Remplacez MQTT_SERVER par les adresses du serveur MQTT. Par exemple : tcp://10.128.0.62:1883
  • Remplacez OUTPUT_TOPIC par le nom du sujet de sortie Pub/Sub. Exemple : projects/myproject/topics/testoutput.
  • Remplacez USERNAME par le nom d'utilisateur pour le serveur MQTT. Exemple : testuser.
  • Remplacez PASSWORD par le mot de passe correspondant au nom d'utilisateur associé au serveur MQTT.

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "brokerServer": "MQTT_SERVER",
          "inputTopic": "INPUT_TOPIC",
          "outputTopic": "OUTPUT_TOPIC",
          "username": "USERNAME",
          "password": "PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Mqtt_to_PubSub",
   }
}
  

Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :

  • Remplacez YOUR_PROJECT_ID par l'ID du projet.
  • Remplacez par le nom de la région Dataflow. Exemple : us-central1.
  • Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
  • Remplacez INPUT_TOPIC par le nom du sujet d'entrée sur le serveur MQTT. Exemple : testtopic.
  • Remplacez MQTT_SERVER par les adresses du serveur MQTT. Par exemple : tcp://10.128.0.62:1883
  • Remplacez OUTPUT_TOPIC par le nom du sujet de sortie Pub/Sub. Exemple : projects/myproject/topics/testoutput.
  • Remplacez USERNAME par le nom d'utilisateur pour le serveur MQTT. Exemple : testuser.
  • Remplacez PASSWORD par le mot de passe correspondant au nom d'utilisateur associé au serveur MQTT.
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.mqtt.MqttIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

/**
 * Dataflow template which reads data from Mqtt Topic and writes it to Cloud PubSub.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/mqtt-to-pubsub/README_Mqtt_to_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Mqtt_to_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "MQTT to Pubsub",
    description =
        "The MQTT to Pub/Sub template is a streaming pipeline that reads messages from an MQTT topic and writes them to Pub/Sub. "
            + "It includes the optional parameters <code>username</code> and <code>password</code> in case authentication is required by the MQTT server.",
    optionsClass = MqttToPubsub.MqttToPubsubOptions.class,
    flexContainerName = "mqtt-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/mqtt-to-pubsub",
    preview = true,
    requirements = {
      "The Pub/Sub output topic name must exist.",
      "The MQTT host IP must exist and have the proper network configuration for worker machines to reach the MQTT host.",
      "The MQTT topic that data is extracted from must have a name."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class MqttToPubsub {

  /**
   * Runs a pipeline which reads data from Mqtt topic and writes it to Cloud PubSub.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    MqttToPubsubOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(MqttToPubsubOptions.class);
    run(options);
  }

  public static void validate(MqttToPubsubOptions options) {
    if (options != null) {
      if ((options.getUsername() != null && !options.getUsername().isEmpty())
          && (options.getPassword() == null || options.getPassword().isBlank())) {
        throw new IllegalArgumentException(
            "While username is provided, password is required for authentication");
      }
    }
  }

  public static PipelineResult run(MqttToPubsubOptions options) {
    validate(options);
    Pipeline pipeline = Pipeline.create(options);
    MqttIO.Read<byte[]> mqttIo;
    if (!options.getUsername().isEmpty() || !options.getPassword().isBlank()) {
      mqttIo =
          MqttIO.read()
              .withConnectionConfiguration(
                  MqttIO.ConnectionConfiguration.create(
                          options.getBrokerServer(), options.getInputTopic())
                      .withUsername(options.getUsername())
                      .withPassword(options.getPassword()));
    } else {
      mqttIo =
          MqttIO.read()
              .withConnectionConfiguration(
                  MqttIO.ConnectionConfiguration.create(
                      options.getBrokerServer(), options.getInputTopic()));
    }

    return pipeline
        .apply("ReadFromMqttTopic", mqttIo)
        .apply(ParDo.of(new ByteToStringTransform()))
        .apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()))
        .getPipeline()
        .run();
  }

  static class ByteToStringTransform extends DoFn<byte[], String> {
    @ProcessElement
    public void processElement(@Element byte[] word, OutputReceiver<String> out) {
      out.output(new String(word, StandardCharsets.UTF_8));
    }
  }

  /**
   * The {@link MqttToPubsubOptions} interface provides the custom execution options passed by the
   * executor at the command-line.
   */
  public interface MqttToPubsubOptions extends PipelineOptions {
    @TemplateParameter.Text(
        order = 1,
        groupName = "Source",
        optional = true,
        regexes = {"[,\\/:a-zA-Z0-9._-]+"},
        description = "MQTT Broker IP",
        helpText = "The MQTT broker server IP or host.",
        example = "tcp://host:1883")
    @Validation.Required
    String getBrokerServer();

    void setBrokerServer(String brokerServer);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        optional = false,
        regexes = {"[\\/a-zA-Z0-9._-]+"},
        description = "MQTT topic(s) to read the input from",
        helpText = "The name of the MQTT topic that data is read from.",
        example = "topic")
    @Validation.Required
    String getInputTopic();

    void setInputTopic(String inputTopics);

    @TemplateParameter.PubsubTopic(
        order = 3,
        groupName = "Target",
        description = "Output Pub/Sub topic",
        helpText = "The name of the output Pub/Sub topic that data is written to.",
        example = "projects/your-project-id/topics/your-topic-name")
    @Validation.Required
    String getOutputTopic();

    void setOutputTopic(String outputTopic);

    @TemplateParameter.Text(
        order = 4,
        description = "MQTT Username",
        helpText = "The username to use for authentication on the MQTT server.",
        example = "sampleusername")
    String getUsername();

    void setUsername(String username);

    @TemplateParameter.Password(
        order = 5,
        description = "MQTT Password",
        helpText = "The password associated with the provided username.",
        example = "samplepassword")
    String getPassword();

    void setPassword(String password);
  }
}

Étape suivante