Mantieni tutto organizzato con le raccolte
Salva e classifica i contenuti in base alle tue preferenze.
Il modello da JMS a Pub/Sub è una pipeline di inserimento flussi che legge i messaggi dal server JMS MQ attivo (coda/argomento) e li scrive in Pub/Sub.
Requisiti della pipeline
Deve esistere il nome dell'argomento di output Pub/Sub.
L'IP dell'host JMS deve esistere e avere la configurazione di rete adeguata per consentire alle VM worker Dataflow di raggiungere l'host JMS.
L'argomento/la coda JMS da cui vengono estratti i dati deve avere un nome.
Parametri del modello
Parametri obbligatori
inputName : il nome dell'argomento o della coda JMS da cui vengono letti i dati. (ad esempio, coda).
inputType : il tipo di destinazione JMS da cui leggere i dati. Può essere una coda o un argomento. (ad esempio, coda).
outputTopic : il nome dell'argomento Pub/Sub in cui pubblicare i dati, nel formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>. ad esempio projects/your-project-id/topics/your-topic-name.
nome utente : il nome utente da utilizzare per l'autenticazione sul server JMS. ad esempio nomeutente.
password : la password associata al nome utente fornito. ad esempio password-esempio.
Parametri facoltativi
jmsServer : l'IP del server JMS (ActiveMQ). (Esempio: tcp://10.0.0.1:61616).
In questo esempio devi sostituire i valori seguenti:
Sostituisci YOUR_PROJECT_ID con l'ID progetto.
Sostituisci con il nome della regione Dataflow. Ad esempio: us-central1.
Sostituisci JOB_NAME con un nome job a tua scelta. Per essere valido, il nome del job deve corrispondere all'espressione regolare
[a-z]([-a-z0-9]{0,38}[a-z0-9])?.
Sostituisci JMS_SERVER con gli indirizzi del server JMS. Ad esempio: tcp://10.0.0.0:61616
Sostituisci INPUT_NAME con il nome dell'argomento/della coda di input del server JMS. Ad esempio: testtopic.
Sostituisci INPUT_TYPE con il tipo di destinazione del server JMS(coda/argomento). Ad esempio: topic
Sostituisci OUTPUT_TOPIC con il nome dell'argomento di output Pub/Sub. Ad esempio: projects/myproject/topics/testoutput.
Sostituisci USERNAME con il nome utente del server JMS. Ad esempio: testuser.
Sostituisci PASSWORD con la password che corrisponde al nome utente utilizzato con il server JMS.
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.
In questo esempio devi sostituire i valori seguenti:
Sostituisci YOUR_PROJECT_ID con l'ID progetto.
Sostituisci con il nome della regione Dataflow. Ad esempio: us-central1.
Sostituisci JOB_NAME con un nome job a tua scelta. Per essere valido, il nome del job deve corrispondere all'espressione regolare
[a-z]([-a-z0-9]{0,38}[a-z0-9])?.
Sostituisci JMS_SERVER con gli indirizzi del server JMS. Ad esempio: tcp://10.0.0.0:61616
Sostituisci INPUT_NAME con il nome dell'argomento/della coda di input del server JMS. Ad esempio: testtopic.
Sostituisci INPUT_TYPE con il tipo di destinazione del server JMS(coda/argomento). Ad esempio: topic
Sostituisci OUTPUT_TOPIC con il nome dell'argomento di output Pub/Sub. Ad esempio: projects/myproject/topics/testoutput.
Sostituisci USERNAME con il nome utente del server JMS. Ad esempio: testuser.
Sostituisci PASSWORD con la password che corrisponde al nome utente utilizzato con il server JMS.
Codice sorgente del modello
Java
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Dataflow template which reads data from Jms Queue/Topic and writes it to Cloud PubSub.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jms-to-pubsub/README_Jms_to_PubSub.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Jms_to_PubSub",
category = TemplateCategory.STREAMING,
displayName = "JMS to Pubsub",
description =
"The JMS to Pub/Sub template is a streaming pipeline that reads messages from ActiveMQ JMS Server (Queue/Topic) and writes them to Pub/Sub.",
optionsClass = com.google.cloud.teleport.v2.templates.JmsToPubsub.JmsToPubsubOptions.class,
flexContainerName = "jms-to-pubsub",
contactInformation = "https://cloud.google.com/support",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/jms-to-pubsub",
preview = true,
requirements = {
"The Pub/Sub output topic name must exist.",
"The JMS host IP must exist and have the proper network configuration for Dataflow worker VMs to reach the JMS host.",
"The JMS topic/queue that data is extracted from must have a name."
},
streaming = true,
supportsAtLeastOnce = true)
public class JmsToPubsub {
/**
* Runs a pipeline which reads data from JMS queue/topic and writes it to Cloud PubSub.
*
* @param args arguments to the pipeline
*/
private static final Logger LOG = LoggerFactory.getLogger(JmsToPubsub.class);
public static void main(String[] args) {
JmsToPubsubOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(JmsToPubsubOptions.class);
run(options);
}
public static void validate(JmsToPubsubOptions options) {
if (options != null) {
if ((options.getUsername() != null
&& (!options.getUsername().isEmpty() || !options.getUsername().isBlank()))
&& (options.getPassword() == null
|| options.getPassword().isBlank()
|| options.getPassword().isEmpty())) {
throw new IllegalArgumentException(
"While username is provided, password is required for authentication");
}
}
}
public static PipelineResult run(JmsToPubsubOptions options) {
validate(options);
Pipeline pipeline = Pipeline.create(options);
String connectionURI = options.getJmsServer();
ConnectionFactory myConnectionFactory;
PCollection<JmsRecord> input;
if (!options.getUsername().isEmpty() || !options.getUsername().isBlank()) {
myConnectionFactory =
new ActiveMQConnectionFactory(
options.getUsername(), options.getPassword(), connectionURI);
} else {
myConnectionFactory = new ActiveMQConnectionFactory(connectionURI);
}
LOG.info("Given Input Type " + options.getInputType());
if (options.getInputType().equalsIgnoreCase("queue")) {
input =
pipeline.apply(
"Read From JMS Queue",
JmsIO.read()
.withConnectionFactory(myConnectionFactory)
.withQueue(options.getInputName()));
} else {
input =
pipeline.apply(
"Read From JMS Topic",
JmsIO.read()
.withConnectionFactory(myConnectionFactory)
.withTopic(options.getInputName()));
}
input
.apply(
MapElements.via(
new SimpleFunction<JmsRecord, String>() {
public String apply(JmsRecord input) {
return input.getPayload();
}
}))
.apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
return pipeline.run();
}
/**
* The {@link JmsToPubsubOptions} interface provides the custom execution options passed by the
* executor at the command-line.
*/
public interface JmsToPubsubOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 1,
optional = true,
regexes = {"[,\\/:a-zA-Z0-9._-]+"},
description = "JMS Host IP",
helpText = "The JMS (ActiveMQ) Server IP.",
example = "tcp://10.0.0.1:61616")
@Validation.Required
String getJmsServer();
void setJmsServer(String jmsServer);
@TemplateParameter.Text(
order = 2,
optional = false,
regexes = {"[a-zA-Z0-9._-]+"},
description = "JMS Queue/Topic Name to read the input from",
helpText = "The name of the JMS topic or queue that data is read from.",
example = "queue")
@Validation.Required
String getInputName();
void setInputName(String inputName);
@TemplateParameter.Text(
order = 3,
optional = false,
regexes = {"[a-zA-Z0-9._-]+"},
description = "JMS Destination Type to read the input from",
helpText = "The JMS destination type to read data from. Can be a queue or a topic.",
example = "queue")
@Validation.Required
String getInputType();
void setInputType(String inputType);
@TemplateParameter.PubsubTopic(
order = 4,
description = "Output Pub/Sub topic",
helpText =
"The name of the Pub/Sub topic to publish data to, in the format `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.",
example = "projects/your-project-id/topics/your-topic-name")
@Validation.Required
String getOutputTopic();
void setOutputTopic(String outputTopic);
@TemplateParameter.Text(
order = 5,
description = "JMS Username",
helpText = "The username to use for authentication on the JMS server.",
example = "sampleusername")
String getUsername();
void setUsername(String username);
@TemplateParameter.Password(
order = 6,
description = "JMS Password",
helpText = "The password associated with the provided username.",
example = "samplepassword")
String getPassword();
void setPassword(String password);
}
}