JMS to Pub/Sub 템플릿

JMS to Pub/Sub 템플릿은 Active MQ JMS 서버(큐/주제)에서 메시지를 읽고 Pub/Sub에 쓰는 스트리밍 파이프라인입니다.

파이프라인 요구사항

  • Pub/Sub 출력 주제 이름이 있어야 합니다.
  • JMS 호스트 IP가 있어야 하며 Dataflow 작업자 VM이 JMS 호스트에 연결할 수 있도록 적절한 네트워크 구성이 있어야 합니다.
  • 데이터가 추출되는 JMS 주제/큐에는 이름이 있어야 합니다.

템플릿 매개변수

필수 매개변수

  • inputName: 데이터를 읽는 JMS 주제 또는 큐의 이름입니다. 예를 들면 queue입니다.
  • inputType: 데이터를 읽을 JMS 대상 유형입니다. 큐나 주제일 수 있습니다. 예를 들면 queue입니다.
  • outputTopic: 데이터를 게시할 Pub/Sub 주제의 이름입니다. 예를 들면 projects/<PROJECT_ID>/topics/<TOPIC_NAME>입니다.
  • username: JMS 서버에서 인증에 사용할 사용자 이름입니다. 예를 들면 sampleusername입니다.
  • password: 제공된 사용자 이름과 연결된 비밀번호입니다. 예를 들면 samplepassword입니다.

선택적 매개변수

  • jmsServer: JMS(ActiveMQ) 서버 IP입니다. 예를 들면 tcp://10.0.0.1:61616입니다.

템플릿 실행

콘솔gcloudAPI
  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 JMS to Pub/Sub template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/JMS_to_Cloud_PubSub \
    --parameters \
jmsServer=JMS_SERVER,\
inputName=INPUT_NAME,\
inputType=INPUT_TYPE,\
outputTopic=OUTPUT_TOPIC,\
username=USERNAME,\
password=PASSWORD
  

이 예시에서 다음 값을 바꿔야 합니다.

  • YOUR_PROJECT_ID를 프로젝트 ID로 바꿉니다.
  • 을 Dataflow 리전 이름으로 바꿉니다. 예를 들면 us-central1입니다.
  • JOB_NAME을 원하는 작업 이름으로 바꿉니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • JMS_SERVER를 JMS 서버 주소로 바꿉니다. 예를 들면 tcp://10.0.0.0:61616입니다.
  • INPUT_NAME을 JMS 서버 입력 주제/큐의 이름으로 바꿉니다. 예를 들면 testtopic입니다.
  • INPUT_TYPE을 JMS 서버 대상 유형(큐/주제)으로 바꿉니다. 예를 들면 topic입니다.
  • OUTPUT_TOPIC을 Pub/Sub 출력 주제 이름으로 바꿉니다. 예를 들면 projects/myproject/topics/testoutput입니다.
  • USERNAME을 JMS 서버의 사용자 이름으로 바꿉니다. 예를 들면 testuser입니다.
  • PASSWORD를 JMS 서버에 사용되는 사용자 이름에 해당하는 비밀번호로 바꿉니다.

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "jmsServer": "JMS_SERVER",
          "inputName": "INPUT_NAME",
          "inputType": "INPUT_TYPE",
          "outputTopic": "OUTPUT_TOPIC",
          "username": "USERNAME",
          "password": "PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/JMS_to_Cloud_PubSub",
   }
}
  

이 예시에서 다음 값을 바꿔야 합니다.

  • YOUR_PROJECT_ID를 프로젝트 ID로 바꿉니다.
  • 을 Dataflow 리전 이름으로 바꿉니다. 예를 들면 us-central1입니다.
  • JOB_NAME을 원하는 작업 이름으로 바꿉니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • JMS_SERVER를 JMS 서버 주소로 바꿉니다. 예를 들면 tcp://10.0.0.0:61616입니다.
  • INPUT_NAME을 JMS 서버 입력 주제/큐의 이름으로 바꿉니다. 예를 들면 testtopic입니다.
  • INPUT_TYPE을 JMS 서버 대상 유형(큐/주제)으로 바꿉니다. 예를 들면 topic입니다.
  • OUTPUT_TOPIC을 Pub/Sub 출력 주제 이름으로 바꿉니다. 예를 들면 projects/myproject/topics/testoutput입니다.
  • USERNAME을 JMS 서버의 사용자 이름으로 바꿉니다. 예를 들면 testuser입니다.
  • PASSWORD를 JMS 서버에 사용되는 사용자 이름에 해당하는 비밀번호로 바꿉니다.
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataflow template which reads data from Jms Queue/Topic and writes it to Cloud PubSub.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jms-to-pubsub/README_Jms_to_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Jms_to_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "JMS to Pubsub",
    description =
        "The JMS to Pub/Sub template is a streaming pipeline that reads messages from ActiveMQ JMS Server (Queue/Topic) and writes them to Pub/Sub.",
    optionsClass = com.google.cloud.teleport.v2.templates.JmsToPubsub.JmsToPubsubOptions.class,
    flexContainerName = "jms-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/jms-to-pubsub",
    preview = true,
    requirements = {
      "The Pub/Sub output topic name must exist.",
      "The JMS host IP must exist and have the proper network configuration for Dataflow worker VMs to reach the JMS host.",
      "The JMS topic/queue that data is extracted from must have a name."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class JmsToPubsub {

  /**
   * Runs a pipeline which reads data from JMS queue/topic and writes it to Cloud PubSub.
   *
   * @param args arguments to the pipeline
   */
  private static final Logger LOG = LoggerFactory.getLogger(JmsToPubsub.class);

  public static void main(String[] args) {
    JmsToPubsubOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(JmsToPubsubOptions.class);
    run(options);
  }

  public static void validate(JmsToPubsubOptions options) {
    if (options != null) {
      if ((options.getUsername() != null
              && (!options.getUsername().isEmpty() || !options.getUsername().isBlank()))
          && (options.getPassword() == null
              || options.getPassword().isBlank()
              || options.getPassword().isEmpty())) {
        throw new IllegalArgumentException(
            "While username is provided, password is required for authentication");
      }
    }
  }

  public static PipelineResult run(JmsToPubsubOptions options) {
    validate(options);
    Pipeline pipeline = Pipeline.create(options);
    String connectionURI = options.getJmsServer();
    ConnectionFactory myConnectionFactory;
    PCollection<JmsRecord> input;

    if (!options.getUsername().isEmpty() || !options.getUsername().isBlank()) {
      myConnectionFactory =
          new ActiveMQConnectionFactory(
              options.getUsername(), options.getPassword(), connectionURI);
    } else {
      myConnectionFactory = new ActiveMQConnectionFactory(connectionURI);
    }
    LOG.info("Given Input Type " + options.getInputType());
    if (options.getInputType().equalsIgnoreCase("queue")) {
      input =
          pipeline.apply(
              "Read From JMS Queue",
              JmsIO.read()
                  .withConnectionFactory(myConnectionFactory)
                  .withQueue(options.getInputName()));
    } else {
      input =
          pipeline.apply(
              "Read From JMS Topic",
              JmsIO.read()
                  .withConnectionFactory(myConnectionFactory)
                  .withTopic(options.getInputName()));
    }

    input
        .apply(
            MapElements.via(
                new SimpleFunction<JmsRecord, String>() {
                  public String apply(JmsRecord input) {
                    return input.getPayload();
                  }
                }))
        .apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
    return pipeline.run();
  }

  /**
   * The {@link JmsToPubsubOptions} interface provides the custom execution options passed by the
   * executor at the command-line.
   */
  public interface JmsToPubsubOptions extends PipelineOptions {
    @TemplateParameter.Text(
        order = 1,
        groupName = "Source",
        optional = true,
        regexes = {"[,\\/:a-zA-Z0-9._-]+"},
        description = "JMS Host IP",
        helpText = "The JMS (ActiveMQ) Server IP.",
        example = "tcp://10.0.0.1:61616")
    @Validation.Required
    String getJmsServer();

    void setJmsServer(String jmsServer);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        optional = false,
        regexes = {"[a-zA-Z0-9._-]+"},
        description = "JMS Queue/Topic Name to read the input from",
        helpText = "The name of the JMS topic or queue that data is read from.",
        example = "queue")
    @Validation.Required
    String getInputName();

    void setInputName(String inputName);

    @TemplateParameter.Text(
        order = 3,
        optional = false,
        regexes = {"[a-zA-Z0-9._-]+"},
        description = "JMS Destination Type to read the input from",
        helpText = "The JMS destination type to read data from. Can be a queue or a topic.",
        example = "queue")
    @Validation.Required
    String getInputType();

    void setInputType(String inputType);

    @TemplateParameter.PubsubTopic(
        order = 4,
        groupName = "Target",
        description = "Output Pub/Sub topic",
        helpText = "The name of the Pub/Sub topic to publish data to.",
        example = "projects/<PROJECT_ID>/topics/<TOPIC_NAME>")
    @Validation.Required
    String getOutputTopic();

    void setOutputTopic(String outputTopic);

    @TemplateParameter.Text(
        order = 5,
        description = "JMS Username",
        helpText = "The username to use for authentication on the JMS server.",
        example = "sampleusername")
    String getUsername();

    void setUsername(String username);

    @TemplateParameter.Password(
        order = 6,
        description = "JMS Password",
        helpText = "The password associated with the provided username.",
        example = "samplepassword")
    String getPassword();

    void setPassword(String password);
  }
}

다음 단계