Restez organisé à l'aide des collections
Enregistrez et classez les contenus selon vos préférences.
Le modèle JMS vers Pub/Sub est un pipeline de traitement par flux qui lit les messages du serveur JMS ActiveMQ (file d'attente/sujet) et les écrit dans Pub/Sub.
Conditions requises pour ce pipeline
Le nom du sujet de sortie Pub/Sub doit déjà exister.
L'adresse IP de l'hôte JMS doit exister et présenter la configuration réseau appropriée pour que les VM de nœud de calcul Dataflow puissent atteindre l'hôte JMS.
Le sujet/file d'attente JMS à partir duquel les données sont extraites doit avoir un nom.
Paramètres de modèle
Paramètres obligatoires
inputName : nom du sujet ou de la file d'attente JMS à partir duquel les données sont lues. (Exemple : queue).
inputType : type de destination JMS à partir duquel lire les données. Il peut s'agir d'une file d'attente ou d'un sujet. (Exemple : queue).
outputTopic : nom du sujet Pub/Sub dans lequel publier les données, au format projects/<PROJECT_ID>/topics/<TOPIC_NAME>. (par exemple, projects/votre-id-projet/topics/nom-de-votre-sujet).
username : Nom d'utilisateur à utiliser pour l'authentification sur le serveur JMS. (Exemple: exemplenomdutilisateur).
password : Mot de passe associé au nom d'utilisateur fourni. (Exemple: exemplemotdepasse).
Paramètres facultatifs
jmsServer : adresse IP du serveur JMS (ActiveMQ). (par exemple, tcp://10.0.0.1:61616).
Exécuter le modèle
Console
Accédez à la page Dataflow Créer un job à partir d'un modèle.
Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :
Remplacez YOUR_PROJECT_ID par l'ID du projet.
Remplacez par le nom de la région Dataflow. Exemple : us-central1.
Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Remplacez JMS_SERVER par les adresses du serveur JMS. Par exemple : tcp://10.0.0.0:61616
Remplacez INPUT_NAME par le nom du sujet ou de la file d'attente d'entrée du serveur JMS. Exemple : testtopic.
Remplacez INPUT_TYPE par le type de destination du serveur JMS (file d'attente/sujet). Par exemple : topic
Remplacez OUTPUT_TOPIC par le nom du sujet de sortie Pub/Sub. Exemple : projects/myproject/topics/testoutput.
Remplacez USERNAME par le nom d'utilisateur du serveur JMS. Exemple : testuser.
Remplacez PASSWORD par le mot de passe correspondant au nom d'utilisateur associé au serveur JMS.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API et ses champs d'application d'autorisation, consultez la section projects.templates.launch.
Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :
Remplacez YOUR_PROJECT_ID par l'ID du projet.
Remplacez par le nom de la région Dataflow. Exemple : us-central1.
Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Remplacez JMS_SERVER par les adresses du serveur JMS. Par exemple : tcp://10.0.0.0:61616
Remplacez INPUT_NAME par le nom du sujet ou de la file d'attente d'entrée du serveur JMS. Exemple : testtopic.
Remplacez INPUT_TYPE par le type de destination du serveur JMS (file d'attente/sujet). Par exemple : topic
Remplacez OUTPUT_TOPIC par le nom du sujet de sortie Pub/Sub. Exemple : projects/myproject/topics/testoutput.
Remplacez USERNAME par le nom d'utilisateur du serveur JMS. Exemple : testuser.
Remplacez PASSWORD par le mot de passe correspondant au nom d'utilisateur associé au serveur JMS.
Code source du modèle
Java
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Dataflow template which reads data from Jms Queue/Topic and writes it to Cloud PubSub.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jms-to-pubsub/README_Jms_to_PubSub.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Jms_to_PubSub",
category = TemplateCategory.STREAMING,
displayName = "JMS to Pubsub",
description =
"The JMS to Pub/Sub template is a streaming pipeline that reads messages from ActiveMQ JMS Server (Queue/Topic) and writes them to Pub/Sub.",
optionsClass = com.google.cloud.teleport.v2.templates.JmsToPubsub.JmsToPubsubOptions.class,
flexContainerName = "jms-to-pubsub",
contactInformation = "https://cloud.google.com/support",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/jms-to-pubsub",
preview = true,
requirements = {
"The Pub/Sub output topic name must exist.",
"The JMS host IP must exist and have the proper network configuration for Dataflow worker VMs to reach the JMS host.",
"The JMS topic/queue that data is extracted from must have a name."
},
streaming = true,
supportsAtLeastOnce = true)
public class JmsToPubsub {
/**
* Runs a pipeline which reads data from JMS queue/topic and writes it to Cloud PubSub.
*
* @param args arguments to the pipeline
*/
private static final Logger LOG = LoggerFactory.getLogger(JmsToPubsub.class);
public static void main(String[] args) {
JmsToPubsubOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(JmsToPubsubOptions.class);
run(options);
}
public static void validate(JmsToPubsubOptions options) {
if (options != null) {
if ((options.getUsername() != null
&& (!options.getUsername().isEmpty() || !options.getUsername().isBlank()))
&& (options.getPassword() == null
|| options.getPassword().isBlank()
|| options.getPassword().isEmpty())) {
throw new IllegalArgumentException(
"While username is provided, password is required for authentication");
}
}
}
public static PipelineResult run(JmsToPubsubOptions options) {
validate(options);
Pipeline pipeline = Pipeline.create(options);
String connectionURI = options.getJmsServer();
ConnectionFactory myConnectionFactory;
PCollection<JmsRecord> input;
if (!options.getUsername().isEmpty() || !options.getUsername().isBlank()) {
myConnectionFactory =
new ActiveMQConnectionFactory(
options.getUsername(), options.getPassword(), connectionURI);
} else {
myConnectionFactory = new ActiveMQConnectionFactory(connectionURI);
}
LOG.info("Given Input Type " + options.getInputType());
if (options.getInputType().equalsIgnoreCase("queue")) {
input =
pipeline.apply(
"Read From JMS Queue",
JmsIO.read()
.withConnectionFactory(myConnectionFactory)
.withQueue(options.getInputName()));
} else {
input =
pipeline.apply(
"Read From JMS Topic",
JmsIO.read()
.withConnectionFactory(myConnectionFactory)
.withTopic(options.getInputName()));
}
input
.apply(
MapElements.via(
new SimpleFunction<JmsRecord, String>() {
public String apply(JmsRecord input) {
return input.getPayload();
}
}))
.apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
return pipeline.run();
}
/**
* The {@link JmsToPubsubOptions} interface provides the custom execution options passed by the
* executor at the command-line.
*/
public interface JmsToPubsubOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 1,
optional = true,
regexes = {"[,\\/:a-zA-Z0-9._-]+"},
description = "JMS Host IP",
helpText = "The JMS (ActiveMQ) Server IP.",
example = "tcp://10.0.0.1:61616")
@Validation.Required
String getJmsServer();
void setJmsServer(String jmsServer);
@TemplateParameter.Text(
order = 2,
optional = false,
regexes = {"[a-zA-Z0-9._-]+"},
description = "JMS Queue/Topic Name to read the input from",
helpText = "The name of the JMS topic or queue that data is read from.",
example = "queue")
@Validation.Required
String getInputName();
void setInputName(String inputName);
@TemplateParameter.Text(
order = 3,
optional = false,
regexes = {"[a-zA-Z0-9._-]+"},
description = "JMS Destination Type to read the input from",
helpText = "The JMS destination type to read data from. Can be a queue or a topic.",
example = "queue")
@Validation.Required
String getInputType();
void setInputType(String inputType);
@TemplateParameter.PubsubTopic(
order = 4,
description = "Output Pub/Sub topic",
helpText =
"The name of the Pub/Sub topic to publish data to, in the format `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.",
example = "projects/your-project-id/topics/your-topic-name")
@Validation.Required
String getOutputTopic();
void setOutputTopic(String outputTopic);
@TemplateParameter.Text(
order = 5,
description = "JMS Username",
helpText = "The username to use for authentication on the JMS server.",
example = "sampleusername")
String getUsername();
void setUsername(String username);
@TemplateParameter.Password(
order = 6,
description = "JMS Password",
helpText = "The password associated with the provided username.",
example = "samplepassword")
String getPassword();
void setPassword(String password);
}
}
Sauf indication contraire, le contenu de cette page est régi par une licence Creative Commons Attribution 4.0, et les échantillons de code sont régis par une licence Apache 2.0. Pour en savoir plus, consultez les Règles du site Google Developers. Java est une marque déposée d'Oracle et/ou de ses sociétés affiliées.
Dernière mise à jour le 2024/06/26 (UTC).
[{
"type": "thumb-down",
"id": "hardToUnderstand",
"label":"Hard to understand"
},{
"type": "thumb-down",
"id": "incorrectInformationOrSampleCode",
"label":"Incorrect information or sample code"
},{
"type": "thumb-down",
"id": "missingTheInformationSamplesINeed",
"label":"Missing the information/samples I need"
},{
"type": "thumb-down",
"id": "translationIssue",
"label":"Problème de traduction"
},{
"type": "thumb-down",
"id": "otherDown",
"label":"Autre"
}]
[{
"type": "thumb-up",
"id": "easyToUnderstand",
"label":"Facile à comprendre"
},{
"type": "thumb-up",
"id": "solvedMyProblem",
"label":"J'ai pu résoudre mon problème"
},{
"type": "thumb-up",
"id": "otherUp",
"label":"Autre"
}]