Vorlage „Pub/Sub für Java Database Connectivity (JDBC)“

Die Vorlage „Pub/Sub für Java Database Connectivity (JDBC)“ ist eine Streamingpipeline, die Daten aus einem bereits vorhandenen Pub/Sub-Abo als JSON-Strings aufnimmt und die resultierenden Datensätze in JDBC schreibt.

Pipelineanforderungen

  • Das Pub/Sub-Abo muss vorhanden sein, bevor die Pipeline ausgeführt wird.
  • Die JDBC-Quelle muss vorhanden sein, bevor die Pipeline ausgeführt wird.
  • Das Thema für unzustellbare Nachrichten muss vor dem Ausführen der Pipeline in der Pub/Sub-Ausgabe vorhanden sein.

Vorlagenparameter

Parameter Beschreibung
driverClassName Der Name der JDBC-Treiberklasse. Beispiel: com.mysql.jdbc.Driver.
connectionUrl Der URL-String für die JDBC-Verbindung. Beispiel: jdbc:mysql://some-host:3306/sampledb. Sie können diesen Wert als String übergeben, der mit einem Cloud KMS-Schlüssel und dann Base64-verschlüsselt ist. Entfernen Sie Leerzeichen aus dem Base64-codierten String.
driverJars Durch Kommas getrennte Cloud Storage-Pfade für JDBC-Treiber. Beispiel: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar.
username Optional: Der Nutzername, der für die JDBC-Verbindung verwendet werden soll. Sie können diesen Wert mit einem Cloud KMS-Schlüssel als Base64-codierten String übergeben.
password Optional: Das Passwort für die JDBC-Verbindung. Sie können diesen Wert mit einem Cloud KMS-Schlüssel als Base64-codierten String übergeben.
connectionProperties Optional: Attributstring für die JDBC-Verbindung. Format des Strings muss [propertyName=property;]* sein. Beispiel: unicode=true;characterEncoding=UTF-8.
statement Für die Datenbank auszuführende Anweisung. Die Anweisung muss die Spaltennamen der Tabelle in beliebiger Reihenfolge angeben. Nur die Werte der angegebenen Spaltennamen werden aus der JSON-Datei gelesen und zur Anweisung hinzugefügt. Beispiel: INSERT INTO tableName (column1, column2) VALUES (?,?)
inputSubscription Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll, im Format projects/<project>/subscriptions/<subscription>.
outputDeadletterTopic Das Pub/Sub-Thema als Weiterleitungsziel für nicht zustellbare Nachrichten. Beispiel: projects/<project-id>/topics/<topic-name>.
KMSEncryptionKey Optional: Cloud KMS-Verschlüsselungsschlüssel zur Entschlüsselung des Nutzernamens, Passworts und Verbindungsstrings. Wenn der Cloud KMS-Schlüssel übergeben wird, müssen der Nutzername, das Passwort und der Verbindungsstring verschlüsselt übergeben werden.
extraFilesToStage Durch Kommas getrennte Cloud Storage-Pfade oder Secret Manager-Secrets für Dateien, die im Worker bereitgestellt werden sollen. Diese Dateien werden im Verzeichnis /extra_files in jedem Worker gespeichert. Beispiel: gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to JDBC templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Ersetzen Sie dabei Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • DRIVER_CLASS_NAME: Der Name der Treiberklasse
  • JDBC_CONNECTION_URL: Die JDBC-Verbindungs-URL
  • DRIVER_PATHS: Die kommagetrennten Cloud Storage-Pfade der JDBC-Treiber
  • CONNECTION_USERNAME: Der Nutzername der JDBC-Verbindung
  • CONNECTION_PASSWORD: Das JDBC-Verbindungspasswort
  • CONNECTION_PROPERTIES: JDBC-Verbindungsattribute, falls erforderlich
  • SQL_STATEMENT: Die SQL-Anweisung, die für die Datenbank ausgeführt werden soll.
  • INPUT_SUBSCRIPTION: Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll.
  • OUTPUT_DEADLETTER_TOPIC: Das Pub/Sub als Weiterleitungsziel für nicht zustellbare Nachrichten
  • KMS_ENCRYPTION_KEY: Der Cloud KMS-Verschlüsselungsschlüssel

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "statement": "SQL_STATEMENT",
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • DRIVER_CLASS_NAME: Der Name der Treiberklasse
  • JDBC_CONNECTION_URL: Die JDBC-Verbindungs-URL
  • DRIVER_PATHS: Die kommagetrennten Cloud Storage-Pfade der JDBC-Treiber
  • CONNECTION_USERNAME: Der Nutzername der JDBC-Verbindung
  • CONNECTION_PASSWORD: Das JDBC-Verbindungspasswort
  • CONNECTION_PROPERTIES: JDBC-Verbindungsattribute, falls erforderlich
  • SQL_STATEMENT: Die SQL-Anweisung, die für die Datenbank ausgeführt werden soll.
  • INPUT_SUBSCRIPTION: Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll.
  • OUTPUT_DEADLETTER_TOPIC: Das Pub/Sub als Weiterleitungsziel für nicht zustellbare Nachrichten
  • KMS_ENCRYPTION_KEY: Der Cloud KMS-Verschlüsselungsschlüssel
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.io.DynamicJdbcIO;
import com.google.cloud.teleport.v2.options.PubsubToJdbcOptions;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.JsonStringToQueryMapper;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Splitter;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubsubToJdbc} streaming pipeline reads data from Google Cloud PubSub and publishes to
 * JDBC.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Pubsub_to_Jdbc.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Pubsub_to_Jdbc",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to JDBC",
    description =
        "The Pub/Sub to Java Database Connectivity (JDBC) template is a streaming pipeline that ingests data from a "
            + "pre-existing Cloud Pub/Sub subscription as JSON strings, and writes the resulting records to JDBC.",
    optionsClass = PubsubToJdbcOptions.class,
    flexContainerName = "pubsub-to-jdbc",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-jdbc",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The Cloud Pub/Sub subscription must exist prior to running the pipeline.",
      "The JDBC source must exist prior to running the pipeline.",
      "The Cloud Pub/Sub output deadletter topic must exist prior to running the pipeline.",
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class PubsubToJdbc {

  /* Logger for class.*/
  private static final Logger LOG = LoggerFactory.getLogger(PubsubToJdbc.class);

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    PubsubToJdbcOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubToJdbcOptions.class);

    run(options);
  }

  /**
   * Runs a pipeline which reads message from Pub/Sub and writes to JDBC.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(PubsubToJdbcOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    LOG.info("Starting Pubsub-to-Jdbc Pipeline.");

    /*
     * Steps:
     *  1) Read data from a Pub/Sub subscription
     *  2) Write to Jdbc Table
     *  3) Write errors to deadletter topic
     */
    PCollection<String> pubsubData =
        pipeline.apply(
            "readFromPubSubSubscription",
            PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));

    DynamicJdbcIO.DynamicDataSourceConfiguration dataSourceConfiguration =
        DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                options.getDriverClassName(),
                maybeDecrypt(options.getConnectionUrl(), options.getKMSEncryptionKey()).get())
            .withDriverJars(options.getDriverJars());
    if (options.getUsername() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withUsername(
              maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()).get());
    }
    if (options.getPassword() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withPassword(
              maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()).get());
    }
    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    PCollection<FailsafeElement<String, String>> errors =
        pubsubData
            .apply(
                "writeToJdbc",
                DynamicJdbcIO.<String>write()
                    .withDataSourceConfiguration(dataSourceConfiguration)
                    .withStatement(options.getStatement())
                    .withPreparedStatementSetter(
                        new JsonStringToQueryMapper(getKeyOrder(options.getStatement()))))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    errors.apply(
        "WriteFailedRecords",
        ErrorConverters.WriteStringMessageErrorsToPubSub.newBuilder()
            .setErrorRecordsTopic(options.getOutputDeadletterTopic())
            .build());

    return pipeline.run();
  }

  private static List<String> getKeyOrder(String statement) {
    int startIndex = statement.indexOf("(");
    int endIndex = statement.indexOf(")");
    String data = statement.substring(startIndex + 1, endIndex);
    return Splitter.on(',').trimResults().splitToList(data);
  }
}

Nächste Schritte