Modelo do Java Database Connectivity (JDBC) para Pub/Sub

O modelo do Java Database Connectivity (JDBC) para Pub/Sub é um pipeline de streaming que ingere dados da origem JDBC e grava os registros resultantes em um tópico preexistente do Pub/Sub como uma string JSON.

Requisitos de pipeline

  • A origem do JDBC precisa ter sido criada antes da execução do pipeline.
  • O tópico de saída do Pub/Sub precisa existir antes de o pipeline ser executado.

Parâmetros do modelo

Parâmetros obrigatórios

  • driverClassName: o nome da classe do driver do JDBC. Por exemplo, com.mysql.jdbc.Driver.
  • connectionUrl: a string do URL de conexão do JDBC. É possível transmitir esse valor como uma string criptografada com uma chave do Cloud KMS e, em seguida, codificada em Base64. Por exemplo: 'echo -n "jdbc:mysql://some-host:3306/sampledb" | gcloud kms encrypt --location=
  • driverJars: caminhos do Cloud Storage separados por vírgulas para drivers JDBC. Por exemplo, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar.
  • query: a consulta a ser executada na origem para extrair os dados. Por exemplo, select * from sampledb.sample_table.
  • outputTopic: o tópico do Pub/Sub em que a publicação será feita. Por exemplo, projects/<PROJECT_ID>/topics/<TOPIC_NAME>.

Parâmetros opcionais

  • username: o nome de usuário a ser usado para a conexão JDBC. É possível transmitir esse valor como uma string criptografada com uma chave do Cloud KMS e, em seguida, codificada em Base64. Por exemplo, echo -n 'some_username' | glcloud kms encrypt --location=my_location --keyring=mykeyring --key=mykey --plaintext-file=- --ciphertext-file=- | base64.
  • password: a senha a ser usada para a conexão JDBC. É possível transmitir esse valor como uma string criptografada com uma chave do Cloud KMS e, em seguida, codificada em Base64. Por exemplo, echo -n 'some_password' | glcloud kms encrypt --location=my_location --keyring=mykeyring --key=mykey --plaintext-file=- --ciphertext-file=- | base64.
  • connectionProperties: a string de propriedades a ser usada para a conexão JDBC. O formato da string precisa ser [propertyName=property;]*. Por exemplo, unicode=true;characterEncoding=UTF-8.
  • KMSEncryptionKey: a chave de criptografia do Cloud KMS a ser usada para descriptografar o nome de usuário, a senha e a string de conexão. Se a chave do Cloud KMS for transmitida, o nome de usuário, a senha e a string de conexão precisarão ser enviados criptografados e codificados em base64. Por exemplo, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • disabledAlgorithms: algoritmos separados por vírgula a serem desativados. Se esse valor for definido como none, nenhum algoritmo será desativado. Use esse parâmetro com cuidado, porque os algoritmos desativados por padrão podem ter vulnerabilidades ou problemas de desempenho. Por exemplo, SSLv3, RC4.
  • extraFilesToStage: caminhos do Cloud Storage separados por vírgulas ou secrets do Secret Manager para que os arquivos sejam organizados no worker. Esses arquivos são salvos no diretório /extra_files em cada worker. Por exemplo, gs://<BUCKET_NAME>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>.

Executar o modelo

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the JDBC to Pub/Sub template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Jdbc_to_PubSub \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
query=SOURCE_SQL_QUERY,\
outputTopic=OUTPUT_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • DRIVER_CLASS_NAME: o nome da classe do
  • JDBC_CONNECTION_URL: o URL de conexão de JDBC
  • DRIVER_PATHS: os caminho(s) do Cloud Storage separado(s) por vírgula do(s) driver(s) JDBC
  • CONNECTION_USERNAME: o nome de usuário da conexão JDBC.
  • CONNECTION_PASSWORD: a senha de conexão JDBC
  • CONNECTION_PROPERTIES: as propriedades da conexão JDBC, se necessário
  • SOURCE_SQL_QUERY: a consulta SQL a ser executada no banco de dados de origem.
  • OUTPUT_TOPIC: o Pub/Sub a ser publicado
  • KMS_ENCRYPTION_KEY: a chave de criptografia do Cloud KMS

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Jdbc_to_PubSub"
    "parameters": {
      "driverClassName": "DRIVER_CLASS_NAME",
      "connectionURL": "JDBC_CONNECTION_URL",
      "driverJars": "DRIVER_PATHS",
      "username": "CONNECTION_USERNAME",
      "password": "CONNECTION_PASSWORD",
      "connectionProperties": "CONNECTION_PROPERTIES",
      "query": "SOURCE_SQL_QUERY",
      "outputTopic": "OUTPUT_TOPIC",
      "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
    },
    "environment": { "zone": "us-central1-f" }
  }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • DRIVER_CLASS_NAME: o nome da classe do
  • JDBC_CONNECTION_URL: o URL de conexão de JDBC
  • DRIVER_PATHS: os caminho(s) do Cloud Storage separado(s) por vírgula do(s) driver(s) JDBC
  • CONNECTION_USERNAME: o nome de usuário da conexão JDBC.
  • CONNECTION_PASSWORD: a senha de conexão JDBC
  • CONNECTION_PROPERTIES: as propriedades da conexão JDBC, se necessário
  • SOURCE_SQL_QUERY: a consulta SQL a ser executada no banco de dados de origem.
  • OUTPUT_TOPIC: o Pub/Sub a ser publicado
  • KMS_ENCRYPTION_KEY: a chave de criptografia do Cloud KMS
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.JdbcToPubsubOptions;
import java.sql.Clob;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.values.PCollection;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link JdbcToPubsub} batch pipeline reads data from JDBC and publishes to Google Cloud
 * PubSub.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jdbc-to-googlecloud/README_Jdbc_to_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Jdbc_to_PubSub",
    category = TemplateCategory.BATCH,
    displayName = "JDBC to Pub/Sub",
    description =
        "The Java Database Connectivity (JDBC) to Pub/Sub template is a batch pipeline that ingests data from "
            + "JDBC source and writes the resulting records to a pre-existing Pub/Sub topic as a JSON string.",
    optionsClass = JdbcToPubsubOptions.class,
    flexContainerName = "jdbc-to-pubsub",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/jdbc-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC source must exist prior to running the pipeline.",
      "The Cloud Pub/Sub output topic must exist prior to running the pipeline."
    })
public class JdbcToPubsub {

  /* Logger for class.*/
  private static final Logger LOG = LoggerFactory.getLogger(JdbcToPubsub.class);

  /**
   * {@link JdbcIO.RowMapper} implementation to convert Jdbc ResultSet rows to UTF-8 encoded JSONs.
   */
  public static class ResultSetToJSONString implements JdbcIO.RowMapper<String> {

    @Override
    public String mapRow(ResultSet resultSet) throws Exception {
      ResultSetMetaData metaData = resultSet.getMetaData();
      JSONObject json = new JSONObject();

      for (int i = 1; i <= metaData.getColumnCount(); i++) {
        Object value = resultSet.getObject(i);

        // JSONObject.put() does not support null values. The exception is JSONObject.NULL
        if (value == null) {
          json.put(metaData.getColumnLabel(i), JSONObject.NULL);
          continue;
        }

        switch (metaData.getColumnTypeName(i).toLowerCase()) {
          case "clob":
            Clob clobObject = resultSet.getClob(i);
            if (clobObject.length() > Integer.MAX_VALUE) {
              LOG.warn(
                  "The Clob value size {} in column {} exceeds 2GB and will be truncated.",
                  clobObject.length(),
                  metaData.getColumnLabel(i));
            }
            json.put(
                metaData.getColumnLabel(i), clobObject.getSubString(1, (int) clobObject.length()));
            break;
          default:
            json.put(metaData.getColumnLabel(i), value);
        }
      }
      return json.toString();
    }
  }

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    JdbcToPubsubOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(JdbcToPubsubOptions.class);

    run(options);
  }

  /**
   * Runs a pipeline which reads message from JDBC and writes to Pub/Sub.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(JdbcToPubsubOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    LOG.info("Starting Jdbc-To-PubSub Pipeline.");

    /*
     * Steps:
     *  1) Read data from a Jdbc Table
     *  2) Write to Pub/Sub topic
     */
    JdbcIO.DataSourceConfiguration dataSourceConfiguration =
        JdbcIO.DataSourceConfiguration.create(
                StaticValueProvider.of(options.getDriverClassName()),
                maybeDecrypt(options.getConnectionUrl(), options.getKMSEncryptionKey()))
            .withDriverJars(options.getDriverJars());
    if (options.getUsername() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withUsername(
              maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()));
    }
    if (options.getPassword() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withPassword(
              maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()));
    }
    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    PCollection<String> jdbcData =
        pipeline.apply(
            "readFromJdbc",
            JdbcIO.<String>read()
                .withDataSourceConfiguration(dataSourceConfiguration)
                .withQuery(options.getQuery())
                .withCoder(StringUtf8Coder.of())
                .withRowMapper(new ResultSetToJSONString()));

    jdbcData.apply("writeSuccessMessages", PubsubIO.writeStrings().to(options.getOutputTopic()));

    return pipeline.run();
  }
}

A seguir