MQTT to Pub/Sub 템플릿

MQTT to Pub/Sub 템플릿은 MQTT 주제에서 메시지를 읽고 Pub/Sub에 쓰는 스트리밍 파이프라인입니다. 여기에는 MQTT 서버에 인증이 필요한 경우 선택적 매개변수 usernamepassword가 포함됩니다.

파이프라인이 90분 넘게 MQTT 주제에서 메시지를 수신하지 않으면 StackOverflowError이 발생합니다. 이 문제를 해결하려면 90분마다 작업자 수를 변경하면 됩니다. 작업을 중지하지 않고 작업자 수를 변경하는 방법에 관한 자세한 내용은 진행 중인 작업 옵션 업데이트를 참고하세요.

파이프라인 요구사항

  • Pub/Sub 출력 주제 이름이 있어야 합니다.
  • MQTT 호스트 IP가 있어야 하며 작업자 머신이 MQTT 호스트에 연결할 수 있도록 적절한 네트워크 구성이 있어야 합니다.
  • 데이터가 추출되는 MQTT 주제에는 이름이 있어야 합니다.

템플릿 매개변수

필수 매개변수

  • inputTopic: 데이터를 읽어 오는 MQTT 주제의 이름입니다. 예를 들면 topic입니다.
  • outputTopic: 데이터를 기록하는 출력 Pub/Sub 주제의 이름입니다. 예를 들면 projects/your-project-id/topics/your-topic-name입니다.
  • username: MQTT 서버에서 인증에 사용할 사용자 이름입니다. 예를 들면 sampleusername입니다.
  • password: 제공된 사용자 이름과 연결된 비밀번호입니다. 예를 들면 samplepassword입니다.

선택적 매개변수

  • brokerServer: MQTT 브로커 서버 IP 또는 호스트입니다. 예를 들면 tcp://host:1883입니다.

템플릿 실행

콘솔gcloudAPI
  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 MQTT to Pub/Sub template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Mqtt_to_PubSub \
    --parameters \
brokerServer=MQTT_SERVER,\
inputTopic=INPUT_TOPIC,\
outputTopic=OUTPUT_TOPIC,\
username=USERNAME,\
password=PASSWORD
  

이 예시에서 다음 값을 바꿔야 합니다.

  • YOUR_PROJECT_ID를 프로젝트 ID로 바꿉니다.
  • 을 Dataflow 리전 이름으로 바꿉니다. 예를 들면 us-central1입니다.
  • JOB_NAME을 원하는 작업 이름으로 바꿉니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • INPUT_TOPIC를 MQTT 서버 입력 주제의 이름으로 바꿉니다. 예를 들면 testtopic입니다.
  • MQTT_SERVER를 MQTT 서버 주소로 바꿉니다. 예를 들면 tcp://10.128.0.62:1883입니다.
  • OUTPUT_TOPIC을 Pub/Sub 출력 주제 이름으로 바꿉니다. 예를 들면 projects/myproject/topics/testoutput입니다.
  • USERNAME를 MQTT 서버의 사용자 이름으로 바꿉니다. 예를 들면 testuser입니다.
  • PASSWORD를 MQTT 서버에 사용되는 사용자 이름에 해당하는 비밀번호로 바꿉니다.

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "brokerServer": "MQTT_SERVER",
          "inputTopic": "INPUT_TOPIC",
          "outputTopic": "OUTPUT_TOPIC",
          "username": "USERNAME",
          "password": "PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Mqtt_to_PubSub",
   }
}
  

이 예시에서 다음 값을 바꿔야 합니다.

  • YOUR_PROJECT_ID를 프로젝트 ID로 바꿉니다.
  • 을 Dataflow 리전 이름으로 바꿉니다. 예를 들면 us-central1입니다.
  • JOB_NAME을 원하는 작업 이름으로 바꿉니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • INPUT_TOPIC를 MQTT 서버 입력 주제의 이름으로 바꿉니다. 예를 들면 testtopic입니다.
  • MQTT_SERVER를 MQTT 서버 주소로 바꿉니다. 예를 들면 tcp://10.128.0.62:1883입니다.
  • OUTPUT_TOPIC을 Pub/Sub 출력 주제 이름으로 바꿉니다. 예를 들면 projects/myproject/topics/testoutput입니다.
  • USERNAME를 MQTT 서버의 사용자 이름으로 바꿉니다. 예를 들면 testuser입니다.
  • PASSWORD를 MQTT 서버에 사용되는 사용자 이름에 해당하는 비밀번호로 바꿉니다.
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.mqtt.MqttIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

/**
 * Dataflow template which reads data from Mqtt Topic and writes it to Cloud PubSub.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/mqtt-to-pubsub/README_Mqtt_to_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Mqtt_to_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "MQTT to Pubsub",
    description =
        "The MQTT to Pub/Sub template is a streaming pipeline that reads messages from an MQTT topic and writes them to Pub/Sub. "
            + "It includes the optional parameters <code>username</code> and <code>password</code> in case authentication is required by the MQTT server.",
    optionsClass = MqttToPubsub.MqttToPubsubOptions.class,
    flexContainerName = "mqtt-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/mqtt-to-pubsub",
    preview = true,
    requirements = {
      "The Pub/Sub output topic name must exist.",
      "The MQTT host IP must exist and have the proper network configuration for worker machines to reach the MQTT host.",
      "The MQTT topic that data is extracted from must have a name."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class MqttToPubsub {

  /**
   * Runs a pipeline which reads data from Mqtt topic and writes it to Cloud PubSub.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    MqttToPubsubOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(MqttToPubsubOptions.class);
    run(options);
  }

  public static void validate(MqttToPubsubOptions options) {
    if (options != null) {
      if ((options.getUsername() != null && !options.getUsername().isEmpty())
          && (options.getPassword() == null || options.getPassword().isBlank())) {
        throw new IllegalArgumentException(
            "While username is provided, password is required for authentication");
      }
    }
  }

  public static PipelineResult run(MqttToPubsubOptions options) {
    validate(options);
    Pipeline pipeline = Pipeline.create(options);
    MqttIO.Read<byte[]> mqttIo;
    if (!options.getUsername().isEmpty() || !options.getPassword().isBlank()) {
      mqttIo =
          MqttIO.read()
              .withConnectionConfiguration(
                  MqttIO.ConnectionConfiguration.create(
                          options.getBrokerServer(), options.getInputTopic())
                      .withUsername(options.getUsername())
                      .withPassword(options.getPassword()));
    } else {
      mqttIo =
          MqttIO.read()
              .withConnectionConfiguration(
                  MqttIO.ConnectionConfiguration.create(
                      options.getBrokerServer(), options.getInputTopic()));
    }

    return pipeline
        .apply("ReadFromMqttTopic", mqttIo)
        .apply(ParDo.of(new ByteToStringTransform()))
        .apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()))
        .getPipeline()
        .run();
  }

  static class ByteToStringTransform extends DoFn<byte[], String> {
    @ProcessElement
    public void processElement(@Element byte[] word, OutputReceiver<String> out) {
      out.output(new String(word, StandardCharsets.UTF_8));
    }
  }

  /**
   * The {@link MqttToPubsubOptions} interface provides the custom execution options passed by the
   * executor at the command-line.
   */
  public interface MqttToPubsubOptions extends PipelineOptions {
    @TemplateParameter.Text(
        order = 1,
        groupName = "Source",
        optional = true,
        regexes = {"[,\\/:a-zA-Z0-9._-]+"},
        description = "MQTT Broker IP",
        helpText = "The MQTT broker server IP or host.",
        example = "tcp://host:1883")
    @Validation.Required
    String getBrokerServer();

    void setBrokerServer(String brokerServer);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        optional = false,
        regexes = {"[\\/a-zA-Z0-9._-]+"},
        description = "MQTT topic(s) to read the input from",
        helpText = "The name of the MQTT topic that data is read from.",
        example = "topic")
    @Validation.Required
    String getInputTopic();

    void setInputTopic(String inputTopics);

    @TemplateParameter.PubsubTopic(
        order = 3,
        groupName = "Target",
        description = "Output Pub/Sub topic",
        helpText = "The name of the output Pub/Sub topic that data is written to.",
        example = "projects/your-project-id/topics/your-topic-name")
    @Validation.Required
    String getOutputTopic();

    void setOutputTopic(String outputTopic);

    @TemplateParameter.Text(
        order = 4,
        description = "MQTT Username",
        helpText = "The username to use for authentication on the MQTT server.",
        example = "sampleusername")
    String getUsername();

    void setUsername(String username);

    @TemplateParameter.Password(
        order = 5,
        description = "MQTT Password",
        helpText = "The password associated with the provided username.",
        example = "samplepassword")
    String getPassword();

    void setPassword(String password);
  }
}

다음 단계