Plantilla de conectividad a bases de datos de Java (JDBC) de Pub/Sub

La plantilla de conectividad de base de datos de Pub/Sub a Java (JDBC) es una canalización de transmisión que transfiere datos desde una suscripción de Pub/Sub preexistente como strings JSON y escribe los registros resultantes en JDBC.

Requisitos de la canalización

  • La suscripción de Pub/Sub de origen debe existir antes de ejecutar la canalización.
  • La fuente de JDBC debe existir antes de ejecutar la canalización.
  • El tema de mensajes no entregados de salida de Pub/Sub debe existir antes de ejecutar la canalización.

Parámetros de la plantilla

Parámetro Descripción
driverClassName El nombre de la clase del controlador de JDBC. Por ejemplo, com.mysql.jdbc.Driver.
connectionUrl La string de la URL de la conexión de JDBC. Por ejemplo, jdbc:mysql://some-host:3306/sampledb. Puedes pasar este valor como una cadena encriptada con una clave de Cloud KMS y, luego, codificada en Base64. Quita los caracteres de espacio en blanco de la cadena codificada en base64.
driverJars Rutas de Cloud Storage separadas por comas para controladores de JDBC. Por ejemplo, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar.
username Opcional: El nombre de usuario para usar en la conexión de JDBC. Puedes pasar este valor encriptado por una clave de Cloud KMS como una cadena codificada en Base64.
password Opcional: La contraseña para usar en la conexión de JDBC. Puedes pasar este valor encriptado por una clave de Cloud KMS como una cadena codificada en Base64.
connectionProperties Opcional: La cadena de propiedades para usar en la conexión de JDBC. El formato de la string debe ser [propertyName=property;]*. Por ejemplo, unicode=true;characterEncoding=UTF-8
statement Declaración que se ejecutará en la base de datos. La instrucción debe especificar los nombres de columna de la tabla en cualquier orden. Solo los valores de los nombres de columna especificados se leen del JSON y se agregan a la instrucción. Por ejemplo, INSERT INTO tableName (column1, column2) VALUES (?,?)
inputSubscription Suscripción de entrada de Pub/Sub desde la que se va a leer, en el formato projects/<project>/subscriptions/<subscription>.
outputDeadletterTopic El tema de Pub/Sub para reenviar mensajes que no se pueden entregar. Por ejemplo, projects/<project-id>/topics/<topic-name>.
KMSEncryptionKey Opcional: La clave de encriptación de Cloud KMS para desencriptar el nombre de usuario, la contraseña y la cadena de conexión. Si se pasa la clave de Cloud KMS, el nombre de usuario, la contraseña y la string de conexión deben pasarse encriptados.
extraFilesToStage Rutas de Cloud Storage separadas por comas o secretos de Secret Manager para los archivos que se deben almacenar en etapa intermedia en el trabajador. Estos archivos se guardarán en el directorio /extra_files de cada trabajador. Por ejemplo, gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to JDBC template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • DRIVER_CLASS_NAME: Es el nombre de la clase del controlador.
  • JDBC_CONNECTION_URL: Es la URL de conexión de JDBC.
  • DRIVER_PATHS: Son las rutas de Cloud Storage separadas por comas de los controladores JDBC.
  • CONNECTION_USERNAME: Es el nombre de usuario de la conexión de JDBC.
  • CONNECTION_PASSWORD: Es la contraseña de la conexión de JDBC.
  • CONNECTION_PROPERTIES: Las propiedades de conexión de JDBC, si es necesario
  • SQL_STATEMENT: Es la instrucción de SQL que se ejecutará en la base de datos.
  • INPUT_SUBSCRIPTION: Es la suscripción de entrada de Pub/Sub desde la que se desea leer.
  • OUTPUT_DEADLETTER_TOPIC: Es el tema de Pub/Sub para reenviar mensajes que no se pueden entregar.
  • KMS_ENCRYPTION_KEY: Es la clave de encriptación de Cloud KMS.

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "statement": "SQL_STATEMENT",
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • DRIVER_CLASS_NAME: Es el nombre de la clase del controlador.
  • JDBC_CONNECTION_URL: Es la URL de conexión de JDBC.
  • DRIVER_PATHS: Son las rutas de Cloud Storage separadas por comas de los controladores JDBC.
  • CONNECTION_USERNAME: Es el nombre de usuario de la conexión de JDBC.
  • CONNECTION_PASSWORD: Es la contraseña de la conexión de JDBC.
  • CONNECTION_PROPERTIES: Las propiedades de conexión de JDBC, si es necesario
  • SQL_STATEMENT: Es la instrucción de SQL que se ejecutará en la base de datos.
  • INPUT_SUBSCRIPTION: Es la suscripción de entrada de Pub/Sub desde la que se desea leer.
  • OUTPUT_DEADLETTER_TOPIC: Es el tema de Pub/Sub para reenviar mensajes que no se pueden entregar.
  • KMS_ENCRYPTION_KEY: Es la clave de encriptación de Cloud KMS.
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.io.DynamicJdbcIO;
import com.google.cloud.teleport.v2.options.PubsubToJdbcOptions;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.JsonStringToQueryMapper;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Splitter;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubsubToJdbc} streaming pipeline reads data from Google Cloud PubSub and publishes to
 * JDBC.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Pubsub_to_Jdbc.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Pubsub_to_Jdbc",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to JDBC",
    description =
        "The Pub/Sub to Java Database Connectivity (JDBC) template is a streaming pipeline that ingests data from a "
            + "pre-existing Cloud Pub/Sub subscription as JSON strings, and writes the resulting records to JDBC.",
    optionsClass = PubsubToJdbcOptions.class,
    flexContainerName = "pubsub-to-jdbc",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-jdbc",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The Cloud Pub/Sub subscription must exist prior to running the pipeline.",
      "The JDBC source must exist prior to running the pipeline.",
      "The Cloud Pub/Sub output deadletter topic must exist prior to running the pipeline.",
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class PubsubToJdbc {

  /* Logger for class.*/
  private static final Logger LOG = LoggerFactory.getLogger(PubsubToJdbc.class);

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    PubsubToJdbcOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubToJdbcOptions.class);

    run(options);
  }

  /**
   * Runs a pipeline which reads message from Pub/Sub and writes to JDBC.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(PubsubToJdbcOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    LOG.info("Starting Pubsub-to-Jdbc Pipeline.");

    /*
     * Steps:
     *  1) Read data from a Pub/Sub subscription
     *  2) Write to Jdbc Table
     *  3) Write errors to deadletter topic
     */
    PCollection<String> pubsubData =
        pipeline.apply(
            "readFromPubSubSubscription",
            PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));

    DynamicJdbcIO.DynamicDataSourceConfiguration dataSourceConfiguration =
        DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                options.getDriverClassName(),
                maybeDecrypt(options.getConnectionUrl(), options.getKMSEncryptionKey()).get())
            .withDriverJars(options.getDriverJars());
    if (options.getUsername() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withUsername(
              maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()).get());
    }
    if (options.getPassword() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withPassword(
              maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()).get());
    }
    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    PCollection<FailsafeElement<String, String>> errors =
        pubsubData
            .apply(
                "writeToJdbc",
                DynamicJdbcIO.<String>write()
                    .withDataSourceConfiguration(dataSourceConfiguration)
                    .withStatement(options.getStatement())
                    .withPreparedStatementSetter(
                        new JsonStringToQueryMapper(getKeyOrder(options.getStatement()))))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    errors.apply(
        "WriteFailedRecords",
        ErrorConverters.WriteStringMessageErrorsToPubSub.newBuilder()
            .setErrorRecordsTopic(options.getOutputDeadletterTopic())
            .build());

    return pipeline.run();
  }

  private static List<String> getKeyOrder(String statement) {
    int startIndex = statement.indexOf("(");
    int endIndex = statement.indexOf(")");
    String data = statement.substring(startIndex + 1, endIndex);
    return Splitter.on(',').trimResults().splitToList(data);
  }
}

¿Qué sigue?