Modèle Pub/Sub vers Java Database Connectivity (JDBC)

Le modèle Pub/Sub vers Java Database Connectivity (JDBC) est un pipeline de streaming qui ingère les données d'un abonnement Pub/Sub préexistant en tant que chaînes JSON et écrit les enregistrements obtenus dans JDBC.

Conditions requises pour ce pipeline

  • L'abonnement Pub/Sub doit exister avant l'exécution du pipeline.
  • La source JDBC doit exister avant l'exécution du pipeline.
  • La file d'attente de lettres mortes de sortie Pub/Sub doit exister avant l'exécution du pipeline.

Paramètres de modèle

Paramètre Description
driverClassName Nom de la classe du pilote JDBC. Par exemple, com.mysql.jdbc.Driver.
connectionUrl Chaîne d'URL de connexion JDBC. Par exemple, jdbc:mysql://some-host:3306/sampledb. Vous pouvez transmettre cette valeur sous forme de chaîne chiffrée avec une clé Cloud KMS, puis encodée en base64. Supprimez les espaces blancs de la chaîne encodée en base64.
driverJars Chemins Cloud Storage séparés par une virgule pour les pilotes JDBC. Par exemple, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar.
username Facultatif : Nom d'utilisateur à utiliser pour la connexion JDBC. Vous pouvez transmettre cette valeur chiffrée par une clé Cloud KMS en tant que chaîne encodée en base64.
password Facultatif : Mot de passe à utiliser pour la connexion JDBC. Vous pouvez transmettre cette valeur chiffrée par une clé Cloud KMS en tant que chaîne encodée en base64.
connectionProperties Facultatif : Chaîne de propriétés à utiliser pour la connexion JDBC. Le format de la chaîne doit être [propertyName=property;]*. Par exemple, unicode=true;characterEncoding=UTF-8.
statement Instruction à exécuter sur la base de données. L'instruction doit spécifier les noms de colonnes de la table dans n'importe quel ordre. Seules les valeurs des noms de colonnes spécifiés sont lues à partir du fichier JSON et ajoutées à l'instruction. Par exemple : INSERT INTO tableName (column1, column2) VALUES (?,?)
inputSubscription Abonnement en entrée Pub/Sub à lire, au format projects/<project>/subscriptions/<subscription>.
outputDeadletterTopic Sujet Pub/Sub pour transférer les messages non distribuables. Par exemple, projects/<project-id>/topics/<topic-name>.
KMSEncryptionKey Facultatif : Clé de chiffrement Cloud KMS permettant de déchiffrer le nom d'utilisateur, le mot de passe et la chaîne de connexion. Si la clé Cloud KMS est transmise, le nom d'utilisateur, le mot de passe et la chaîne de connexion doivent tous être transmis de manière chiffrée.
extraFilesToStage Chemins d'accès Cloud Storage ou secrets Secret Manager séparés par une virgule afin que les fichiers soient traités dans le nœud de calcul. Ces fichiers seront enregistrés dans le répertoire /extra_files de chaque nœud de calcul. Par exemple, gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to JDBC template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • DRIVER_CLASS_NAME : nom de la classe du pilote
  • JDBC_CONNECTION_URL : URL de connexion JDBC
  • DRIVER_PATHS : chemin(s) d'accès Cloud Storage séparé(s) par des virgules vers le(s) pilote(s) JDBC
  • CONNECTION_USERNAME : nom d'utilisateur de la connexion JDBC
  • CONNECTION_PASSWORD : mot de passe de la connexion JDBC
  • CONNECTION_PROPERTIES : les propriétés de connexion JDBC, le cas échéant
  • SQL_STATEMENT : instruction SQL à exécuter sur la base de données
  • INPUT_SUBSCRIPTION : abonnement Pub/Sub en entrée à lire
  • OUTPUT_DEADLETTER_TOPIC : sujet Pub/Sub pour transférer les messages non distribuables
  • KMS_ENCRYPTION_KEY : clé de chiffrement Cloud KMS

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "statement": "SQL_STATEMENT",
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • DRIVER_CLASS_NAME : nom de la classe du pilote
  • JDBC_CONNECTION_URL : URL de connexion JDBC
  • DRIVER_PATHS : chemin(s) d'accès Cloud Storage séparé(s) par des virgules vers le(s) pilote(s) JDBC
  • CONNECTION_USERNAME : nom d'utilisateur de la connexion JDBC
  • CONNECTION_PASSWORD : mot de passe de la connexion JDBC
  • CONNECTION_PROPERTIES : les propriétés de connexion JDBC, le cas échéant
  • SQL_STATEMENT : instruction SQL à exécuter sur la base de données
  • INPUT_SUBSCRIPTION : abonnement Pub/Sub en entrée à lire
  • OUTPUT_DEADLETTER_TOPIC : sujet Pub/Sub pour transférer les messages non distribuables
  • KMS_ENCRYPTION_KEY : clé de chiffrement Cloud KMS
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.io.DynamicJdbcIO;
import com.google.cloud.teleport.v2.options.PubsubToJdbcOptions;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.JsonStringToQueryMapper;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Splitter;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubsubToJdbc} streaming pipeline reads data from Google Cloud PubSub and publishes to
 * JDBC.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Pubsub_to_Jdbc.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Pubsub_to_Jdbc",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to JDBC",
    description =
        "The Pub/Sub to Java Database Connectivity (JDBC) template is a streaming pipeline that ingests data from a "
            + "pre-existing Cloud Pub/Sub subscription as JSON strings, and writes the resulting records to JDBC.",
    optionsClass = PubsubToJdbcOptions.class,
    flexContainerName = "pubsub-to-jdbc",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-jdbc",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The Cloud Pub/Sub subscription must exist prior to running the pipeline.",
      "The JDBC source must exist prior to running the pipeline.",
      "The Cloud Pub/Sub output deadletter topic must exist prior to running the pipeline.",
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class PubsubToJdbc {

  /* Logger for class.*/
  private static final Logger LOG = LoggerFactory.getLogger(PubsubToJdbc.class);

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    PubsubToJdbcOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubToJdbcOptions.class);

    run(options);
  }

  /**
   * Runs a pipeline which reads message from Pub/Sub and writes to JDBC.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(PubsubToJdbcOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    LOG.info("Starting Pubsub-to-Jdbc Pipeline.");

    /*
     * Steps:
     *  1) Read data from a Pub/Sub subscription
     *  2) Write to Jdbc Table
     *  3) Write errors to deadletter topic
     */
    PCollection<String> pubsubData =
        pipeline.apply(
            "readFromPubSubSubscription",
            PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));

    DynamicJdbcIO.DynamicDataSourceConfiguration dataSourceConfiguration =
        DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                options.getDriverClassName(),
                maybeDecrypt(options.getConnectionUrl(), options.getKMSEncryptionKey()).get())
            .withDriverJars(options.getDriverJars());
    if (options.getUsername() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withUsername(
              maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()).get());
    }
    if (options.getPassword() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withPassword(
              maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()).get());
    }
    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    PCollection<FailsafeElement<String, String>> errors =
        pubsubData
            .apply(
                "writeToJdbc",
                DynamicJdbcIO.<String>write()
                    .withDataSourceConfiguration(dataSourceConfiguration)
                    .withStatement(options.getStatement())
                    .withPreparedStatementSetter(
                        new JsonStringToQueryMapper(getKeyOrder(options.getStatement()))))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    errors.apply(
        "WriteFailedRecords",
        ErrorConverters.WriteStringMessageErrorsToPubSub.newBuilder()
            .setErrorRecordsTopic(options.getOutputDeadletterTopic())
            .build());

    return pipeline.run();
  }

  private static List<String> getKeyOrder(String statement) {
    int startIndex = statement.indexOf("(");
    int endIndex = statement.indexOf(")");
    String data = statement.substring(startIndex + 1, endIndex);
    return Splitter.on(',').trimResults().splitToList(data);
  }
}

Étape suivante