Mit Sammlungen den Überblick behalten
Sie können Inhalte basierend auf Ihren Einstellungen speichern und kategorisieren.
Die Vorlage "JMS für Pub/Sub" ist eine Streamingpipeline, die Nachrichten vom Active MQ JMS Server (Warteschlange/Thema) liest und in Pub/Sub schreibt.
Pipelineanforderungen
Der Name des Pub/Sub-Ausgabethemas muss vorhanden sein.
Die JMS-Host-IP-Adresse muss vorhanden sein und die erforderliche Netzwerkkonfiguration haben, damit Dataflow-Worker-VMs den JMS-Host erreichen können.
Das JMS-Thema/die JMS-Warteschlange, aus dem/der Daten extrahiert werden, muss einen Namen haben.
Vorlagenparameter
Erforderliche Parameter
inputName : JMS-Warteschlangen-/Themenname, aus der die Eingabe gelesen werden soll. (Beispiel: Warteschlange).
inputType ist der JMS-Zieltyp, aus dem die Eingabe gelesen werden soll. (Beispiel: Warteschlange).
outputTopic ist der Name des Themas, zu dem Daten veröffentlicht werden sollen, im Format "projects/your-project-id/topics/your-topic-name" (Beispiel: projects/your-project- id/topics/your-topic-name).
username ist der JMS-Nutzername für die Authentifizierung mit dem JMS-Server (Beispiel: Beispielnutzername).
password ist das Passwort für den Nutzernamen, der für die Authentifizierung mit dem JMS-Server bereitgestellt wird (Beispiel: samplepassword).
Optionale Parameter
jmsServer ist die Server-IP für den JMS-Host (Beispiel: host:5672).
Führen Sie die Vorlage aus.
Console
Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
In diesem Beispiel müssen Sie die folgenden Werte ersetzen:
Ersetzen Sie YOUR_PROJECT_ID durch Ihre Projekt-ID.
Ersetzen Sie durch den Namen der Dataflow-Region. Beispiel: us-central1
Ersetzen Sie JOB_NAME durch einen Jobnamen Ihrer Wahl. Der Jobname ist nur gültig, wenn er dem regulären Ausdruck [a-z]([-a-z0-9]{0,38}[a-z0-9])? entspricht.
Ersetzen Sie JMS_SERVER durch die JMS-Serveradressen. Beispiel: tcp://10.0.0.0:61616
Ersetzen Sie INPUT_NAME durch den Namen des JMS-Servereingabethemas/der JMS-Eingabewarteschlange. Beispiel: testtopic
Ersetzen Sie INPUT_TYPE durch den Zieltyp des JMS-Servers (Warteschlange/Thema). Beispiel: topic
Ersetzen Sie OUTPUT_TOPIC durch den Namen des Pub/Sub-Ausgabe-Themas. Beispiel: projects/myproject/topics/testoutput
Ersetzen Sie USERNAME durch den Nutzernamen für den JMS-Server. Beispiel: testuser
Ersetzen Sie PASSWORD durch das Passwort für den Nutzernamen, der mit dem JMS-Server verwendet wird.
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.
In diesem Beispiel müssen Sie die folgenden Werte ersetzen:
Ersetzen Sie YOUR_PROJECT_ID durch Ihre Projekt-ID.
Ersetzen Sie durch den Namen der Dataflow-Region. Beispiel: us-central1
Ersetzen Sie JOB_NAME durch einen Jobnamen Ihrer Wahl. Der Jobname ist nur gültig, wenn er dem regulären Ausdruck [a-z]([-a-z0-9]{0,38}[a-z0-9])? entspricht.
Ersetzen Sie JMS_SERVER durch die JMS-Serveradressen. Beispiel: tcp://10.0.0.0:61616
Ersetzen Sie INPUT_NAME durch den Namen des JMS-Servereingabethemas/der JMS-Eingabewarteschlange. Beispiel: testtopic
Ersetzen Sie INPUT_TYPE durch den Zieltyp des JMS-Servers (Warteschlange/Thema). Beispiel: topic
Ersetzen Sie OUTPUT_TOPIC durch den Namen des Pub/Sub-Ausgabe-Themas. Beispiel: projects/myproject/topics/testoutput
Ersetzen Sie USERNAME durch den Nutzernamen für den JMS-Server. Beispiel: testuser
Ersetzen Sie PASSWORD durch das Passwort für den Nutzernamen, der mit dem JMS-Server verwendet wird.
Quellcode der Vorlage
Java
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Dataflow template which reads data from Jms Queue/Topic and writes it to Cloud PubSub.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jms-to-pubsub/README_Jms_to_PubSub.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Jms_to_PubSub",
category = TemplateCategory.STREAMING,
displayName = "JMS to Pubsub",
description =
"The JMS to Pub/Sub template is a streaming pipeline that reads messages from ActiveMQ JMS Server (Queue/Topic) and writes them to Pub/Sub.",
optionsClass = com.google.cloud.teleport.v2.templates.JmsToPubsub.JmsToPubsubOptions.class,
flexContainerName = "jms-to-pubsub",
contactInformation = "https://cloud.google.com/support",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/jms-to-pubsub",
preview = true,
requirements = {
"The Pub/Sub output topic name must exist.",
"The JMS host IP must exist and have the proper network configuration for Dataflow worker VMs to reach the JMS host.",
"The JMS topic/queue that data is extracted from must have a name."
},
streaming = true,
supportsAtLeastOnce = true)
public class JmsToPubsub {
/**
* Runs a pipeline which reads data from JMS queue/topic and writes it to Cloud PubSub.
*
* @param args arguments to the pipeline
*/
private static final Logger LOG = LoggerFactory.getLogger(JmsToPubsub.class);
public static void main(String[] args) {
JmsToPubsubOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(JmsToPubsubOptions.class);
run(options);
}
public static void validate(JmsToPubsubOptions options) {
if (options != null) {
if ((options.getUsername() != null
&& (!options.getUsername().isEmpty() || !options.getUsername().isBlank()))
&& (options.getPassword() == null
|| options.getPassword().isBlank()
|| options.getPassword().isEmpty())) {
throw new IllegalArgumentException(
"While username is provided, password is required for authentication");
}
}
}
public static PipelineResult run(JmsToPubsubOptions options) {
validate(options);
Pipeline pipeline = Pipeline.create(options);
String connectionURI = options.getJmsServer();
ConnectionFactory myConnectionFactory;
PCollection<JmsRecord> input;
if (!options.getUsername().isEmpty() || !options.getUsername().isBlank()) {
myConnectionFactory =
new ActiveMQConnectionFactory(
options.getUsername(), options.getPassword(), connectionURI);
} else {
myConnectionFactory = new ActiveMQConnectionFactory(connectionURI);
}
LOG.info("Given Input Type " + options.getInputType());
if (options.getInputType().equalsIgnoreCase("queue")) {
input =
pipeline.apply(
"Read From JMS Queue",
JmsIO.read()
.withConnectionFactory(myConnectionFactory)
.withQueue(options.getInputName()));
} else {
input =
pipeline.apply(
"Read From JMS Topic",
JmsIO.read()
.withConnectionFactory(myConnectionFactory)
.withTopic(options.getInputName()));
}
input
.apply(
MapElements.via(
new SimpleFunction<JmsRecord, String>() {
public String apply(JmsRecord input) {
return input.getPayload();
}
}))
.apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
return pipeline.run();
}
/**
* The {@link JmsToPubsubOptions} interface provides the custom execution options passed by the
* executor at the command-line.
*/
public interface JmsToPubsubOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 1,
optional = true,
regexes = {"[,\\/:a-zA-Z0-9._-]+"},
description = "JMS Host IP",
helpText = "Server IP for JMS Host",
example = "host:5672")
@Validation.Required
String getJmsServer();
void setJmsServer(String jmsServer);
@TemplateParameter.Text(
order = 2,
optional = false,
regexes = {"[a-zA-Z0-9._-]+"},
description = "JMS Queue/Topic Name to read the input from",
helpText = "JMS Queue/Topic Name to read the input from.",
example = "queue")
@Validation.Required
String getInputName();
void setInputName(String inputName);
@TemplateParameter.Text(
order = 3,
optional = false,
regexes = {"[a-zA-Z0-9._-]+"},
description = "JMS Destination Type to read the input from",
helpText = "JMS Destination Type to read the input from.",
example = "queue")
@Validation.Required
String getInputType();
void setInputType(String inputType);
@TemplateParameter.Text(
order = 4,
description = "Output Pub/Sub topic",
helpText =
"The name of the topic to which data should published, in the format of"
+ " 'projects/your-project-id/topics/your-topic-name'",
example = "projects/your-project-id/topics/your-topic-name")
@Validation.Required
String getOutputTopic();
void setOutputTopic(String outputTopic);
@TemplateParameter.Text(
order = 5,
description = "JMS Username",
helpText = "JMS username for authentication with JMS server",
example = "sampleusername")
String getUsername();
void setUsername(String username);
@TemplateParameter.Text(
order = 6,
description = "JMS Password",
helpText = "Password for username provided for authentication with JMS server",
example = "samplepassword")
String getPassword();
void setPassword(String password);
}
}
[[["Leicht verständlich","easyToUnderstand","thumb-up"],["Mein Problem wurde gelöst","solvedMyProblem","thumb-up"],["Sonstiges","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Problem mit der Übersetzung","translationIssue","thumb-down"],["Sonstiges","otherDown","thumb-down"]],["Zuletzt aktualisiert: 2024-05-11 (UTC)."],[],[]]