Plantilla de conectividad a bases de datos de Java (JDBC) para Pub/Sub

La plantilla de conectividad de la base de datos de Java (JDBC) a Pub/Sub es una canalización por lotes que transfiere datos desde la fuente de JDBC y escribe los registros resultantes en un tema de Pub/Sub preexistente como una string JSON.

Requisitos de la canalización

  • La fuente de JDBC debe existir antes de ejecutar la canalización.
  • El tema de salida de Cloud Pub/Sub debe existir antes de ejecutar la canalización.

Parámetros de la plantilla

Parámetros obligatorios

  • driverClassName: Es el nombre de la clase del controlador de JDBC. Por ejemplo, com.mysql.jdbc.Driver
  • connectionUrl: Es la cadena de URL de conexión de JDBC. Puedes pasar este valor como una cadena encriptada con una clave de Cloud KMS y, luego, codificada en Base64. Por ejemplo: "echo -n "jdbc:mysql://some-host:3306/sampledb" | gcloud kms encrypt --location=
  • driverJars: Rutas de Cloud Storage separadas por comas para controladores de JDBC. Por ejemplo, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar
  • query: La consulta que se ejecutará en la fuente para extraer los datos. Por ejemplo, select * from sampledb.sample_table
  • outputTopic: El tema de Pub/Sub en el que se publicará. Por ejemplo, projects/<PROJECT_ID>/topics/<TOPIC_NAME>

Parámetros opcionales

  • username: El nombre de usuario que se utilizará en la conexión de JDBC. Puedes pasar este valor como una cadena encriptada con una clave de Cloud KMS y, luego, codificada en Base64. Por ejemplo, echo -n 'some_username' | glcloud kms encrypt --location=my_location --keyring=mykeyring --key=mykey --plaintext-file=- --ciphertext-file=- | base64
  • password: La contraseña para usar en la conexión de JDBC. Puedes pasar este valor como una cadena encriptada con una clave de Cloud KMS y, luego, codificada en Base64. Por ejemplo, echo -n 'some_password' | glcloud kms encrypt --location=my_location --keyring=mykeyring --key=mykey --plaintext-file=- --ciphertext-file=- | base64
  • connectionProperties: La cadena de propiedades para usar en la conexión de JDBC. El formato de la cadena debe ser [propertyName=property;]*. Por ejemplo, unicode=true;characterEncoding=UTF-8.
  • KMSEncryptionKey: La clave de encriptación de Cloud KMS que se usa para desencriptar el nombre de usuario, la contraseña y la cadena de conexión. Si se pasa una clave de Cloud KMS, el nombre de usuario, la contraseña y la cadena de conexión deben pasarse encriptados y codificados en base64. Por ejemplo, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
  • disabledAlgorithms: Algoritmos separados por comas que se deben inhabilitar. Si este valor se establece como none, no se inhabilita ningún algoritmo. Ten cuidado con este parámetro, ya que los algoritmos inhabilitados de forma predeterminada podrían tener vulnerabilidades o problemas de rendimiento. Por ejemplo, SSLv3, RC4.
  • extraFilesToStage: Rutas de Cloud Storage separadas por comas o secretos de Secret Manager para los archivos que se deben almacenar en etapa intermedia en el trabajador. Estos archivos se guardan en el directorio /extra_files en cada trabajador. Por ejemplo, gs://<BUCKET_NAME>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the JDBC to Pub/Sub template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Jdbc_to_PubSub \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
query=SOURCE_SQL_QUERY,\
outputTopic=OUTPUT_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • DRIVER_CLASS_NAME: Es el nombre de la clase del controlador.
  • JDBC_CONNECTION_URL: Es la URL de conexión de JDBC.
  • DRIVER_PATHS: Son las rutas de Cloud Storage separadas por comas de los controladores JDBC.
  • CONNECTION_USERNAME: Es el nombre de usuario de la conexión de JDBC.
  • CONNECTION_PASSWORD: Es la contraseña de la conexión de JDBC.
  • CONNECTION_PROPERTIES: Las propiedades de conexión de JDBC, si es necesario
  • SOURCE_SQL_QUERY: Es la consulta de SQL que se ejecutará en la base de datos de origen.
  • OUTPUT_TOPIC: Es el Pub/Sub en el que se publicará.
  • KMS_ENCRYPTION_KEY: Es la clave de encriptación de Cloud KMS.

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Jdbc_to_PubSub"
    "parameters": {
      "driverClassName": "DRIVER_CLASS_NAME",
      "connectionURL": "JDBC_CONNECTION_URL",
      "driverJars": "DRIVER_PATHS",
      "username": "CONNECTION_USERNAME",
      "password": "CONNECTION_PASSWORD",
      "connectionProperties": "CONNECTION_PROPERTIES",
      "query": "SOURCE_SQL_QUERY",
      "outputTopic": "OUTPUT_TOPIC",
      "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
    },
    "environment": { "zone": "us-central1-f" }
  }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • DRIVER_CLASS_NAME: Es el nombre de la clase del controlador.
  • JDBC_CONNECTION_URL: Es la URL de conexión de JDBC.
  • DRIVER_PATHS: Son las rutas de Cloud Storage separadas por comas de los controladores JDBC.
  • CONNECTION_USERNAME: Es el nombre de usuario de la conexión de JDBC.
  • CONNECTION_PASSWORD: Es la contraseña de la conexión de JDBC.
  • CONNECTION_PROPERTIES: Las propiedades de conexión de JDBC, si es necesario
  • SOURCE_SQL_QUERY: Es la consulta de SQL que se ejecutará en la base de datos de origen.
  • OUTPUT_TOPIC: Es el Pub/Sub en el que se publicará.
  • KMS_ENCRYPTION_KEY: Es la clave de encriptación de Cloud KMS.
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.JdbcToPubsubOptions;
import java.sql.Clob;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.values.PCollection;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link JdbcToPubsub} batch pipeline reads data from JDBC and publishes to Google Cloud
 * PubSub.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jdbc-to-googlecloud/README_Jdbc_to_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Jdbc_to_PubSub",
    category = TemplateCategory.BATCH,
    displayName = "JDBC to Pub/Sub",
    description =
        "The Java Database Connectivity (JDBC) to Pub/Sub template is a batch pipeline that ingests data from "
            + "JDBC source and writes the resulting records to a pre-existing Pub/Sub topic as a JSON string.",
    optionsClass = JdbcToPubsubOptions.class,
    flexContainerName = "jdbc-to-pubsub",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/jdbc-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC source must exist prior to running the pipeline.",
      "The Cloud Pub/Sub output topic must exist prior to running the pipeline."
    })
public class JdbcToPubsub {

  /* Logger for class.*/
  private static final Logger LOG = LoggerFactory.getLogger(JdbcToPubsub.class);

  /**
   * {@link JdbcIO.RowMapper} implementation to convert Jdbc ResultSet rows to UTF-8 encoded JSONs.
   */
  public static class ResultSetToJSONString implements JdbcIO.RowMapper<String> {

    @Override
    public String mapRow(ResultSet resultSet) throws Exception {
      ResultSetMetaData metaData = resultSet.getMetaData();
      JSONObject json = new JSONObject();

      for (int i = 1; i <= metaData.getColumnCount(); i++) {
        Object value = resultSet.getObject(i);

        // JSONObject.put() does not support null values. The exception is JSONObject.NULL
        if (value == null) {
          json.put(metaData.getColumnLabel(i), JSONObject.NULL);
          continue;
        }

        switch (metaData.getColumnTypeName(i).toLowerCase()) {
          case "clob":
            Clob clobObject = resultSet.getClob(i);
            if (clobObject.length() > Integer.MAX_VALUE) {
              LOG.warn(
                  "The Clob value size {} in column {} exceeds 2GB and will be truncated.",
                  clobObject.length(),
                  metaData.getColumnLabel(i));
            }
            json.put(
                metaData.getColumnLabel(i), clobObject.getSubString(1, (int) clobObject.length()));
            break;
          default:
            json.put(metaData.getColumnLabel(i), value);
        }
      }
      return json.toString();
    }
  }

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    JdbcToPubsubOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(JdbcToPubsubOptions.class);

    run(options);
  }

  /**
   * Runs a pipeline which reads message from JDBC and writes to Pub/Sub.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(JdbcToPubsubOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    LOG.info("Starting Jdbc-To-PubSub Pipeline.");

    /*
     * Steps:
     *  1) Read data from a Jdbc Table
     *  2) Write to Pub/Sub topic
     */
    JdbcIO.DataSourceConfiguration dataSourceConfiguration =
        JdbcIO.DataSourceConfiguration.create(
                StaticValueProvider.of(options.getDriverClassName()),
                maybeDecrypt(options.getConnectionUrl(), options.getKMSEncryptionKey()))
            .withDriverJars(options.getDriverJars());
    if (options.getUsername() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withUsername(
              maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()));
    }
    if (options.getPassword() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withPassword(
              maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()));
    }
    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    PCollection<String> jdbcData =
        pipeline.apply(
            "readFromJdbc",
            JdbcIO.<String>read()
                .withDataSourceConfiguration(dataSourceConfiguration)
                .withQuery(options.getQuery())
                .withCoder(StringUtf8Coder.of())
                .withRowMapper(new ResultSetToJSONString()));

    jdbcData.apply("writeSuccessMessages", PubsubIO.writeStrings().to(options.getOutputTopic()));

    return pipeline.run();
  }
}

¿Qué sigue?