Modello JMS to Pub/Sub

Il modello JMS to Pub/Sub è una pipeline di flusso che legge i messaggi dal server JMS Active MQ (coda/argomento) e li scrive in Pub/Sub.

Requisiti della pipeline

  • Il nome dell'argomento di output Pub/Sub deve esistere.
  • L'IP dell'host JMS deve esistere e avere la configurazione di rete corretta per consentire alle VM worker Dataflow di raggiungere l'host JMS.
  • L'argomento/la coda JMS da cui vengono estratti i dati deve avere un nome.

Parametri del modello

Parametri obbligatori

  • inputName: il nome dell'argomento o della coda JMS da cui vengono letti i dati. Ad esempio, queue.
  • inputType: il tipo di destinazione JMS da cui leggere i dati. Può essere una coda o un argomento. Ad esempio, queue.
  • outputTopic: il nome dell'argomento Pub/Sub in cui pubblicare i dati. Ad esempio, projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • username: il nome utente da utilizzare per l'autenticazione sul server JMS. Ad esempio, sampleusername.
  • password: la password associata al nome utente fornito. Ad esempio, samplepassword.

Parametri facoltativi

  • jmsServer: l'IP del server JMS (ActiveMQ). Ad esempio, tcp://

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona JMS to Pub/Sub template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/JMS_to_Cloud_PubSub \
    --parameters \

In questo esempio devi sostituire i seguenti valori:

  • Sostituisci YOUR_PROJECT_ID con l'ID progetto.
  • Sostituisci con il nome della regione Dataflow. Ad esempio: us-central1.
  • Sostituisci JOB_NAME con il nome di un job a tua scelta. Il nome del job deve corrispondere all'espressione regolare [a-z]([-a-z0-9]{0,38}[a-z0-9])? per essere valido.
  • Sostituisci JMS_SERVER con gli indirizzi del server JMS. Ad esempio: tcp://
  • Sostituisci INPUT_NAME con il nome dell'argomento/della coda di input del server JMS. Ad esempio: testtopic.
  • Sostituisci INPUT_TYPE con il tipo di destinazione del server JMS(coda/argomento). Ad esempio: topic
  • Sostituisci OUTPUT_TOPIC con il nome dell'argomento di output Pub/Sub. Ad esempio: projects/myproject/topics/testoutput.
  • Sostituisci USERNAME con il nome utente del server JMS. Ad esempio: testuser.
  • Sostituisci PASSWORD con la password corrispondente al nome utente utilizzato con il server JMS.

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "jmsServer": "JMS_SERVER",
          "inputName": "INPUT_NAME",
          "inputType": "INPUT_TYPE",
          "outputTopic": "OUTPUT_TOPIC",
          "username": "USERNAME",
          "password": "PASSWORD"
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/JMS_to_Cloud_PubSub",

In questo esempio devi sostituire i seguenti valori:

  • Sostituisci YOUR_PROJECT_ID con l'ID progetto.
  • Sostituisci con il nome della regione Dataflow. Ad esempio: us-central1.
  • Sostituisci JOB_NAME con il nome di un job a tua scelta. Il nome del job deve corrispondere all'espressione regolare [a-z]([-a-z0-9]{0,38}[a-z0-9])? per essere valido.
  • Sostituisci JMS_SERVER con gli indirizzi del server JMS. Ad esempio: tcp://
  • Sostituisci INPUT_NAME con il nome dell'argomento/della coda di input del server JMS. Ad esempio: testtopic.
  • Sostituisci INPUT_TYPE con il tipo di destinazione del server JMS(coda/argomento). Ad esempio: topic
  • Sostituisci OUTPUT_TOPIC con il nome dell'argomento di output Pub/Sub. Ad esempio: projects/myproject/topics/testoutput.
  • Sostituisci USERNAME con il nome utente del server JMS. Ad esempio: testuser.
  • Sostituisci PASSWORD con la password corrispondente al nome utente utilizzato con il server JMS.
 * Copyright (C) 2023 Google LLC
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.

import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

 * Dataflow template which reads data from Jms Queue/Topic and writes it to Cloud PubSub.
 * <p>Check out <a
 * href="">README</a>
 * for instructions on how to use or modify this template.
    name = "Jms_to_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "JMS to Pubsub",
    description =
        "The JMS to Pub/Sub template is a streaming pipeline that reads messages from ActiveMQ JMS Server (Queue/Topic) and writes them to Pub/Sub.",
    optionsClass =,
    flexContainerName = "jms-to-pubsub",
    contactInformation = "",
    documentation =
    preview = true,
    requirements = {
      "The Pub/Sub output topic name must exist.",
      "The JMS host IP must exist and have the proper network configuration for Dataflow worker VMs to reach the JMS host.",
      "The JMS topic/queue that data is extracted from must have a name."
    streaming = true,
    supportsAtLeastOnce = true)
public class JmsToPubsub {

   * Runs a pipeline which reads data from JMS queue/topic and writes it to Cloud PubSub.
   * @param args arguments to the pipeline
  private static final Logger LOG = LoggerFactory.getLogger(JmsToPubsub.class);

  public static void main(String[] args) {
    JmsToPubsubOptions options =

  public static void validate(JmsToPubsubOptions options) {
    if (options != null) {
      if ((options.getUsername() != null
              && (!options.getUsername().isEmpty() || !options.getUsername().isBlank()))
          && (options.getPassword() == null
              || options.getPassword().isBlank()
              || options.getPassword().isEmpty())) {
        throw new IllegalArgumentException(
            "While username is provided, password is required for authentication");

  public static PipelineResult run(JmsToPubsubOptions options) {
    Pipeline pipeline = Pipeline.create(options);
    String connectionURI = options.getJmsServer();
    ConnectionFactory myConnectionFactory;
    PCollection<JmsRecord> input;

    if (!options.getUsername().isEmpty() || !options.getUsername().isBlank()) {
      myConnectionFactory =
          new ActiveMQConnectionFactory(
              options.getUsername(), options.getPassword(), connectionURI);
    } else {
      myConnectionFactory = new ActiveMQConnectionFactory(connectionURI);
    }"Given Input Type " + options.getInputType());
    if (options.getInputType().equalsIgnoreCase("queue")) {
      input =
              "Read From JMS Queue",
    } else {
      input =
              "Read From JMS Topic",

                new SimpleFunction<JmsRecord, String>() {
                  public String apply(JmsRecord input) {
                    return input.getPayload();
        .apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));

   * The {@link JmsToPubsubOptions} interface provides the custom execution options passed by the
   * executor at the command-line.
  public interface JmsToPubsubOptions extends PipelineOptions {
        order = 1,
        groupName = "Source",
        optional = true,
        regexes = {"[,\\/:a-zA-Z0-9._-]+"},
        description = "JMS Host IP",
        helpText = "The JMS (ActiveMQ) Server IP.",
        example = "tcp://")
    String getJmsServer();

    void setJmsServer(String jmsServer);

        order = 2,
        groupName = "Source",
        optional = false,
        regexes = {"[a-zA-Z0-9._-]+"},
        description = "JMS Queue/Topic Name to read the input from",
        helpText = "The name of the JMS topic or queue that data is read from.",
        example = "queue")
    String getInputName();

    void setInputName(String inputName);

        order = 3,
        optional = false,
        regexes = {"[a-zA-Z0-9._-]+"},
        description = "JMS Destination Type to read the input from",
        helpText = "The JMS destination type to read data from. Can be a queue or a topic.",
        example = "queue")
    String getInputType();

    void setInputType(String inputType);

        order = 4,
        groupName = "Target",
        description = "Output Pub/Sub topic",
        helpText = "The name of the Pub/Sub topic to publish data to.",
        example = "projects/<PROJECT_ID>/topics/<TOPIC_NAME>")
    String getOutputTopic();

    void setOutputTopic(String outputTopic);

        order = 5,
        description = "JMS Username",
        helpText = "The username to use for authentication on the JMS server.",
        example = "sampleusername")
    String getUsername();

    void setUsername(String username);

        order = 6,
        description = "JMS Password",
        helpText = "The password associated with the provided username.",
        example = "samplepassword")
    String getPassword();

    void setPassword(String password);

Passaggi successivi