Mantenha tudo organizado com as coleções
Salve e categorize o conteúdo com base nas suas preferências.
O modelo de JMS para Pub/Sub é um pipeline de streaming que lê as mensagens do servidor JMS do ActiveMQ (fila/tópico) e as grava no Pub/Sub.
Requisitos do pipeline
O nome do tópico de saída do Pub/Sub precisa existir.
O IP de host do JMS precisa existir e ter a configuração de rede adequada a fim de que as VMs de worker do Dataflow acessem o host do JMS.
O tópico/fila do JMS do qual os dados são extraídos precisa ter um nome.
Parâmetros do modelo
Parâmetros obrigatórios
inputName : o nome do tópico ou da fila do JMS em que os dados são lidos. (Exemplo: queue).
inputType : o tipo de destino do JMS em que os dados serão lidos. Pode ser uma fila ou um tópico. (Exemplo: queue).
outputTopic : o nome do tópico do Pub/Sub em que os dados serão publicados, no formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>. (Exemplo: projects/your-project-id/topics/your-topic-name).
username : o nome de usuário a ser usado na autenticação no servidor JMS. (Exemplo: sampleusername).
password : a senha associada ao nome de usuário fornecido. (Exemplo: samplepassword).
Parâmetros opcionais
jmsServer : o IP do servidor JMS (ActiveMQ). (Exemplo: tcp://10.0.0.1:61616).
Executar o modelo
Console
Acesse a página Criar job usando um modelo do Dataflow.
Substitua pelo nome da região do Dataflow. Por exemplo, us-central1.
Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Substitua JMS_SERVER pelos endereços do servidor JMS. Por exemplo: tcp://10.0.0.0:61616
Substitua INPUT_NAME pelo nome do tópico/fila de entrada do servidor JMS. Por exemplo, testtopic.
Substitua INPUT_TYPE pelo tipo de destino do servidor JMS (fila/tópico). Por exemplo: topic
Substitua OUTPUT_TOPIC pelo nome do tópico de saída do Pub/Sub. Por exemplo, projects/myproject/topics/testoutput.
Substitua USERNAME pelo nome de usuário do servidor JMS. Por exemplo, testuser.
Substitua PASSWORD pela senha correspondente ao nome de usuário usado no servidor JMS.
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch.
Substitua pelo nome da região do Dataflow. Por exemplo, us-central1.
Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Substitua JMS_SERVER pelos endereços do servidor JMS. Por exemplo: tcp://10.0.0.0:61616
Substitua INPUT_NAME pelo nome do tópico/fila de entrada do servidor JMS. Por exemplo, testtopic.
Substitua INPUT_TYPE pelo tipo de destino do servidor JMS (fila/tópico). Por exemplo: topic
Substitua OUTPUT_TOPIC pelo nome do tópico de saída do Pub/Sub. Por exemplo, projects/myproject/topics/testoutput.
Substitua USERNAME pelo nome de usuário do servidor JMS. Por exemplo, testuser.
Substitua PASSWORD pela senha correspondente ao nome de usuário usado no servidor JMS.
Código-fonte do modelo
Java
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Dataflow template which reads data from Jms Queue/Topic and writes it to Cloud PubSub.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jms-to-pubsub/README_Jms_to_PubSub.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Jms_to_PubSub",
category = TemplateCategory.STREAMING,
displayName = "JMS to Pubsub",
description =
"The JMS to Pub/Sub template is a streaming pipeline that reads messages from ActiveMQ JMS Server (Queue/Topic) and writes them to Pub/Sub.",
optionsClass = com.google.cloud.teleport.v2.templates.JmsToPubsub.JmsToPubsubOptions.class,
flexContainerName = "jms-to-pubsub",
contactInformation = "https://cloud.google.com/support",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/jms-to-pubsub",
preview = true,
requirements = {
"The Pub/Sub output topic name must exist.",
"The JMS host IP must exist and have the proper network configuration for Dataflow worker VMs to reach the JMS host.",
"The JMS topic/queue that data is extracted from must have a name."
},
streaming = true,
supportsAtLeastOnce = true)
public class JmsToPubsub {
/**
* Runs a pipeline which reads data from JMS queue/topic and writes it to Cloud PubSub.
*
* @param args arguments to the pipeline
*/
private static final Logger LOG = LoggerFactory.getLogger(JmsToPubsub.class);
public static void main(String[] args) {
JmsToPubsubOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(JmsToPubsubOptions.class);
run(options);
}
public static void validate(JmsToPubsubOptions options) {
if (options != null) {
if ((options.getUsername() != null
&& (!options.getUsername().isEmpty() || !options.getUsername().isBlank()))
&& (options.getPassword() == null
|| options.getPassword().isBlank()
|| options.getPassword().isEmpty())) {
throw new IllegalArgumentException(
"While username is provided, password is required for authentication");
}
}
}
public static PipelineResult run(JmsToPubsubOptions options) {
validate(options);
Pipeline pipeline = Pipeline.create(options);
String connectionURI = options.getJmsServer();
ConnectionFactory myConnectionFactory;
PCollection<JmsRecord> input;
if (!options.getUsername().isEmpty() || !options.getUsername().isBlank()) {
myConnectionFactory =
new ActiveMQConnectionFactory(
options.getUsername(), options.getPassword(), connectionURI);
} else {
myConnectionFactory = new ActiveMQConnectionFactory(connectionURI);
}
LOG.info("Given Input Type " + options.getInputType());
if (options.getInputType().equalsIgnoreCase("queue")) {
input =
pipeline.apply(
"Read From JMS Queue",
JmsIO.read()
.withConnectionFactory(myConnectionFactory)
.withQueue(options.getInputName()));
} else {
input =
pipeline.apply(
"Read From JMS Topic",
JmsIO.read()
.withConnectionFactory(myConnectionFactory)
.withTopic(options.getInputName()));
}
input
.apply(
MapElements.via(
new SimpleFunction<JmsRecord, String>() {
public String apply(JmsRecord input) {
return input.getPayload();
}
}))
.apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
return pipeline.run();
}
/**
* The {@link JmsToPubsubOptions} interface provides the custom execution options passed by the
* executor at the command-line.
*/
public interface JmsToPubsubOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 1,
groupName = "Source",
optional = true,
regexes = {"[,\\/:a-zA-Z0-9._-]+"},
description = "JMS Host IP",
helpText = "The JMS (ActiveMQ) Server IP.",
example = "tcp://10.0.0.1:61616")
@Validation.Required
String getJmsServer();
void setJmsServer(String jmsServer);
@TemplateParameter.Text(
order = 2,
groupName = "Source",
optional = false,
regexes = {"[a-zA-Z0-9._-]+"},
description = "JMS Queue/Topic Name to read the input from",
helpText = "The name of the JMS topic or queue that data is read from.",
example = "queue")
@Validation.Required
String getInputName();
void setInputName(String inputName);
@TemplateParameter.Text(
order = 3,
optional = false,
regexes = {"[a-zA-Z0-9._-]+"},
description = "JMS Destination Type to read the input from",
helpText = "The JMS destination type to read data from. Can be a queue or a topic.",
example = "queue")
@Validation.Required
String getInputType();
void setInputType(String inputType);
@TemplateParameter.PubsubTopic(
order = 4,
groupName = "Target",
description = "Output Pub/Sub topic",
helpText =
"The name of the Pub/Sub topic to publish data to, in the format `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.",
example = "projects/your-project-id/topics/your-topic-name")
@Validation.Required
String getOutputTopic();
void setOutputTopic(String outputTopic);
@TemplateParameter.Text(
order = 5,
description = "JMS Username",
helpText = "The username to use for authentication on the JMS server.",
example = "sampleusername")
String getUsername();
void setUsername(String username);
@TemplateParameter.Password(
order = 6,
description = "JMS Password",
helpText = "The password associated with the provided username.",
example = "samplepassword")
String getPassword();
void setPassword(String password);
}
}
[[["Fácil de entender","easyToUnderstand","thumb-up"],["Meu problema foi resolvido","solvedMyProblem","thumb-up"],["Outro","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Problema na tradução","translationIssue","thumb-down"],["Outro","otherDown","thumb-down"]],["Última atualização 2024-08-09 UTC."],[],[]]