Modello Pub/Sub a Java Database Connectivity (JDBC)

Il modello Pub/Sub a Java Database Connectivity (JDBC) è una pipeline in modalità flusso che importa i dati da una sottoscrizione Pub/Sub preesistente come stringhe JSON e scrive i record risultanti in JDBC.

Requisiti della pipeline

  • La sottoscrizione Pub/Sub deve esistere prima dell'esecuzione della pipeline.
  • L'origine JDBC deve esistere prima dell'esecuzione della pipeline.
  • L'argomento messaggi non recapitabili di output di Pub/Sub deve esistere prima dell'esecuzione della pipeline.

Parametri del modello

Parametro Descrizione
driverClassName Il nome della classe del driver JDBC. Ad esempio, com.mysql.jdbc.Driver.
connectionUrl La stringa dell'URL di connessione JDBC. Ad esempio, jdbc:mysql://some-host:3306/sampledb. Puoi passare questo valore come stringa criptata con una chiave Cloud KMS e poi codificata in Base64. Rimuovi i caratteri di spaziatura dalla stringa codificata Base64.
driverJars Percorsi Cloud Storage separati da virgole per i driver JDBC. Ad esempio, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar.
username (Facoltativo) Il nome utente da utilizzare per la connessione JDBC. Puoi passare questo valore criptato da una chiave Cloud KMS come stringa con codifica Base64.
password (Facoltativo) La password da utilizzare per la connessione JDBC. Puoi passare questo valore criptato da una chiave Cloud KMS come stringa con codifica Base64.
connectionProperties (Facoltativo) Stringa di proprietà da utilizzare per la connessione JDBC. Il formato della stringa deve essere [propertyName=property;]*. Ad esempio, unicode=true;characterEncoding=UTF-8.
statement Statement da eseguire sul database. L'istruzione deve specificare i nomi delle colonne della tabella in qualsiasi ordine. Solo i valori dei nomi di colonna specificati vengono letti dal JSON e aggiunti all'istruzione. Ad esempio, INSERT INTO tableName (column1, column2) VALUES (?,?)
inputSubscription La sottoscrizione Pub/Sub di input da cui leggere, nel formato projects/<project>/subscriptions/<subscription>.
outputDeadletterTopic L'argomento Pub/Sub a cui inoltrare i messaggi non recapitabili. Ad esempio, projects/<project-id>/topics/<topic-name>.
KMSEncryptionKey (Facoltativo) Chiave di crittografia Cloud KMS per decriptare il nome utente, la password e la stringa di connessione. Se viene passata la chiave Cloud KMS, il nome utente, la password e la stringa di connessione devono essere tutti passati in forma criptata.
extraFilesToStage Percorsi Cloud Storage separati da virgole o secret di Secret Manager per i file da mettere in staging nel worker. Questi file verranno salvati nella directory /extra_files in ogni worker. Ad esempio, gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>.

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Pub/Sub to JDBC template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Sostituisci quanto segue:

  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • DRIVER_CLASS_NAME: il nome della classe del driver
  • JDBC_CONNECTION_URL: l'URL di connessione JDBC
  • DRIVER_PATHS: i percorsi Cloud Storage separati da virgole dei driver JDBC
  • CONNECTION_USERNAME: il nome utente della connessione JDBC
  • CONNECTION_PASSWORD: la password di connessione JDBC
  • CONNECTION_PROPERTIES: le proprietà di connessione JDBC, se necessario
  • SQL_STATEMENT: l'istruzione SQL da eseguire sul database
  • INPUT_SUBSCRIPTION: la sottoscrizione di input Pub/Sub da cui leggere
  • OUTPUT_DEADLETTER_TOPIC: il Pub/Sub a cui inoltrare i messaggi non recapitabili
  • KMS_ENCRYPTION_KEY: la chiave di crittografia Cloud KMS

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "statement": "SQL_STATEMENT",
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • DRIVER_CLASS_NAME: il nome della classe del driver
  • JDBC_CONNECTION_URL: l'URL di connessione JDBC
  • DRIVER_PATHS: i percorsi Cloud Storage separati da virgole dei driver JDBC
  • CONNECTION_USERNAME: il nome utente della connessione JDBC
  • CONNECTION_PASSWORD: la password di connessione JDBC
  • CONNECTION_PROPERTIES: le proprietà di connessione JDBC, se necessario
  • SQL_STATEMENT: l'istruzione SQL da eseguire sul database
  • INPUT_SUBSCRIPTION: la sottoscrizione di input Pub/Sub da cui leggere
  • OUTPUT_DEADLETTER_TOPIC: il Pub/Sub a cui inoltrare i messaggi non recapitabili
  • KMS_ENCRYPTION_KEY: la chiave di crittografia Cloud KMS
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.io.DynamicJdbcIO;
import com.google.cloud.teleport.v2.options.PubsubToJdbcOptions;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.JsonStringToQueryMapper;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Splitter;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubsubToJdbc} streaming pipeline reads data from Google Cloud PubSub and publishes to
 * JDBC.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Pubsub_to_Jdbc.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Pubsub_to_Jdbc",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to JDBC",
    description =
        "The Pub/Sub to Java Database Connectivity (JDBC) template is a streaming pipeline that ingests data from a "
            + "pre-existing Cloud Pub/Sub subscription as JSON strings, and writes the resulting records to JDBC.",
    optionsClass = PubsubToJdbcOptions.class,
    flexContainerName = "pubsub-to-jdbc",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-jdbc",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The Cloud Pub/Sub subscription must exist prior to running the pipeline.",
      "The JDBC source must exist prior to running the pipeline.",
      "The Cloud Pub/Sub output deadletter topic must exist prior to running the pipeline.",
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class PubsubToJdbc {

  /* Logger for class.*/
  private static final Logger LOG = LoggerFactory.getLogger(PubsubToJdbc.class);

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    PubsubToJdbcOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubToJdbcOptions.class);

    run(options);
  }

  /**
   * Runs a pipeline which reads message from Pub/Sub and writes to JDBC.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(PubsubToJdbcOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    LOG.info("Starting Pubsub-to-Jdbc Pipeline.");

    /*
     * Steps:
     *  1) Read data from a Pub/Sub subscription
     *  2) Write to Jdbc Table
     *  3) Write errors to deadletter topic
     */
    PCollection<String> pubsubData =
        pipeline.apply(
            "readFromPubSubSubscription",
            PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));

    DynamicJdbcIO.DynamicDataSourceConfiguration dataSourceConfiguration =
        DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                options.getDriverClassName(),
                maybeDecrypt(options.getConnectionUrl(), options.getKMSEncryptionKey()).get())
            .withDriverJars(options.getDriverJars());
    if (options.getUsername() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withUsername(
              maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()).get());
    }
    if (options.getPassword() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withPassword(
              maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()).get());
    }
    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    PCollection<FailsafeElement<String, String>> errors =
        pubsubData
            .apply(
                "writeToJdbc",
                DynamicJdbcIO.<String>write()
                    .withDataSourceConfiguration(dataSourceConfiguration)
                    .withStatement(options.getStatement())
                    .withPreparedStatementSetter(
                        new JsonStringToQueryMapper(getKeyOrder(options.getStatement()))))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    errors.apply(
        "WriteFailedRecords",
        ErrorConverters.WriteStringMessageErrorsToPubSub.newBuilder()
            .setErrorRecordsTopic(options.getOutputDeadletterTopic())
            .build());

    return pipeline.run();
  }

  private static List<String> getKeyOrder(String statement) {
    int startIndex = statement.indexOf("(");
    int endIndex = statement.indexOf(")");
    String data = statement.substring(startIndex + 1, endIndex);
    return Splitter.on(',').trimResults().splitToList(data);
  }
}

Passaggi successivi