Modelo de JMS para Pub/Sub

O modelo de JMS para Pub/Sub é um pipeline de streaming que lê as mensagens do servidor JMS do ActiveMQ (fila/tópico) e as grava no Pub/Sub.

Requisitos do pipeline

  • O nome do tópico de saída do Pub/Sub precisa existir.
  • O IP de host do JMS precisa existir e ter a configuração de rede adequada a fim de que as VMs de worker do Dataflow acessem o host do JMS.
  • O tópico/fila do JMS do qual os dados são extraídos precisa ter um nome.

Parâmetros do modelo

Parâmetros obrigatórios

  • inputName: o nome do tópico ou da fila do JMS em que os dados são lidos. Por exemplo, queue.
  • inputType: o tipo de destino do JMS em que os dados serão lidos. Pode ser uma fila ou um tópico. Por exemplo, queue.
  • outputTopic: o nome do tópico do Pub/Sub em que os dados serão publicados. Por exemplo, projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • username: o nome de usuário a ser usado na autenticação no servidor JMS. Por exemplo, sampleusername.
  • senha: a senha associada ao nome de usuário fornecido. Por exemplo, samplepassword.

Parâmetros opcionais

  • jmsServer: o IP do servidor JMS (ActiveMQ). Por exemplo, tcp://10.0.0.1:61616.

Executar o modelo

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione JMS to Pub/Sub template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/JMS_to_Cloud_PubSub \
    --parameters \
jmsServer=JMS_SERVER,\
inputName=INPUT_NAME,\
inputType=INPUT_TYPE,\
outputTopic=OUTPUT_TOPIC,\
username=USERNAME,\
password=PASSWORD
  

Substitua os valores a seguir neste exemplo:

  • Substitua YOUR_PROJECT_ID pelo ID do projeto.
  • Substitua pelo nome da região do Dataflow. Por exemplo, us-central1.
  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Substitua JMS_SERVER pelos endereços do servidor JMS. Por exemplo: tcp://10.0.0.0:61616
  • Substitua INPUT_NAME pelo nome do tópico/fila de entrada do servidor JMS. Por exemplo, testtopic.
  • Substitua INPUT_TYPE pelo tipo de destino do servidor JMS (fila/tópico). Por exemplo: topic
  • Substitua OUTPUT_TOPIC pelo nome do tópico de saída do Pub/Sub. Por exemplo, projects/myproject/topics/testoutput.
  • Substitua USERNAME pelo nome de usuário do servidor JMS. Por exemplo, testuser.
  • Substitua PASSWORD pela senha correspondente ao nome de usuário usado no servidor JMS.

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "jmsServer": "JMS_SERVER",
          "inputName": "INPUT_NAME",
          "inputType": "INPUT_TYPE",
          "outputTopic": "OUTPUT_TOPIC",
          "username": "USERNAME",
          "password": "PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/JMS_to_Cloud_PubSub",
   }
}
  

Substitua os valores a seguir neste exemplo:

  • Substitua YOUR_PROJECT_ID pelo ID do projeto.
  • Substitua pelo nome da região do Dataflow. Por exemplo, us-central1.
  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Substitua JMS_SERVER pelos endereços do servidor JMS. Por exemplo: tcp://10.0.0.0:61616
  • Substitua INPUT_NAME pelo nome do tópico/fila de entrada do servidor JMS. Por exemplo, testtopic.
  • Substitua INPUT_TYPE pelo tipo de destino do servidor JMS (fila/tópico). Por exemplo: topic
  • Substitua OUTPUT_TOPIC pelo nome do tópico de saída do Pub/Sub. Por exemplo, projects/myproject/topics/testoutput.
  • Substitua USERNAME pelo nome de usuário do servidor JMS. Por exemplo, testuser.
  • Substitua PASSWORD pela senha correspondente ao nome de usuário usado no servidor JMS.
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataflow template which reads data from Jms Queue/Topic and writes it to Cloud PubSub.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jms-to-pubsub/README_Jms_to_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Jms_to_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "JMS to Pubsub",
    description =
        "The JMS to Pub/Sub template is a streaming pipeline that reads messages from ActiveMQ JMS Server (Queue/Topic) and writes them to Pub/Sub.",
    optionsClass = com.google.cloud.teleport.v2.templates.JmsToPubsub.JmsToPubsubOptions.class,
    flexContainerName = "jms-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/jms-to-pubsub",
    preview = true,
    requirements = {
      "The Pub/Sub output topic name must exist.",
      "The JMS host IP must exist and have the proper network configuration for Dataflow worker VMs to reach the JMS host.",
      "The JMS topic/queue that data is extracted from must have a name."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class JmsToPubsub {

  /**
   * Runs a pipeline which reads data from JMS queue/topic and writes it to Cloud PubSub.
   *
   * @param args arguments to the pipeline
   */
  private static final Logger LOG = LoggerFactory.getLogger(JmsToPubsub.class);

  public static void main(String[] args) {
    JmsToPubsubOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(JmsToPubsubOptions.class);
    run(options);
  }

  public static void validate(JmsToPubsubOptions options) {
    if (options != null) {
      if ((options.getUsername() != null
              && (!options.getUsername().isEmpty() || !options.getUsername().isBlank()))
          && (options.getPassword() == null
              || options.getPassword().isBlank()
              || options.getPassword().isEmpty())) {
        throw new IllegalArgumentException(
            "While username is provided, password is required for authentication");
      }
    }
  }

  public static PipelineResult run(JmsToPubsubOptions options) {
    validate(options);
    Pipeline pipeline = Pipeline.create(options);
    String connectionURI = options.getJmsServer();
    ConnectionFactory myConnectionFactory;
    PCollection<JmsRecord> input;

    if (!options.getUsername().isEmpty() || !options.getUsername().isBlank()) {
      myConnectionFactory =
          new ActiveMQConnectionFactory(
              options.getUsername(), options.getPassword(), connectionURI);
    } else {
      myConnectionFactory = new ActiveMQConnectionFactory(connectionURI);
    }
    LOG.info("Given Input Type " + options.getInputType());
    if (options.getInputType().equalsIgnoreCase("queue")) {
      input =
          pipeline.apply(
              "Read From JMS Queue",
              JmsIO.read()
                  .withConnectionFactory(myConnectionFactory)
                  .withQueue(options.getInputName()));
    } else {
      input =
          pipeline.apply(
              "Read From JMS Topic",
              JmsIO.read()
                  .withConnectionFactory(myConnectionFactory)
                  .withTopic(options.getInputName()));
    }

    input
        .apply(
            MapElements.via(
                new SimpleFunction<JmsRecord, String>() {
                  public String apply(JmsRecord input) {
                    return input.getPayload();
                  }
                }))
        .apply("WriteToPubSubTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
    return pipeline.run();
  }

  /**
   * The {@link JmsToPubsubOptions} interface provides the custom execution options passed by the
   * executor at the command-line.
   */
  public interface JmsToPubsubOptions extends PipelineOptions {
    @TemplateParameter.Text(
        order = 1,
        groupName = "Source",
        optional = true,
        regexes = {"[,\\/:a-zA-Z0-9._-]+"},
        description = "JMS Host IP",
        helpText = "The JMS (ActiveMQ) Server IP.",
        example = "tcp://10.0.0.1:61616")
    @Validation.Required
    String getJmsServer();

    void setJmsServer(String jmsServer);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        optional = false,
        regexes = {"[a-zA-Z0-9._-]+"},
        description = "JMS Queue/Topic Name to read the input from",
        helpText = "The name of the JMS topic or queue that data is read from.",
        example = "queue")
    @Validation.Required
    String getInputName();

    void setInputName(String inputName);

    @TemplateParameter.Text(
        order = 3,
        optional = false,
        regexes = {"[a-zA-Z0-9._-]+"},
        description = "JMS Destination Type to read the input from",
        helpText = "The JMS destination type to read data from. Can be a queue or a topic.",
        example = "queue")
    @Validation.Required
    String getInputType();

    void setInputType(String inputType);

    @TemplateParameter.PubsubTopic(
        order = 4,
        groupName = "Target",
        description = "Output Pub/Sub topic",
        helpText = "The name of the Pub/Sub topic to publish data to.",
        example = "projects/<PROJECT_ID>/topics/<TOPIC_NAME>")
    @Validation.Required
    String getOutputTopic();

    void setOutputTopic(String outputTopic);

    @TemplateParameter.Text(
        order = 5,
        description = "JMS Username",
        helpText = "The username to use for authentication on the JMS server.",
        example = "sampleusername")
    String getUsername();

    void setUsername(String username);

    @TemplateParameter.Password(
        order = 6,
        description = "JMS Password",
        helpText = "The password associated with the provided username.",
        example = "samplepassword")
    String getPassword();

    void setPassword(String password);
  }
}

A seguir