Template Java Database Connectivity (JDBC) ke Pub/Sub

Template Java Database Connectivity (JDBC) ke Pub/Sub adalah pipeline batch yang menyerap data dari sumber JDBC dan menulis data yang dihasilkan ke topik Pub/Sub yang sudah ada sebagai string JSON.

Persyaratan pipeline

  • Sumber JDBC harus ada sebelum menjalankan pipeline.
  • Topik output Pub/Sub harus ada sebelum menjalankan pipeline.

Parameter template

Parameter yang diperlukan

  • driverClassName: Nama class driver JDBC. Contoh, com.mysql.jdbc.Driver.
  • connectionUrl: String URL koneksi JDBC. Anda dapat meneruskan nilai ini sebagai string yang dienkripsi dengan kunci Cloud KMS, lalu dienkode dengan Base64. Misalnya: 'echo -n "jdbc:mysql://some-host:3306/sampledb" | gcloud kms encrypt --location=
  • driverJars: Jalur Cloud Storage yang dipisahkan koma untuk driver JDBC. Contoh, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar.
  • kueri: Kueri yang akan dijalankan di sumber untuk mengekstrak data. Contoh, select * from sampledb.sample_table.
  • outputTopic: Topik Pub/Sub yang akan dipublikasikan. Contoh, projects/<PROJECT_ID>/topics/<TOPIC_NAME>.

Parameter opsional

  • username: Nama pengguna yang akan digunakan untuk koneksi JDBC. Anda dapat meneruskan nilai ini sebagai string yang dienkripsi dengan kunci Cloud KMS, lalu dienkode dengan Base64. Contoh, echo -n 'some_username' | glcloud kms encrypt --location=my_location --keyring=mykeyring --key=mykey --plaintext-file=- --ciphertext-file=- | base64.
  • password: Sandi yang akan digunakan untuk koneksi JDBC. Anda dapat meneruskan nilai ini sebagai string yang dienkripsi dengan kunci Cloud KMS, lalu dienkode dengan Base64. Contoh, echo -n 'some_password' | glcloud kms encrypt --location=my_location --keyring=mykeyring --key=mykey --plaintext-file=- --ciphertext-file=- | base64.
  • connectionProperties: String properti yang akan digunakan untuk koneksi JDBC. Format string harus [propertyName=property;]*. Misalnya, unicode=true;characterEncoding=UTF-8.
  • KMSEncryptionKey: Kunci Enkripsi Cloud KMS yang akan digunakan untuk mendekripsi nama pengguna, sandi, dan string koneksi. Jika kunci Cloud KMS diteruskan, nama pengguna, sandi, dan string koneksi harus diteruskan dalam bentuk terenkripsi dan dienkode base64. Contoh, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • disabledAlgorithms: Algoritma yang dipisahkan koma untuk dinonaktifkan. Jika nilai ini disetel ke none, tidak ada algoritma yang dinonaktifkan. Gunakan parameter ini dengan hati-hati, karena algoritma yang dinonaktifkan secara default mungkin memiliki kerentanan atau masalah performa. Misalnya, SSLv3, RC4.
  • extraFilesToStage: Jalur Cloud Storage yang dipisahkan koma atau secret Secret Manager untuk file yang akan di-stage di pekerja. File ini disimpan di direktori /extra_files di setiap pekerja. Contoh, gs://<BUCKET_NAME>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>.

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the JDBC to Pub/Sub template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

Di shell atau terminal, jalankan template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Jdbc_to_PubSub \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
query=SOURCE_SQL_QUERY,\
outputTopic=OUTPUT_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Ganti kode berikut:

  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • DRIVER_CLASS_NAME: nama class driver
  • JDBC_CONNECTION_URL: URL koneksi JDBC
  • DRIVER_PATHS: jalur Cloud Storage yang dipisahkan koma dari driver JDBC
  • CONNECTION_USERNAME: nama pengguna koneksi JDBC
  • CONNECTION_PASSWORD: sandi koneksi JDBC
  • CONNECTION_PROPERTIES: properti koneksi JDBC, jika diperlukan
  • SOURCE_SQL_QUERY: kueri SQL yang akan dijalankan di database sumber
  • OUTPUT_TOPIC: Pub/Sub yang akan dipublikasikan
  • KMS_ENCRYPTION_KEY: Kunci Enkripsi Cloud KMS

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Jdbc_to_PubSub"
    "parameters": {
      "driverClassName": "DRIVER_CLASS_NAME",
      "connectionURL": "JDBC_CONNECTION_URL",
      "driverJars": "DRIVER_PATHS",
      "username": "CONNECTION_USERNAME",
      "password": "CONNECTION_PASSWORD",
      "connectionProperties": "CONNECTION_PROPERTIES",
      "query": "SOURCE_SQL_QUERY",
      "outputTopic": "OUTPUT_TOPIC",
      "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
    },
    "environment": { "zone": "us-central1-f" }
  }
}

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • DRIVER_CLASS_NAME: nama class driver
  • JDBC_CONNECTION_URL: URL koneksi JDBC
  • DRIVER_PATHS: jalur Cloud Storage yang dipisahkan koma dari driver JDBC
  • CONNECTION_USERNAME: nama pengguna koneksi JDBC
  • CONNECTION_PASSWORD: sandi koneksi JDBC
  • CONNECTION_PROPERTIES: properti koneksi JDBC, jika diperlukan
  • SOURCE_SQL_QUERY: kueri SQL yang akan dijalankan di database sumber
  • OUTPUT_TOPIC: Pub/Sub yang akan dipublikasikan
  • KMS_ENCRYPTION_KEY: Kunci Enkripsi Cloud KMS
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.JdbcToPubsubOptions;
import java.sql.Clob;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.values.PCollection;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link JdbcToPubsub} batch pipeline reads data from JDBC and publishes to Google Cloud
 * PubSub.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jdbc-to-googlecloud/README_Jdbc_to_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Jdbc_to_PubSub",
    category = TemplateCategory.BATCH,
    displayName = "JDBC to Pub/Sub",
    description =
        "The Java Database Connectivity (JDBC) to Pub/Sub template is a batch pipeline that ingests data from "
            + "JDBC source and writes the resulting records to a pre-existing Pub/Sub topic as a JSON string.",
    optionsClass = JdbcToPubsubOptions.class,
    flexContainerName = "jdbc-to-pubsub",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/jdbc-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC source must exist prior to running the pipeline.",
      "The Cloud Pub/Sub output topic must exist prior to running the pipeline."
    })
public class JdbcToPubsub {

  /* Logger for class.*/
  private static final Logger LOG = LoggerFactory.getLogger(JdbcToPubsub.class);

  /**
   * {@link JdbcIO.RowMapper} implementation to convert Jdbc ResultSet rows to UTF-8 encoded JSONs.
   */
  public static class ResultSetToJSONString implements JdbcIO.RowMapper<String> {

    @Override
    public String mapRow(ResultSet resultSet) throws Exception {
      ResultSetMetaData metaData = resultSet.getMetaData();
      JSONObject json = new JSONObject();

      for (int i = 1; i <= metaData.getColumnCount(); i++) {
        Object value = resultSet.getObject(i);

        // JSONObject.put() does not support null values. The exception is JSONObject.NULL
        if (value == null) {
          json.put(metaData.getColumnLabel(i), JSONObject.NULL);
          continue;
        }

        switch (metaData.getColumnTypeName(i).toLowerCase()) {
          case "clob":
            Clob clobObject = resultSet.getClob(i);
            if (clobObject.length() > Integer.MAX_VALUE) {
              LOG.warn(
                  "The Clob value size {} in column {} exceeds 2GB and will be truncated.",
                  clobObject.length(),
                  metaData.getColumnLabel(i));
            }
            json.put(
                metaData.getColumnLabel(i), clobObject.getSubString(1, (int) clobObject.length()));
            break;
          default:
            json.put(metaData.getColumnLabel(i), value);
        }
      }
      return json.toString();
    }
  }

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    JdbcToPubsubOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(JdbcToPubsubOptions.class);

    run(options);
  }

  /**
   * Runs a pipeline which reads message from JDBC and writes to Pub/Sub.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(JdbcToPubsubOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    LOG.info("Starting Jdbc-To-PubSub Pipeline.");

    /*
     * Steps:
     *  1) Read data from a Jdbc Table
     *  2) Write to Pub/Sub topic
     */
    JdbcIO.DataSourceConfiguration dataSourceConfiguration =
        JdbcIO.DataSourceConfiguration.create(
                StaticValueProvider.of(options.getDriverClassName()),
                maybeDecrypt(options.getConnectionUrl(), options.getKMSEncryptionKey()))
            .withDriverJars(options.getDriverJars());
    if (options.getUsername() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withUsername(
              maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()));
    }
    if (options.getPassword() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withPassword(
              maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()));
    }
    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    PCollection<String> jdbcData =
        pipeline.apply(
            "readFromJdbc",
            JdbcIO.<String>read()
                .withDataSourceConfiguration(dataSourceConfiguration)
                .withQuery(options.getQuery())
                .withCoder(StringUtf8Coder.of())
                .withRowMapper(new ResultSetToJSONString()));

    jdbcData.apply("writeSuccessMessages", PubsubIO.writeStrings().to(options.getOutputTopic()));

    return pipeline.run();
  }
}

Langkah berikutnya