Template Pub/Sub ke Java Database Connectivity (JDBC)

Template Pub/Sub to Java Database Connectivity (JDBC) adalah pipeline streaming yang menyerap data dari langganan Pub/Sub yang sudah ada sebagai string JSON, dan menulis data yang dihasilkan ke JDBC.

Persyaratan pipeline

  • Langganan Pub/Sub harus ada sebelum menjalankan pipeline.
  • Sumber JDBC harus ada sebelum menjalankan pipeline.
  • Topik dead-letter output Pub/Sub harus ada sebelum menjalankan pipeline.

Parameter template

Parameter Deskripsi
driverClassName Nama class driver JDBC. Misalnya, com.mysql.jdbc.Driver.
connectionUrl String URL koneksi JDBC. Misalnya, jdbc:mysql://some-host:3306/sampledb. Anda dapat meneruskan nilai ini sebagai string yang dienkripsi dengan kunci Cloud KMS, lalu dienkode dengan Base64. Menghapus karakter spasi kosong dari string yang dienkode Base64.
driverJars Jalur Cloud Storage yang dipisahkan koma untuk driver JDBC. Misalnya, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar.
username Opsional: Nama pengguna yang akan digunakan untuk koneksi JDBC. Anda dapat meneruskan nilai ini yang dienkripsi oleh kunci Cloud KMS sebagai string yang dienkode Base64.
password Opsional: Sandi yang akan digunakan untuk koneksi JDBC. Anda dapat meneruskan nilai ini yang dienkripsi oleh kunci Cloud KMS sebagai string yang dienkode Base64.
connectionProperties Opsional: String properti yang akan digunakan untuk koneksi JDBC. Format string harus [propertyName=property;]*. Contoh, unicode=true;characterEncoding=UTF-8.
statement Pernyataan untuk dijalankan terhadap database. Pernyataan harus menentukan nama kolom tabel dalam urutan apa pun. Hanya nilai nama kolom yang ditentukan yang dibaca dari JSON dan ditambahkan ke pernyataan. Contoh, INSERT INTO tableName (column1, column2) VALUES (?,?)
inputSubscription Langganan input Pub/Sub yang akan dibaca, dalam format projects/<project>/subscriptions/<subscription>.
outputDeadletterTopic Topik Pub/Sub untuk meneruskan pesan yang tidak terkirim. Misalnya, projects/<project-id>/topics/<topic-name>.
KMSEncryptionKey Opsional: Kunci Enkripsi Cloud KMS untuk mendekripsi nama pengguna, sandi, dan string koneksi. Jika kunci Cloud KMS diteruskan, nama pengguna, sandi, dan string koneksi harus diteruskan dalam bentuk terenkripsi.
extraFilesToStage Jalur Cloud Storage yang dipisahkan koma atau secret Secret Manager untuk file yang akan di-staging di pekerja. File ini akan disimpan di direktori /extra_files di setiap pekerja. Contoh, gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>.

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Pub/Sub to JDBC template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Ganti kode berikut:

  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • DRIVER_CLASS_NAME: nama class driver
  • JDBC_CONNECTION_URL: URL koneksi JDBC
  • DRIVER_PATHS: jalur Cloud Storage yang dipisahkan koma dari driver JDBC
  • CONNECTION_USERNAME: nama pengguna koneksi JDBC
  • CONNECTION_PASSWORD: sandi koneksi JDBC
  • CONNECTION_PROPERTIES: properti koneksi JDBC, jika diperlukan
  • SQL_STATEMENT: pernyataan SQL yang akan dieksekusi terhadap database
  • INPUT_SUBSCRIPTION: langganan input Pub/Sub yang akan dibaca
  • OUTPUT_DEADLETTER_TOPIC: Pub/Sub untuk meneruskan pesan yang tidak dapat dikirim
  • KMS_ENCRYPTION_KEY: Kunci Enkripsi Cloud KMS

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "statement": "SQL_STATEMENT",
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • DRIVER_CLASS_NAME: nama class driver
  • JDBC_CONNECTION_URL: URL koneksi JDBC
  • DRIVER_PATHS: jalur Cloud Storage yang dipisahkan koma dari driver JDBC
  • CONNECTION_USERNAME: nama pengguna koneksi JDBC
  • CONNECTION_PASSWORD: sandi koneksi JDBC
  • CONNECTION_PROPERTIES: properti koneksi JDBC, jika diperlukan
  • SQL_STATEMENT: pernyataan SQL yang akan dieksekusi terhadap database
  • INPUT_SUBSCRIPTION: langganan input Pub/Sub yang akan dibaca
  • OUTPUT_DEADLETTER_TOPIC: Pub/Sub untuk meneruskan pesan yang tidak dapat dikirim
  • KMS_ENCRYPTION_KEY: Kunci Enkripsi Cloud KMS
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.io.DynamicJdbcIO;
import com.google.cloud.teleport.v2.options.PubsubToJdbcOptions;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.JsonStringToQueryMapper;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Splitter;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubsubToJdbc} streaming pipeline reads data from Google Cloud PubSub and publishes to
 * JDBC.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Pubsub_to_Jdbc.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Pubsub_to_Jdbc",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to JDBC",
    description =
        "The Pub/Sub to Java Database Connectivity (JDBC) template is a streaming pipeline that ingests data from a "
            + "pre-existing Cloud Pub/Sub subscription as JSON strings, and writes the resulting records to JDBC.",
    optionsClass = PubsubToJdbcOptions.class,
    flexContainerName = "pubsub-to-jdbc",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-jdbc",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The Cloud Pub/Sub subscription must exist prior to running the pipeline.",
      "The JDBC source must exist prior to running the pipeline.",
      "The Cloud Pub/Sub output deadletter topic must exist prior to running the pipeline.",
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class PubsubToJdbc {

  /* Logger for class.*/
  private static final Logger LOG = LoggerFactory.getLogger(PubsubToJdbc.class);

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    PubsubToJdbcOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubToJdbcOptions.class);

    run(options);
  }

  /**
   * Runs a pipeline which reads message from Pub/Sub and writes to JDBC.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(PubsubToJdbcOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    LOG.info("Starting Pubsub-to-Jdbc Pipeline.");

    /*
     * Steps:
     *  1) Read data from a Pub/Sub subscription
     *  2) Write to Jdbc Table
     *  3) Write errors to deadletter topic
     */
    PCollection<String> pubsubData =
        pipeline.apply(
            "readFromPubSubSubscription",
            PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));

    DynamicJdbcIO.DynamicDataSourceConfiguration dataSourceConfiguration =
        DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                options.getDriverClassName(),
                maybeDecrypt(options.getConnectionUrl(), options.getKMSEncryptionKey()).get())
            .withDriverJars(options.getDriverJars());
    if (options.getUsername() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withUsername(
              maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()).get());
    }
    if (options.getPassword() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withPassword(
              maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()).get());
    }
    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    PCollection<FailsafeElement<String, String>> errors =
        pubsubData
            .apply(
                "writeToJdbc",
                DynamicJdbcIO.<String>write()
                    .withDataSourceConfiguration(dataSourceConfiguration)
                    .withStatement(options.getStatement())
                    .withPreparedStatementSetter(
                        new JsonStringToQueryMapper(getKeyOrder(options.getStatement()))))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    errors.apply(
        "WriteFailedRecords",
        ErrorConverters.WriteStringMessageErrorsToPubSub.newBuilder()
            .setErrorRecordsTopic(options.getOutputDeadletterTopic())
            .build());

    return pipeline.run();
  }

  private static List<String> getKeyOrder(String statement) {
    int startIndex = statement.indexOf("(");
    int endIndex = statement.indexOf(")");
    String data = statement.substring(startIndex + 1, endIndex);
    return Splitter.on(',').trimResults().splitToList(data);
  }
}

Langkah berikutnya