Java Database Connectivity(JDBC)to BigQuery テンプレート

JDBC to BigQuery テンプレートは、リレーショナル データベース テーブルから既存の BigQuery テーブルにデータをコピーするバッチ パイプラインです。このパイプラインは、JDBC を使用してリレーショナル データベースに接続します。このテンプレートを使用すると、使用可能な JDBC ドライバがある任意のリレーショナル データベースから BigQuery にデータをコピーできます。

保護をさらに強化するために、Cloud KMS 鍵と一緒に、Cloud KMS 鍵で暗号化された Base64 でエンコードされたユーザー名、パスワード、接続文字列パラメータを渡すことができます。ユーザー名、パスワード、接続文字列パラメータの暗号化の詳細については、Cloud KMS API 暗号化エンドポイントをご覧ください。

パイプラインの要件

  • リレーショナル データベース用の JDBC ドライバが使用可能である必要があります。
  • パイプラインを実行する前に、BigQuery テーブルが存在する必要があります。
  • BigQuery テーブルに互換性のあるスキーマが必要です。
  • リレーショナル データベースは、Dataflow が実行されているサブネットからアクセス可能である必要があります。

テンプレートのパラメータ

必須パラメータ

  • driverJars: ドライバ JAR ファイルのカンマ区切りのリスト(例: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar)。
  • driverClassName: JDBC ドライバのクラス名(例: com.mysql.jdbc.Driver)。
  • connectionURL: JDBC 接続 URL の文字列たとえば、jdbc:mysql://some-host:3306/sampledb のようにします。この値は、Cloud KMS 鍵で暗号化され、Base64 でエンコードされた文字列として渡すことができます。Base64 でエンコードされた文字列から空白文字を削除します。Oracle の RAC 以外のデータベース接続文字列(jdbc:oracle:thin:@some-host:<port>:<sid>)と Oracle RAC データベース接続文字列(jdbc:oracle:thin:@//some-host[:<port>]/<service_name>)の違いに注意してください(例: jdbc:mysql://some-host:3306/sampledb)。
  • outputTable: BigQuery 出力テーブルの場所(例: <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>)。
  • bigQueryLoadingTemporaryDirectory: BigQuery 読み込みプロセスの一時ディレクトリ(例: gs://your-bucket/your-files/temp_dir)。

オプション パラメータ

  • connectionProperties: JDBC 接続に使用するプロパティ文字列。文字列の形式は [propertyName=property;]* にする必要があります。詳細については、MySQL ドキュメントの構成プロパティ(https://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html)をご覧ください(例: unicode=true;characterEncoding=UTF-8)。
  • username: JDBC 接続に使用するユーザー名。Cloud KMS 鍵で暗号化された文字列として渡すことができます。または、projects/{project}/secrets/{secret}/versions/{secret_version} 形式の Secret Manager シークレットとして渡すこともできます。
  • password: JDBC 接続に使用するパスワード。Cloud KMS 鍵で暗号化された文字列として渡すことができます。または、projects/{project}/secrets/{secret}/versions/{secret_version} 形式の Secret Manager シークレットとして渡すこともできます。
  • query: ソースで実行されるクエリでデータを抽出します。一部の JDBC SQL 型と BigQuery 型は同じ名前を共有していますが、いくつかの違いがあります。重要な SQL と BigQuery のデータ型のマッピングには、DATETIME --> TIMESTAMP などがあります。

スキーマが一致しない場合は、型キャストが必要になることがあります。このパラメータは、クエリを読み込む Cloud Storage 内のファイルを指す gs:// パスに設定できます。ファイルのエンコードは UTF-8 にする必要があります(例: select * from sampledb.sample_table)。

  • KMSEncryptionKey: ユーザー名、パスワード、接続文字列の復号に使用する Cloud KMS 暗号鍵。Cloud KMS 鍵を渡す場合は、ユーザー名、パスワード、接続文字列も暗号化する必要があります(例: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key)。
  • useColumnAlias: true に設定すると、パイプラインは列名の代わりに列エイリアス(AS)を使用して、行を BigQuery にマッピングします。デフォルトは false です。
  • isTruncate: true に設定すると、パイプラインは BigQuery にデータを読み込む前にデータを切り捨てます。デフォルトは false で、パイプラインはデータを追加します。
  • partitionColumn: このパラメータに、オプション パラメータとして定義された table の名前が指定されている場合、JdbcIO は範囲を使用して同じテーブル(サブクエリ)に対して複数のクエリ インスタンスを実行し、テーブルを並列で読み取ります。現在、Long パーティション列のみをサポートしています。
  • table: パーティションの使用時に読み取るテーブル。このパラメータは、かっこ内のサブクエリも受け入れます(例: (select id, name from Person) as subq)。
  • numPartitions: パーティションの数。下限と上限により、この値は、パーティション列を均等に分割するために使用される生成された WHERE 句式のパーティション ストライドを形成します。入力が 1 より小さい場合、数値は 1 に設定されます。
  • lowerBound: パーティション スキームで使用する下限。指定しない場合、サポートされているタイプのこの値は Apache Beam によって自動的に推測されます。
  • upperBound: パーティション スキームで使用する上限。指定しない場合、サポートされているタイプのこの値は Apache Beam によって自動的に推測されます。
  • fetchSize: データベースから一度に取得される行数。パーティション分割される読み取りには使用されません。デフォルトは 50,000 です。
  • createDisposition: 使用する BigQuery CreateDisposition。たとえば、CREATE_IF_NEEDEDCREATE_NEVER です。デフォルトは CREATE_NEVER です。
  • bigQuerySchemaPath: BigQuery JSON スキーマの Cloud Storage パス。createDisposition が CREATE_IF_NEEDED に設定されている場合は、このパラメータを指定する必要があります(例: gs://your-bucket/your-schema.json)。
  • disabledAlgorithms: 無効にするためのカンマ区切りのアルゴリズム。この値が none に設定されている場合、アルゴリズムは無効になりません。デフォルトで無効になっているアルゴリズムには脆弱性やパフォーマンスの問題が存在する可能性があるため、このパラメータは慎重に使用してください(例: SSLv3、RC4)。
  • extraFilesToStage: ワーカーにステージングするファイルのカンマ区切りの Cloud Storage パスまたは Secret Manager シークレット。これらのファイルは、各ワーカーの /extra_files ディレクトリに保存されます。(例: gs://)
  • useStorageWriteApi: true の場合、パイプラインでは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は false です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。
  • useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。

テンプレートを実行する

コンソールgcloudAPI
  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the JDBC to BigQuery with BigQuery Storage API support template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Jdbc_to_BigQuery_Flex \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       driverJars=DRIVER_JARS,\
       driverClassName=DRIVER_CLASS_NAME,\
       connectionURL=CONNECTION_URL,\
       outputTable=OUTPUT_TABLE,\
       bigQueryLoadingTemporaryDirectory=BIG_QUERY_LOADING_TEMPORARY_DIRECTORY,\

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • DRIVER_JARS: カンマで区切った JDBC ドライバの Cloud Storage パス
  • DRIVER_CLASS_NAME: JDBC ドライバのクラス名
  • CONNECTION_URL: JDBC 接続 URL 文字列
  • OUTPUT_TABLE: BigQuery 出力テーブル
  • BIG_QUERY_LOADING_TEMPORARY_DIRECTORY: BigQuery 読み込みプロセスで使用する一時ディレクトリ

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "driverJars": "DRIVER_JARS",
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "CONNECTION_URL",
       "outputTable": "OUTPUT_TABLE",
       "bigQueryLoadingTemporaryDirectory": "BIG_QUERY_LOADING_TEMPORARY_DIRECTORY",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Jdbc_to_BigQuery_Flex",
     "environment": { "maxWorkers": "10" }
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • DRIVER_JARS: カンマで区切った JDBC ドライバの Cloud Storage パス
  • DRIVER_CLASS_NAME: JDBC ドライバのクラス名
  • CONNECTION_URL: JDBC 接続 URL 文字列
  • OUTPUT_TABLE: BigQuery 出力テーブル
  • BIG_QUERY_LOADING_TEMPORARY_DIRECTORY: BigQuery 読み込みプロセスで使用する一時ディレクトリ
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.JdbcToBigQueryOptions;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSAwareValueProvider;
import com.google.cloud.teleport.v2.utils.JdbcConverters;
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
import com.google.common.annotations.VisibleForTesting;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.values.PCollection;

/**
 * A template that copies data from a relational database using JDBC to an existing BigQuery table.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jdbc-to-googlecloud/README_Jdbc_to_BigQuery_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Jdbc_to_BigQuery_Flex",
    category = TemplateCategory.BATCH,
    displayName = "JDBC to BigQuery with BigQuery Storage API support",
    description = {
      "The JDBC to BigQuery template is a batch pipeline that copies data from a relational database table into an existing BigQuery table. "
          + "This pipeline uses JDBC to connect to the relational database. You can use this template to copy data from any relational database with available JDBC drivers into BigQuery.",
      "For an extra layer of protection, you can also pass in a Cloud KMS key along with a Base64-encoded username, password, and connection string parameters encrypted with the Cloud KMS key. "
          + "See the <a href=\"https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt\">Cloud KMS API encryption endpoint</a> for additional details on encrypting your username, password, and connection string parameters."
    },
    optionsClass = JdbcToBigQueryOptions.class,
    flexContainerName = "jdbc-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/jdbc-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC drivers for the relational database must be available.",
      "The BigQuery table must exist before pipeline execution.",
      "The BigQuery table must have a compatible schema.",
      "The relational database must be accessible from the subnet where Dataflow runs."
    })
public class JdbcToBigQuery {

  /**
   * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
   * blocking execution is required, use the {@link JdbcToBigQuery#run} method to start the pipeline
   * and invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line
    JdbcToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(JdbcToBigQueryOptions.class);

    run(options, writeToBQTransform(options));
  }

  /**
   * Create the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @param writeToBQ the transform that outputs {@link TableRow}s to BigQuery.
   * @return The result of the pipeline execution.
   */
  @VisibleForTesting
  static PipelineResult run(JdbcToBigQueryOptions options, Write<TableRow> writeToBQ) {
    // Validate BQ STORAGE_WRITE_API options
    BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
     *        2) Append TableRow to BigQuery via BigQueryIO
     */
    JdbcIO.DataSourceConfiguration dataSourceConfiguration =
        JdbcIO.DataSourceConfiguration.create(
                StaticValueProvider.of(options.getDriverClassName()),
                maybeDecrypt(
                    maybeParseSecret(options.getConnectionURL()), options.getKMSEncryptionKey()))
            .withUsername(
                maybeDecrypt(
                    maybeParseSecret(options.getUsername()), options.getKMSEncryptionKey()))
            .withPassword(
                maybeDecrypt(
                    maybeParseSecret(options.getPassword()), options.getKMSEncryptionKey()));

    if (options.getDriverJars() != null) {
      dataSourceConfiguration = dataSourceConfiguration.withDriverJars(options.getDriverJars());
    }

    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    /*
     * Step 1: Read records via JDBC and convert to TableRow
     *         via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
     */
    PCollection<TableRow> rows;
    if (options.getPartitionColumn() != null && options.getTable() != null) {
      // Read with Partitions
      // TODO(pranavbhandari): Support readWithPartitions for other data types.
      JdbcIO.ReadWithPartitions<TableRow, Long> readIO =
          JdbcIO.<TableRow>readWithPartitions()
              .withDataSourceConfiguration(dataSourceConfiguration)
              .withTable(options.getTable())
              .withPartitionColumn(options.getPartitionColumn())
              .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
      if (options.getNumPartitions() != null) {
        readIO = readIO.withNumPartitions(options.getNumPartitions());
      }
      if (options.getLowerBound() != null && options.getUpperBound() != null) {
        readIO =
            readIO.withLowerBound(options.getLowerBound()).withUpperBound(options.getUpperBound());
      }

      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }

      rows = pipeline.apply("Read from JDBC with Partitions", readIO);
    } else {
      if (options.getQuery() == null) {
        throw new IllegalArgumentException(
            "Either 'query' or both 'table' AND 'PartitionColumn' must be specified to read from JDBC");
      }
      JdbcIO.Read<TableRow> readIO =
          JdbcIO.<TableRow>read()
              .withDataSourceConfiguration(dataSourceConfiguration)
              .withQuery(new GCSAwareValueProvider(options.getQuery()))
              .withCoder(TableRowJsonCoder.of())
              .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));

      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }

      rows = pipeline.apply("Read from JdbcIO", readIO);
    }

    /*
     * Step 2: Append TableRow to an existing BigQuery table
     */
    rows.apply("Write to BigQuery", writeToBQ);

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  /**
   * Create the {@link Write} transform that outputs the collection to BigQuery as per input option.
   */
  @VisibleForTesting
  static Write<TableRow> writeToBQTransform(JdbcToBigQueryOptions options) {
    // Needed for loading GCS filesystem before Pipeline.Create call
    FileSystems.setDefaultPipelineOptions(options);
    Write<TableRow> write =
        BigQueryIO.writeTableRows()
            .withoutValidation()
            .withCreateDisposition(Write.CreateDisposition.valueOf(options.getCreateDisposition()))
            .withWriteDisposition(
                options.getIsTruncate()
                    ? Write.WriteDisposition.WRITE_TRUNCATE
                    : Write.WriteDisposition.WRITE_APPEND)
            .withCustomGcsTempLocation(
                StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory()))
            .to(options.getOutputTable());

    if (Write.CreateDisposition.valueOf(options.getCreateDisposition())
        != Write.CreateDisposition.CREATE_NEVER) {
      write = write.withJsonSchema(getGcsFileAsString(options.getBigQuerySchemaPath()));
    }

    return write;
  }

  /**
   * Retrieves a secret value from SecretManagerUtils if the input string matches the specified
   * pattern.
   *
   * @param secret The input string representing a potential secret.
   * @return The secret value if the input matches the pattern and the secret is found, otherwise
   *     the original input string.
   */
  private static String maybeParseSecret(String secret) {
    // Check if the input string is not null.
    if (secret != null) {
      // Check if the input string matches the pattern for secrets stored in SecretManagerUtils.
      if (secret.matches("projects/.*/secrets/.*/versions/.*")) { // Use .* to match any characters
        // Retrieve the secret value from SecretManagerUtils.
        return SecretManagerUtils.getSecret(secret);
      }
    }
    // If the input is null or doesn't match the pattern, return the original input.
    return secret;
  }
}

次のステップ