Organiza tus páginas con colecciones
Guarda y categoriza el contenido según tus preferencias.
La plantilla de JMS a Pub/Sub es una canalización de transmisión que lee mensajes del servidor JMS (cola/tema) de ActiveMQ y los escribe en Pub/Sub.
Requisitos de la canalización
El nombre del tema de salida de Pub/Sub debe existir.
La IP de host de JMS debe existir y tener la configuración de red adecuada para que las VMs de trabajador de Dataflow lleguen al host de JMS.
El tema o la cola de JMS de la que se extraen los datos debe tener un nombre.
Parámetros de la plantilla
Parámetros obligatorios
inputName: Nombre de tema o cola de JMS para leer la entrada. (Ejemplo: queue).
inputType: Tipo de destino de JMS para leer la entrada. (Ejemplo: queue).
outputTopic: El nombre del tema en el que se deben publicar los datos, en el formato “projects/your-project-id/topics/your-topic-name” (ejemplo: projects/your-project- id/topics/your-topic-name).
username: Nombre de usuario de JMS para la autenticación con el servidor JMS (ejemplo: sampleusername).
password: Contraseña para el nombre de usuario proporcionado para la autenticación con el servidor JMS (ejemplo: samplepassword).
Parámetros opcionales
jmsServer: IP del servidor para el host JMS (ejemplo: host:5672).
Ejecuta la plantilla
Consola
Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
En este ejemplo, debes reemplazar los siguientes valores:
Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
Reemplaza por el nombre de la región de Dataflow. Por ejemplo: us-central1.
Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Reemplaza JMS_SERVER por las direcciones del servidor JMS. Por ejemplo: tcp://10.0.0.0:61616.
Reemplaza INPUT_NAME por el nombre de tema o cola de entrada del servidor JMS. Por ejemplo: testtopic.
Reemplaza INPUT_TYPE por el tipo de destino del servidor JMS (cola/tema). Por ejemplo: topic.
Reemplaza OUTPUT_TOPIC por el nombre del tema de salida de Pub/Sub. Por ejemplo: projects/myproject/topics/testoutput.
Reemplaza USERNAME por el nombre de usuario para el servidor JMS. Por ejemplo: testuser.
Reemplaza PASSWORD por la contraseña que corresponde al nombre de usuario usado con el servidor JMS.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus permisos de autorización, consulta projects.templates.launch.
En este ejemplo, debes reemplazar los siguientes valores:
Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
Reemplaza por el nombre de la región de Dataflow. Por ejemplo: us-central1.
Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Reemplaza JMS_SERVER por las direcciones del servidor JMS. Por ejemplo: tcp://10.0.0.0:61616.
Reemplaza INPUT_NAME por el nombre de tema o cola de entrada del servidor JMS. Por ejemplo: testtopic.
Reemplaza INPUT_TYPE por el tipo de destino del servidor JMS (cola/tema). Por ejemplo: topic.
Reemplaza OUTPUT_TOPIC por el nombre del tema de salida de Pub/Sub. Por ejemplo: projects/myproject/topics/testoutput.
Reemplaza USERNAME por el nombre de usuario para el servidor JMS. Por ejemplo: testuser.
Reemplaza PASSWORD por la contraseña que corresponde al nombre de usuario usado con el servidor JMS.
Código fuente de la plantilla
Java
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Dataflow template which reads data from Jms Queue/Topic and writes it to Cloud PubSub.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jms-to-pubsub/README_Jms_to_PubSub.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Jms_to_PubSub",
category = TemplateCategory.STREAMING,
displayName = "JMS to Pubsub",
description =
"The JMS to Pub/Sub template is a streaming pipeline that reads messages from ActiveMQ JMS Server (Queue/Topic) and writes them to Pub/Sub.",
optionsClass = com.google.cloud.teleport.v2.templates.JmsToPubsub.JmsToPubsubOptions.class,
flexContainerName = "jms-to-pubsub",
contactInformation = "https://cloud.google.com/support",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/jms-to-pubsub",
preview = true,
requirements = {
"The Pub/Sub output topic name must exist.",
"The JMS host IP must exist and have the proper network configuration for Dataflow worker VMs to reach the JMS host.",
"The JMS topic/queue that data is extracted from must have a name."
},
streaming = true,
supportsAtLeastOnce = true)
public class JmsToPubsub {
/**
* Runs a pipeline which reads data from JMS queue/topic and writes it to Cloud PubSub.
*
* @param args arguments to the pipeline
*/
private static final Logger LOG = LoggerFactory.getLogger(JmsToPubsub.class);
public static void main(String[] args) {
JmsToPubsubOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(JmsToPubsubOptions.class);
run(options);
}
public static void validate(JmsToPubsubOptions options) {
if (options != null) {
if ((options.getUsername() != null
&& (!options.getUsername().isEmpty() || !options.getUsername().isBlank()))
&& (options.getPassword() == null
|| options.getPassword().isBlank()
|| options.getPassword().isEmpty())) {
throw new IllegalArgumentException(
"While username is provided, password is required for authentication");
}
}
}
public static PipelineResult run(JmsToPubsubOptions options) {
validate(options);
Pipeline pipeline = Pipeline.create(options);
String connectionURI = options.getJmsServer();
ConnectionFactory myConnectionFactory;
PCollection<JmsRecord> input;
if (!options.getUsername().isEmpty() || !options.getUsername().isBlank()) {
myConnectionFactory =
new ActiveMQConnectionFactory(
options.getUsername(), options.getPassword(), connectionURI);
} else {
myConnectionFactory = new ActiveMQConnectionFactory(connectionURI);
}
LOG.info("Given Input Type " + options.getInputType());
if (options.getInputType().equalsIgnoreCase("queue")) {
input =
pipeline.apply(
"Read From JMS Queue",
JmsIO.read()
.withConnectionFactory(myConnectionFactory)
.withQueue(options.getInputName()));
} else {
input =
pipeline.apply(
"Read From JMS Topic",
JmsIO.read()
.withConnectionFactory(myConnectionFactory)
.withTopic(options.getInputName()));
}
input
.apply(
MapElements.via(
new SimpleFunction<JmsRecord, String>() {
public String apply(JmsRecord input) {
return input.getPayload();
}
}))
.apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
return pipeline.run();
}
/**
* The {@link JmsToPubsubOptions} interface provides the custom execution options passed by the
* executor at the command-line.
*/
public interface JmsToPubsubOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 1,
optional = true,
regexes = {"[,\\/:a-zA-Z0-9._-]+"},
description = "JMS Host IP",
helpText = "The JMS (ActiveMQ) Server IP.",
example = "tcp://10.0.0.1:61616")
@Validation.Required
String getJmsServer();
void setJmsServer(String jmsServer);
@TemplateParameter.Text(
order = 2,
optional = false,
regexes = {"[a-zA-Z0-9._-]+"},
description = "JMS Queue/Topic Name to read the input from",
helpText = "The name of the JMS topic or queue that data is read from.",
example = "queue")
@Validation.Required
String getInputName();
void setInputName(String inputName);
@TemplateParameter.Text(
order = 3,
optional = false,
regexes = {"[a-zA-Z0-9._-]+"},
description = "JMS Destination Type to read the input from",
helpText = "The JMS destination type to read data from. Can be a queue or a topic.",
example = "queue")
@Validation.Required
String getInputType();
void setInputType(String inputType);
@TemplateParameter.Text(
order = 4,
description = "Output Pub/Sub topic",
helpText =
"The name of the Pub/Sub topic to publish data to, in the format `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.",
example = "projects/your-project-id/topics/your-topic-name")
@Validation.Required
String getOutputTopic();
void setOutputTopic(String outputTopic);
@TemplateParameter.Text(
order = 5,
description = "JMS Username",
helpText = "The username to use for authentication on the JMS server.",
example = "sampleusername")
String getUsername();
void setUsername(String username);
@TemplateParameter.Text(
order = 6,
description = "JMS Password",
helpText = "The password associated with the provided username.",
example = "samplepassword")
String getPassword();
void setPassword(String password);
}
}