Vorlage "JMS für Pub/Sub"

Die Vorlage "JMS für Pub/Sub" ist eine Streamingpipeline, die Nachrichten vom Active MQ JMS Server (Warteschlange/Thema) liest und in Pub/Sub schreibt.

Pipelineanforderungen

  • Der Name des Pub/Sub-Ausgabethemas muss vorhanden sein.
  • Die JMS-Host-IP-Adresse muss vorhanden sein und die erforderliche Netzwerkkonfiguration haben, damit Dataflow-Worker-VMs den JMS-Host erreichen können.
  • Das JMS-Thema/die JMS-Warteschlange, aus dem/der Daten extrahiert werden, muss einen Namen haben.

Vorlagenparameter

Erforderliche Parameter

  • inputName: Der Name des JMS-Themas oder der JMS-Warteschlange, aus dem bzw. der Daten gelesen werden. Beispiel: queue.
  • inputType: Der JMS-Zieltyp, aus dem Daten gelesen werden sollen. Kann eine Warteschlange oder ein Thema sein. Beispiel: queue.
  • outputTopic: Der Name des Pub/Sub-Themas, in dem Daten veröffentlicht werden sollen. Beispiel: projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • username: Der Nutzername, der für die Authentifizierung auf dem JMS-Server verwendet werden soll. Beispiel: sampleusername.
  • password: Das Passwort, das dem angegebenen Nutzernamen zugeordnet ist. Beispiel: samplepassword.

Optionale Parameter

  • jmsServer: Die JMS-Server-IP (ActiveMQ). Beispiel: tcp://10.0.0.1:61616.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option JMS to Pub/Sub templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/JMS_to_Cloud_PubSub \
    --parameters \
jmsServer=JMS_SERVER,\
inputName=INPUT_NAME,\
inputType=INPUT_TYPE,\
outputTopic=OUTPUT_TOPIC,\
username=USERNAME,\
password=PASSWORD
  

In diesem Beispiel müssen Sie die folgenden Werte ersetzen:

  • Ersetzen Sie YOUR_PROJECT_ID durch Ihre Projekt-ID.
  • Ersetzen Sie durch den Namen der Dataflow-Region. Beispiel: us-central1
  • Ersetzen Sie JOB_NAME durch einen Jobnamen Ihrer Wahl. Der Jobname ist nur gültig, wenn er dem regulären Ausdruck [a-z]([-a-z0-9]{0,38}[a-z0-9])? entspricht.
  • Ersetzen Sie JMS_SERVER durch die Adressen der JMS-Server. Beispiel: tcp://10.0.0.0:61616
  • Ersetzen Sie INPUT_NAME durch den Namen des JMS-Servereingabethemas/der JMS-Eingabewarteschlange. Beispiel: testtopic
  • Ersetzen Sie INPUT_TYPE durch den Zieltyp des JMS-Servers (Warteschlange/Thema). Beispiel: topic
  • Ersetzen Sie OUTPUT_TOPIC durch den Namen des Pub/Sub-Ausgabethemas. Beispiel: projects/myproject/topics/testoutput
  • Ersetzen Sie USERNAME durch den Nutzernamen für den JMS-Server. Beispiel: testuser
  • Ersetzen Sie PASSWORD durch das Passwort für den Nutzernamen, der mit dem JMS-Server verwendet wird.

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "jmsServer": "JMS_SERVER",
          "inputName": "INPUT_NAME",
          "inputType": "INPUT_TYPE",
          "outputTopic": "OUTPUT_TOPIC",
          "username": "USERNAME",
          "password": "PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/JMS_to_Cloud_PubSub",
   }
}
  

In diesem Beispiel müssen Sie die folgenden Werte ersetzen:

  • Ersetzen Sie YOUR_PROJECT_ID durch Ihre Projekt-ID.
  • Ersetzen Sie durch den Namen der Dataflow-Region. Beispiel: us-central1
  • Ersetzen Sie JOB_NAME durch einen Jobnamen Ihrer Wahl. Der Jobname ist nur gültig, wenn er dem regulären Ausdruck [a-z]([-a-z0-9]{0,38}[a-z0-9])? entspricht.
  • Ersetzen Sie JMS_SERVER durch die Adressen der JMS-Server. Beispiel: tcp://10.0.0.0:61616
  • Ersetzen Sie INPUT_NAME durch den Namen des JMS-Servereingabethemas/der JMS-Eingabewarteschlange. Beispiel: testtopic
  • Ersetzen Sie INPUT_TYPE durch den Zieltyp des JMS-Servers (Warteschlange/Thema). Beispiel: topic
  • Ersetzen Sie OUTPUT_TOPIC durch den Namen des Pub/Sub-Ausgabethemas. Beispiel: projects/myproject/topics/testoutput
  • Ersetzen Sie USERNAME durch den Nutzernamen für den JMS-Server. Beispiel: testuser
  • Ersetzen Sie PASSWORD durch das Passwort für den Nutzernamen, der mit dem JMS-Server verwendet wird.
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataflow template which reads data from Jms Queue/Topic and writes it to Cloud PubSub.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jms-to-pubsub/README_Jms_to_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Jms_to_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "JMS to Pubsub",
    description =
        "The JMS to Pub/Sub template is a streaming pipeline that reads messages from ActiveMQ JMS Server (Queue/Topic) and writes them to Pub/Sub.",
    optionsClass = com.google.cloud.teleport.v2.templates.JmsToPubsub.JmsToPubsubOptions.class,
    flexContainerName = "jms-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/jms-to-pubsub",
    preview = true,
    requirements = {
      "The Pub/Sub output topic name must exist.",
      "The JMS host IP must exist and have the proper network configuration for Dataflow worker VMs to reach the JMS host.",
      "The JMS topic/queue that data is extracted from must have a name."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class JmsToPubsub {

  /**
   * Runs a pipeline which reads data from JMS queue/topic and writes it to Cloud PubSub.
   *
   * @param args arguments to the pipeline
   */
  private static final Logger LOG = LoggerFactory.getLogger(JmsToPubsub.class);

  public static void main(String[] args) {
    JmsToPubsubOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(JmsToPubsubOptions.class);
    run(options);
  }

  public static void validate(JmsToPubsubOptions options) {
    if (options != null) {
      if ((options.getUsername() != null
              && (!options.getUsername().isEmpty() || !options.getUsername().isBlank()))
          && (options.getPassword() == null
              || options.getPassword().isBlank()
              || options.getPassword().isEmpty())) {
        throw new IllegalArgumentException(
            "While username is provided, password is required for authentication");
      }
    }
  }

  public static PipelineResult run(JmsToPubsubOptions options) {
    validate(options);
    Pipeline pipeline = Pipeline.create(options);
    String connectionURI = options.getJmsServer();
    ConnectionFactory myConnectionFactory;
    PCollection<JmsRecord> input;

    if (!options.getUsername().isEmpty() || !options.getUsername().isBlank()) {
      myConnectionFactory =
          new ActiveMQConnectionFactory(
              options.getUsername(), options.getPassword(), connectionURI);
    } else {
      myConnectionFactory = new ActiveMQConnectionFactory(connectionURI);
    }
    LOG.info("Given Input Type " + options.getInputType());
    if (options.getInputType().equalsIgnoreCase("queue")) {
      input =
          pipeline.apply(
              "Read From JMS Queue",
              JmsIO.read()
                  .withConnectionFactory(myConnectionFactory)
                  .withQueue(options.getInputName()));
    } else {
      input =
          pipeline.apply(
              "Read From JMS Topic",
              JmsIO.read()
                  .withConnectionFactory(myConnectionFactory)
                  .withTopic(options.getInputName()));
    }

    input
        .apply(
            MapElements.via(
                new SimpleFunction<JmsRecord, String>() {
                  public String apply(JmsRecord input) {
                    return input.getPayload();
                  }
                }))
        .apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
    return pipeline.run();
  }

  /**
   * The {@link JmsToPubsubOptions} interface provides the custom execution options passed by the
   * executor at the command-line.
   */
  public interface JmsToPubsubOptions extends PipelineOptions {
    @TemplateParameter.Text(
        order = 1,
        groupName = "Source",
        optional = true,
        regexes = {"[,\\/:a-zA-Z0-9._-]+"},
        description = "JMS Host IP",
        helpText = "The JMS (ActiveMQ) Server IP.",
        example = "tcp://10.0.0.1:61616")
    @Validation.Required
    String getJmsServer();

    void setJmsServer(String jmsServer);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        optional = false,
        regexes = {"[a-zA-Z0-9._-]+"},
        description = "JMS Queue/Topic Name to read the input from",
        helpText = "The name of the JMS topic or queue that data is read from.",
        example = "queue")
    @Validation.Required
    String getInputName();

    void setInputName(String inputName);

    @TemplateParameter.Text(
        order = 3,
        optional = false,
        regexes = {"[a-zA-Z0-9._-]+"},
        description = "JMS Destination Type to read the input from",
        helpText = "The JMS destination type to read data from. Can be a queue or a topic.",
        example = "queue")
    @Validation.Required
    String getInputType();

    void setInputType(String inputType);

    @TemplateParameter.PubsubTopic(
        order = 4,
        groupName = "Target",
        description = "Output Pub/Sub topic",
        helpText = "The name of the Pub/Sub topic to publish data to.",
        example = "projects/<PROJECT_ID>/topics/<TOPIC_NAME>")
    @Validation.Required
    String getOutputTopic();

    void setOutputTopic(String outputTopic);

    @TemplateParameter.Text(
        order = 5,
        description = "JMS Username",
        helpText = "The username to use for authentication on the JMS server.",
        example = "sampleusername")
    String getUsername();

    void setUsername(String username);

    @TemplateParameter.Password(
        order = 6,
        description = "JMS Password",
        helpText = "The password associated with the provided username.",
        example = "samplepassword")
    String getPassword();

    void setPassword(String password);
  }
}

Nächste Schritte