Java Database Connectivity(JDBC) to Pub/Sub 템플릿

Java Database Connectivity(JDBC) to Pub/Sub 템플릿은 JDBC 소스에서 데이터를 수집하고 결과 레코드를 JSON 문자열로 기존 Pub/Sub 주제에 쓰는 일괄 파이프라인입니다.

파이프라인 요구사항

  • 파이프라인을 실행하기 전에 JDBC 소스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Pub/Sub 출력 주제가 있어야 합니다.

템플릿 매개변수

필수 매개변수

  • driverClassName: JDBC 드라이버 클래스 이름입니다. 예를 들면 com.mysql.jdbc.Driver입니다.
  • connectionUrl: JDBC 연결 URL 문자열입니다. 이 값을 Cloud KMS 키로 암호화된 Base64 인코딩 문자열로 전달할 수 있습니다. 예를 들면 'echo -n "jdbc:mysql://some-host:3306/sampledb" | gcloud kms encrypt --location=입니다.
  • driverJars: JDBC 드라이버의 쉼표로 구분된 Cloud Storage 경로입니다. 예를 들면 gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar입니다.
  • query: 데이터를 추출하기 위해 소스에서 실행될 쿼리입니다. 예를 들면 select * from sampledb.sample_table입니다.
  • outputTopic: 게시할 Pub/Sub 주제입니다. 예를 들면 projects/<PROJECT_ID>/topics/<TOPIC_NAME>입니다.

선택적 매개변수

  • username: JDBC 연결에 사용할 사용자 이름입니다. 이 값을 Cloud KMS 키로 암호화된 Base64 인코딩 문자열로 전달할 수 있습니다. 예를 들면 echo -n 'some_username' | glcloud kms encrypt --location=my_location --keyring=mykeyring --key=mykey --plaintext-file=- --ciphertext-file=- | base64입니다.
  • password: JDBC 연결에 사용할 비밀번호입니다. 이 값을 Cloud KMS 키로 암호화된 Base64 인코딩 문자열로 전달할 수 있습니다. 예를 들면 echo -n 'some_password' | glcloud kms encrypt --location=my_location --keyring=mykeyring --key=mykey --plaintext-file=- --ciphertext-file=- | base64입니다.
  • connectionProperties: JDBC 연결에 사용할 속성 문자열입니다. 문자열 형식은 [propertyName=property;]*여야 합니다. 예를 들면 unicode=true;characterEncoding=UTF-8입니다.
  • KMSEncryptionKey: 사용자 이름, 비밀번호, 연결 문자열을 복호화하는 데 사용할 Cloud KMS 암호화 키입니다. Cloud KMS 키가 전달되면 사용자 이름, 비밀번호, 연결 문자열 모두 암호화되고 base64로 인코딩된 상태로 전달되어야 합니다. 예를 들면 projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key입니다.
  • disabledAlgorithms: 사용 중지할 쉼표로 구분된 알고리즘입니다. 이 값을 none으로 설정하면 알고리즘이 사용 중지되지 않습니다. 기본적으로 중지된 알고리즘은 취약점 또는 성능 문제를 일으킬 수 있으므로 이 매개변수를 사용할 때는 주의해야 합니다. 예를 들면 SSLv3, RC4입니다.
  • extraFilesToStage: 작업자에 스테이징할 파일의 쉼표로 구분된 Cloud Storage 경로 또는 Secret Manager 보안 비밀입니다. 이러한 파일은 각 작업자의 /extra_files 디렉터리에 저장됩니다. 예를 들면 gs://<BUCKET_NAME>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>입니다.

템플릿 실행

콘솔gcloudAPI
  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the JDBC to Pub/Sub template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Jdbc_to_PubSub \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
query=SOURCE_SQL_QUERY,\
outputTopic=OUTPUT_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • DRIVER_CLASS_NAME: 드라이버 클래스 이름입니다.
  • JDBC_CONNECTION_URL: JDBC 연결 URL
  • DRIVER_PATHS: JDBC 드라이버의 쉼표로 구분된 Cloud Storage 경로
  • CONNECTION_USERNAME: JDBC 연결 사용자 이름
  • CONNECTION_PASSWORD: JDBC 연결 비밀번호
  • CONNECTION_PROPERTIES: JDBC 연결 속성(필요한 경우)
  • SOURCE_SQL_QUERY: 소스 데이터베이스에서 실행할 SQL 쿼리
  • OUTPUT_TOPIC: 게시할 Pub/Sub
  • KMS_ENCRYPTION_KEY: Cloud KMS 암호화 키

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Jdbc_to_PubSub"
    "parameters": {
      "driverClassName": "DRIVER_CLASS_NAME",
      "connectionURL": "JDBC_CONNECTION_URL",
      "driverJars": "DRIVER_PATHS",
      "username": "CONNECTION_USERNAME",
      "password": "CONNECTION_PASSWORD",
      "connectionProperties": "CONNECTION_PROPERTIES",
      "query": "SOURCE_SQL_QUERY",
      "outputTopic": "OUTPUT_TOPIC",
      "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
    },
    "environment": { "zone": "us-central1-f" }
  }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • DRIVER_CLASS_NAME: 드라이버 클래스 이름입니다.
  • JDBC_CONNECTION_URL: JDBC 연결 URL
  • DRIVER_PATHS: JDBC 드라이버의 쉼표로 구분된 Cloud Storage 경로
  • CONNECTION_USERNAME: JDBC 연결 사용자 이름
  • CONNECTION_PASSWORD: JDBC 연결 비밀번호
  • CONNECTION_PROPERTIES: JDBC 연결 속성(필요한 경우)
  • SOURCE_SQL_QUERY: 소스 데이터베이스에서 실행할 SQL 쿼리
  • OUTPUT_TOPIC: 게시할 Pub/Sub
  • KMS_ENCRYPTION_KEY: Cloud KMS 암호화 키
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.JdbcToPubsubOptions;
import java.sql.Clob;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.values.PCollection;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link JdbcToPubsub} batch pipeline reads data from JDBC and publishes to Google Cloud
 * PubSub.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jdbc-to-googlecloud/README_Jdbc_to_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Jdbc_to_PubSub",
    category = TemplateCategory.BATCH,
    displayName = "JDBC to Pub/Sub",
    description =
        "The Java Database Connectivity (JDBC) to Pub/Sub template is a batch pipeline that ingests data from "
            + "JDBC source and writes the resulting records to a pre-existing Pub/Sub topic as a JSON string.",
    optionsClass = JdbcToPubsubOptions.class,
    flexContainerName = "jdbc-to-pubsub",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/jdbc-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC source must exist prior to running the pipeline.",
      "The Cloud Pub/Sub output topic must exist prior to running the pipeline."
    })
public class JdbcToPubsub {

  /* Logger for class.*/
  private static final Logger LOG = LoggerFactory.getLogger(JdbcToPubsub.class);

  /**
   * {@link JdbcIO.RowMapper} implementation to convert Jdbc ResultSet rows to UTF-8 encoded JSONs.
   */
  public static class ResultSetToJSONString implements JdbcIO.RowMapper<String> {

    @Override
    public String mapRow(ResultSet resultSet) throws Exception {
      ResultSetMetaData metaData = resultSet.getMetaData();
      JSONObject json = new JSONObject();

      for (int i = 1; i <= metaData.getColumnCount(); i++) {
        Object value = resultSet.getObject(i);

        // JSONObject.put() does not support null values. The exception is JSONObject.NULL
        if (value == null) {
          json.put(metaData.getColumnLabel(i), JSONObject.NULL);
          continue;
        }

        switch (metaData.getColumnTypeName(i).toLowerCase()) {
          case "clob":
            Clob clobObject = resultSet.getClob(i);
            if (clobObject.length() > Integer.MAX_VALUE) {
              LOG.warn(
                  "The Clob value size {} in column {} exceeds 2GB and will be truncated.",
                  clobObject.length(),
                  metaData.getColumnLabel(i));
            }
            json.put(
                metaData.getColumnLabel(i), clobObject.getSubString(1, (int) clobObject.length()));
            break;
          default:
            json.put(metaData.getColumnLabel(i), value);
        }
      }
      return json.toString();
    }
  }

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    JdbcToPubsubOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(JdbcToPubsubOptions.class);

    run(options);
  }

  /**
   * Runs a pipeline which reads message from JDBC and writes to Pub/Sub.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(JdbcToPubsubOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    LOG.info("Starting Jdbc-To-PubSub Pipeline.");

    /*
     * Steps:
     *  1) Read data from a Jdbc Table
     *  2) Write to Pub/Sub topic
     */
    JdbcIO.DataSourceConfiguration dataSourceConfiguration =
        JdbcIO.DataSourceConfiguration.create(
                StaticValueProvider.of(options.getDriverClassName()),
                maybeDecrypt(options.getConnectionUrl(), options.getKMSEncryptionKey()))
            .withDriverJars(options.getDriverJars());
    if (options.getUsername() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withUsername(
              maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()));
    }
    if (options.getPassword() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withPassword(
              maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()));
    }
    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    PCollection<String> jdbcData =
        pipeline.apply(
            "readFromJdbc",
            JdbcIO.<String>read()
                .withDataSourceConfiguration(dataSourceConfiguration)
                .withQuery(options.getQuery())
                .withCoder(StringUtf8Coder.of())
                .withRowMapper(new ResultSetToJSONString()));

    jdbcData.apply("writeSuccessMessages", PubsubIO.writeStrings().to(options.getOutputTopic()));

    return pipeline.run();
  }
}

다음 단계