Pub/Sub to Java Database Connectivity(JDBC)テンプレート

Pub/Sub to Java Database Connectivity(JDBC)テンプレートは、既存の Pub/Sub サブスクリプションから JSON 文字列としてデータを取り込み、結果のレコードを JDBC に書き込むストリーミング パイプラインです。

パイプラインの要件

  • パイプラインを実行する前に Pub/Sub サブスクリプションが存在している必要があります。
  • パイプラインを実行する前に JDBC ソースが存在している必要があります。
  • パイプラインを実行する前に Pub/Sub 出力デッドレター トピックが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
driverClassName JDBC ドライバのクラス名。例: com.mysql.jdbc.Driver
connectionUrl JDBC 接続 URL 文字列。例: jdbc:mysql://some-host:3306/sampledbこの値は、Cloud KMS 鍵で暗号化され、Base64 でエンコードされた文字列として渡すことができます。Base64 でエンコードされた文字列から空白文字を削除します。
driverJars JDBC ドライバのカンマ区切りの Cloud Storage パス。例: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar
username 省略可: JDBC 接続に使用するユーザー名。この値は、Cloud KMS 鍵で暗号化された Base64 エンコード文字列として渡すことができます。
password 省略可: JDBC 接続に使用するパスワード。この値は、Cloud KMS 鍵で暗号化された Base64 エンコード文字列として渡すことができます。
connectionProperties 省略可: JDBC 接続に使用するプロパティ文字列。文字列の形式は [propertyName=property;]* にする必要があります。例: unicode=true;characterEncoding=UTF-8
statement データベースに対して実行するステートメント。このステートメントには、テーブルの列名を任意の順序で指定する必要があります。指定した列名の値のみが JSON から読み取られ、ステートメントに追加されます。例: INSERT INTO tableName (column1, column2) VALUES (?,?)
inputSubscription 読み込まれる Pub/Sub 入力サブスクリプション。projects/<project>/subscriptions/<subscription> の形式で指定します。
outputDeadletterTopic 配信不能メッセージを転送するための Pub/Sub トピック。例: projects/<project-id>/topics/<topic-name>
KMSEncryptionKey 省略可: ユーザー名、パスワード、接続文字列を復号するための Cloud KMS 暗号鍵。Cloud KMS 鍵が渡された場合、ユーザー名、パスワード、接続文字列はすべて暗号化されて渡されます。
extraFilesToStage ワーカーにステージングするファイルのカンマ区切りの Cloud Storage パスまたは Secret Manager シークレット。これらのファイルは、各ワーカーの /extra_files ディレクトリに保存されます。例: gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>

テンプレートを実行する

コンソールgcloudAPI
  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Pub/Sub to JDBC template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • DRIVER_CLASS_NAME: ドライバのクラス名
  • JDBC_CONNECTION_URL: JDBC 接続 URL
  • DRIVER_PATHS: カンマで区切った JDBC ドライバの Cloud Storage パス
  • CONNECTION_USERNAME: JDBC 接続のユーザー名
  • CONNECTION_PASSWORD: JDBC 接続パスワード
  • CONNECTION_PROPERTIES: JDBC 接続プロパティ(必要に応じて)
  • SQL_STATEMENT: データベースに対して実行される SQL ステートメント
  • INPUT_SUBSCRIPTION: 読み取り元の Pub/Sub 入力サブスクリプション
  • OUTPUT_DEADLETTER_TOPIC: 配信不能メッセージを転送するための Pub/Sub トピック
  • KMS_ENCRYPTION_KEY: Cloud KMS 暗号鍵

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "statement": "SQL_STATEMENT",
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • DRIVER_CLASS_NAME: ドライバのクラス名
  • JDBC_CONNECTION_URL: JDBC 接続 URL
  • DRIVER_PATHS: カンマで区切った JDBC ドライバの Cloud Storage パス
  • CONNECTION_USERNAME: JDBC 接続のユーザー名
  • CONNECTION_PASSWORD: JDBC 接続パスワード
  • CONNECTION_PROPERTIES: JDBC 接続プロパティ(必要に応じて)
  • SQL_STATEMENT: データベースに対して実行される SQL ステートメント
  • INPUT_SUBSCRIPTION: 読み取り元の Pub/Sub 入力サブスクリプション
  • OUTPUT_DEADLETTER_TOPIC: 配信不能メッセージを転送するための Pub/Sub トピック
  • KMS_ENCRYPTION_KEY: Cloud KMS 暗号鍵
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.io.DynamicJdbcIO;
import com.google.cloud.teleport.v2.options.PubsubToJdbcOptions;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.JsonStringToQueryMapper;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Splitter;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubsubToJdbc} streaming pipeline reads data from Google Cloud PubSub and publishes to
 * JDBC.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Pubsub_to_Jdbc.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Pubsub_to_Jdbc",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to JDBC",
    description =
        "The Pub/Sub to Java Database Connectivity (JDBC) template is a streaming pipeline that ingests data from a "
            + "pre-existing Cloud Pub/Sub subscription as JSON strings, and writes the resulting records to JDBC.",
    optionsClass = PubsubToJdbcOptions.class,
    flexContainerName = "pubsub-to-jdbc",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-jdbc",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The Cloud Pub/Sub subscription must exist prior to running the pipeline.",
      "The JDBC source must exist prior to running the pipeline.",
      "The Cloud Pub/Sub output deadletter topic must exist prior to running the pipeline.",
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class PubsubToJdbc {

  /* Logger for class.*/
  private static final Logger LOG = LoggerFactory.getLogger(PubsubToJdbc.class);

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    PubsubToJdbcOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubToJdbcOptions.class);

    run(options);
  }

  /**
   * Runs a pipeline which reads message from Pub/Sub and writes to JDBC.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(PubsubToJdbcOptions options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    LOG.info("Starting Pubsub-to-Jdbc Pipeline.");

    /*
     * Steps:
     *  1) Read data from a Pub/Sub subscription
     *  2) Write to Jdbc Table
     *  3) Write errors to deadletter topic
     */
    PCollection<String> pubsubData =
        pipeline.apply(
            "readFromPubSubSubscription",
            PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));

    DynamicJdbcIO.DynamicDataSourceConfiguration dataSourceConfiguration =
        DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                options.getDriverClassName(),
                maybeDecrypt(options.getConnectionUrl(), options.getKMSEncryptionKey()).get())
            .withDriverJars(options.getDriverJars());
    if (options.getUsername() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withUsername(
              maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()).get());
    }
    if (options.getPassword() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withPassword(
              maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()).get());
    }
    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    PCollection<FailsafeElement<String, String>> errors =
        pubsubData
            .apply(
                "writeToJdbc",
                DynamicJdbcIO.<String>write()
                    .withDataSourceConfiguration(dataSourceConfiguration)
                    .withStatement(options.getStatement())
                    .withPreparedStatementSetter(
                        new JsonStringToQueryMapper(getKeyOrder(options.getStatement()))))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    errors.apply(
        "WriteFailedRecords",
        ErrorConverters.WriteStringMessageErrorsToPubSub.newBuilder()
            .setErrorRecordsTopic(options.getOutputDeadletterTopic())
            .build());

    return pipeline.run();
  }

  private static List<String> getKeyOrder(String statement) {
    int startIndex = statement.indexOf("(");
    int endIndex = statement.indexOf(")");
    String data = statement.substring(startIndex + 1, endIndex);
    return Splitter.on(',').trimResults().splitToList(data);
  }
}

次のステップ