Vorlage "MySQL für BigQuery"

Die Vorlage "MySQL für BigQuery" ist eine Batchpipeline, die Daten aus einer MySQL-Tabelle in eine vorhandene BigQuery-Tabelle kopiert. Diese Pipeline verwendet JDBC, um eine Verbindung zu MySQL herzustellen. Als zusätzliche Schutzmaßnahme können Sie auch einen Cloud KMS-Schlüssel zusammen mit einem Base64-codierten Nutzernamen, Passwort und Verbindungsstring-Parametern übergeben, die mit dem Cloud KMS-Schlüssel verschlüsselt sind. Weitere Informationen zum Verschlüsseln von Nutzernamen, Passwörtern und Verbindungsstring-Parametern finden Sie unter Cloud KMS API-Verschlüsselungsendpunkt.

Pipelineanforderungen

  • Die BigQuery-Tabelle muss vor der Pipelineausführung vorhanden sein.
  • Die BigQuery-Tabelle muss ein kompatibles Schema haben.
  • Die relationale Datenbank muss über das Subnetz zugänglich sein, in dem Dataflow ausgeführt wird.

Vorlagenparameter

Erforderliche Parameter

  • driverJars: Die durch Kommas getrennte Liste der JAR-Dateien des Treibers. Beispiel: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar.
  • driverClassName: Der Name der JDBC-Treiberklasse. Beispiel: com.mysql.jdbc.Driver.
  • connectionURL: Der URL-String für die JDBC-Verbindung. Beispiel: jdbc:mysql://some-host:3306/sampledb. Sie können diesen Wert als String übergeben, der mit einem Cloud KMS-Schlüssel und dann Base64-verschlüsselt ist. Entfernen Sie Leerzeichen aus dem Base64-codierten String. Beachten Sie den Unterschied zwischen einem Oracle-Nicht-RAC-Datenbankverbindungsstring (jdbc:oracle:thin:@some-host:<port>:<sid>) und einem Oracle-RAC-Datenbankverbindungsstring (jdbc:oracle:thin:@//some-host[:<port>]/<service_name>). Beispiel: jdbc:mysql://some-host:3306/sampledb.
  • outputTable: Der Speicherort der BigQuery-Ausgabetabelle. Beispiel: <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • bigQueryLoadingTemporaryDirectory: Das temporäre Verzeichnis für den BigQuery-Ladevorgang. Beispiel: gs://your-bucket/your-files/temp_dir.

Optionale Parameter

  • connectionProperties: Attributstring für die JDBC-Verbindung. Das Format des Strings muss [propertyName=property;]* sein. Weitere Informationen finden Sie unter „Konfigurationsattribute“ (https://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html) in der MySQL-Dokumentation. Beispiel: unicode=true;characterEncoding=UTF-8
  • username: Der Nutzername für die JDBC-Verbindung. Kann als String übergeben werden, der mit einem Cloud KMS-Schlüssel verschlüsselt ist, oder als Secret Manager-Secret im Format „projects/{project}/secrets/{secret}/versions/{secret_version}“.
  • password: Das Passwort für die JDBC-Verbindung. Kann als String übergeben werden, der mit einem Cloud KMS-Schlüssel verschlüsselt ist, oder als Secret Manager-Secret im Format „projects/{project}/secrets/{secret}/versions/{secret_version}“.
  • Abfrage: Die Abfrage, die in der Quelle zur Extraktion der Daten ausgeführt wird. Beachten Sie, dass einige JDBC-SQL- und BigQuery-Typen einige Unterschiede haben, obwohl sie denselben Namen haben. Beachten Sie die folgenden wichtigen SQL -> BigQuery-Typzuordnungen: DATETIME --> TIMESTAMP. Eine Typumwandlung kann erforderlich sein, wenn Ihre Schemas nicht übereinstimmen. Beispiel: select * from sampledb.sample_table
  • KMSEncryptionKey: Der Cloud KMS-Verschlüsselungsschlüssel zum Entschlüsseln von Nutzernamen, Passwort und Verbindungsstring. Wenn Sie einen Cloud KMS-Schlüssel übergeben, müssen Sie auch den Nutzernamen, das Passwort und den Verbindungsstring verschlüsseln. Beispiel: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • useColumnAlias: Wenn der Wert auf true gesetzt ist, verwendet die Pipeline den Spaltenalias (AS) anstelle des Spaltennamens, um die Zeilen zu BigQuery zuzuordnen. Die Standardeinstellung ist false.
  • isTruncate: Wenn der Wert auf true gesetzt ist, wird die Pipeline vor dem Laden von Daten in BigQuery gekürzt. Die Standardeinstellung ist false, wodurch die Pipeline veranlasst wird, Daten anzuhängen.
  • partitionColumn: Wenn dieser Parameter mit dem Namen des table angegeben wird, der als optionaler Parameter definiert ist, liest JdbcIO die Tabelle parallel, indem mehrere Instanzen der Abfrage in derselben Tabelle (Unterabfrage) mit Bereichen ausgeführt werden. Derzeit werden nur Long-Partitionsspalten unterstützt.
  • Tabelle: Die Tabelle, aus der bei Verwendung von Partitionen gelesen werden soll. Dieser Parameter akzeptiert auch eine Unterabfrage in Klammern. Beispiel: (select id, name from Person) as subq.
  • numPartitions: Die Anzahl der Partitionen. Mit der Unter- und Obergrenze bildet dieser Wert Partitionsschritte für generierte WHERE-Anweisungsausdrücke, die zum gleichmäßigen Aufteilen der Partitionsspalte verwendet werden. Wenn die Eingabe kleiner als 1 ist, wird die Zahl auf 1 gesetzt.
  • lowerBound: Die Untergrenze, die im Partitionsschema verwendet werden soll. Wenn nicht angegeben, wird dieser Wert von Apache Beam automatisch für die unterstützten Typen abgeleitet.
  • upperBound: Die Obergrenze, die im Partitionsschema verwendet werden soll. Wenn nicht angegeben, wird dieser Wert von Apache Beam automatisch für die unterstützten Typen abgeleitet.
  • fetchSize: Die Anzahl der Zeilen, die jeweils aus der Datenbank abgerufen werden sollen. Wird nicht für partitionierte Lesevorgänge verwendet. Die Standardeinstellung ist 50000.
  • createDisposition: Die zu verwendende BigQuery-CreateDisposition-Einstellung. Beispiel: CREATE_IF_NEEDEDoder CREATE_NEVER Die Standardeinstellung ist CREATE_NEVER.
  • bigQuerySchemaPath: Der Cloud Storage-Pfad für das BigQuery-JSON-Schema. Wenn createDisposition auf CREATE_IF_NEEDED festgelegt ist, muss dieser Parameter angegeben werden. Beispiel: gs://your-bucket/your-schema.json
  • outputDeadletterTable: Die BigQuery-Tabelle, die für Nachrichten verwendet werden soll, die die Ausgabetabelle nicht erreicht haben, formatiert als "PROJECT_ID:DATASET_NAME.TABLE_NAME". Wenn die Tabelle nicht vorhanden ist, wird sie beim Ausführen der Pipeline erstellt. Wenn dieser Parameter nicht angegeben ist, schlägt die Pipeline bei Schreibfehlern fehl.Dieser Parameter kann nur angegeben werden, wenn useStorageWriteApi oder useStorageWriteApiAtLeastOnce auf „wahr“ gesetzt ist.
  • disabledAlgorithms: Durch Kommas getrennte Algorithmen zum Deaktivieren. Wenn dieser Wert auf none gesetzt ist, wird kein Algorithmus deaktiviert. Verwenden Sie diesen Parameter mit Vorsicht, da die standardmäßig deaktivierten Algorithmen Sicherheitslücken oder Leistungsprobleme haben können. Beispiel: SSLv3, RC4
  • extraFilesToStage: Durch Kommas getrennte Cloud Storage-Pfade oder Secret Manager-Secrets für Dateien, die im Worker bereitgestellt werden sollen. Diese Dateien werden im Verzeichnis /extra_files in jedem Worker gespeichert. Beispiel: gs://<BUCKET_NAME>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>.
  • useStorageWriteApi: Wenn true, verwendet die Pipeline die BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Der Standardwert ist false. Weitere Informationen finden Sie unter „Storage Write API verwenden“ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: Gibt bei Verwendung der Storage Write API die Schreibsemantik an. Wenn Sie die "Mindestens einmal"-Semantik verwenden möchten (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), legen Sie diesen Parameter auf true fest. Wenn Sie die "Genau einmal"-Semantik verwenden möchten, legen Sie den Parameter auf false fest. Dieser Parameter gilt nur, wenn useStorageWriteApi true ist. Der Standardwert ist false.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the MySQL to BigQuery templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MySQL_to_BigQuery \
    --parameters \
connectionURL=JDBC_CONNECTION_URL,\
query=SOURCE_SQL_QUERY,\
outputTable=PROJECT_ID:DATASET.TABLE_NAME,
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
connectionProperties=CONNECTION_PROPERTIES,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Ersetzen Sie dabei Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • JDBC_CONNECTION_URL: Die JDBC-Verbindungs-URL
  • SOURCE_SQL_QUERY: die SQL-Abfrage, die in der Quelldatenbank ausgeführt werden soll
  • DATASET: Ihr BigQuery-Dataset
  • TABLE_NAME: Ihr BigQuery-Tabellenname
  • PATH_TO_TEMP_DIR_ON_GCS: Der Cloud Storage-Pfad zum temporären Verzeichnis
  • CONNECTION_PROPERTIES: die JDBC-Verbindungsattribute, falls erforderlich
  • CONNECTION_USERNAME: Der Nutzername der JDBC-Verbindung
  • CONNECTION_PASSWORD: Das JDBC-Verbindungspasswort
  • KMS_ENCRYPTION_KEY: der Cloud KMS-Verschlüsselungsschlüssel

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MySQL_to_BigQuery"
     "parameters": {
       "connectionURL": "JDBC_CONNECTION_URL",
       "query": "SOURCE_SQL_QUERY",
       "outputTable": "PROJECT_ID:DATASET.TABLE_NAME",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
     },
     "environment": { "zone": "us-central1-f" }
   }
}

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • JDBC_CONNECTION_URL: Die JDBC-Verbindungs-URL
  • SOURCE_SQL_QUERY: die SQL-Abfrage, die in der Quelldatenbank ausgeführt werden soll
  • DATASET: Ihr BigQuery-Dataset
  • TABLE_NAME: Ihr BigQuery-Tabellenname
  • PATH_TO_TEMP_DIR_ON_GCS: Der Cloud Storage-Pfad zum temporären Verzeichnis
  • CONNECTION_PROPERTIES: die JDBC-Verbindungsattribute, falls erforderlich
  • CONNECTION_USERNAME: Der Nutzername der JDBC-Verbindung
  • CONNECTION_PASSWORD: Das JDBC-Verbindungspasswort
  • KMS_ENCRYPTION_KEY: der Cloud KMS-Verschlüsselungsschlüssel
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.transforms.BigQueryConverters.wrapBigQueryInsertError;
import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.JdbcToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSAwareValueProvider;
import com.google.cloud.teleport.v2.utils.JdbcConverters;
import com.google.cloud.teleport.v2.utils.ResourceUtils;
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/**
 * A template that copies data from a relational database using JDBC to an existing BigQuery table.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jdbc-to-googlecloud/README_Jdbc_to_BigQuery_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Jdbc_to_BigQuery_Flex",
    category = TemplateCategory.BATCH,
    displayName = "JDBC to BigQuery with BigQuery Storage API support",
    description = {
      "The JDBC to BigQuery template is a batch pipeline that copies data from a relational database table into an existing BigQuery table. "
          + "This pipeline uses JDBC to connect to the relational database. You can use this template to copy data from any relational database with available JDBC drivers into BigQuery.",
      "For an extra layer of protection, you can also pass in a Cloud KMS key along with a Base64-encoded username, password, and connection string parameters encrypted with the Cloud KMS key. "
          + "See the <a href=\"https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt\">Cloud KMS API encryption endpoint</a> for additional details on encrypting your username, password, and connection string parameters."
    },
    optionsClass = JdbcToBigQueryOptions.class,
    flexContainerName = "jdbc-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/jdbc-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC drivers for the relational database must be available.",
      "If BigQuery table already exist before pipeline execution, it must have a compatible schema.",
      "The relational database must be accessible from the subnet where Dataflow runs."
    })
public class JdbcToBigQuery {

  /** Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /**
   * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
   * blocking execution is required, use the {@link JdbcToBigQuery#run} method to start the pipeline
   * and invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line
    JdbcToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(JdbcToBigQueryOptions.class);

    run(options, writeToBQTransform(options));
  }

  /**
   * Create the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @param writeToBQ the transform that outputs {@link TableRow}s to BigQuery.
   * @return The result of the pipeline execution.
   */
  @VisibleForTesting
  static PipelineResult run(JdbcToBigQueryOptions options, Write<TableRow> writeToBQ) {
    // Validate BQ STORAGE_WRITE_API options
    BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);
    if (!options.getUseStorageWriteApi()
        && !options.getUseStorageWriteApiAtLeastOnce()
        && !Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
      throw new IllegalArgumentException(
          "outputDeadletterTable can only be specified if BigQuery Storage Write API is enabled either with useStorageWriteApi or useStorageWriteApiAtLeastOnce.");
    }

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
     *        2) Append TableRow to BigQuery via BigQueryIO
     */
    JdbcIO.DataSourceConfiguration dataSourceConfiguration =
        JdbcIO.DataSourceConfiguration.create(
                StaticValueProvider.of(options.getDriverClassName()),
                maybeDecrypt(
                    maybeParseSecret(options.getConnectionURL()), options.getKMSEncryptionKey()))
            .withUsername(
                maybeDecrypt(
                    maybeParseSecret(options.getUsername()), options.getKMSEncryptionKey()))
            .withPassword(
                maybeDecrypt(
                    maybeParseSecret(options.getPassword()), options.getKMSEncryptionKey()));

    if (options.getDriverJars() != null) {
      dataSourceConfiguration = dataSourceConfiguration.withDriverJars(options.getDriverJars());
    }

    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    /*
     * Step 1: Read records via JDBC and convert to TableRow
     *         via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
     */
    PCollection<TableRow> rows;
    if (options.getPartitionColumn() != null && options.getTable() != null) {
      // Read with Partitions
      JdbcIO.ReadWithPartitions<TableRow, ?> readIO = null;
      final String partitionColumnType = options.getPartitionColumnType();
      if (partitionColumnType == null || "long".equals(partitionColumnType)) {
        JdbcIO.ReadWithPartitions<TableRow, Long> longTypeReadIO =
            JdbcIO.<TableRow, Long>readWithPartitions(TypeDescriptors.longs())
                .withDataSourceConfiguration(dataSourceConfiguration)
                .withTable(options.getTable())
                .withPartitionColumn(options.getPartitionColumn())
                .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
        if (options.getLowerBound() != null && options.getUpperBound() != null) {
          // Check if lower bound and upper bound are long type.
          try {
            longTypeReadIO =
                longTypeReadIO
                    .withLowerBound(Long.valueOf(options.getLowerBound()))
                    .withUpperBound(Long.valueOf(options.getUpperBound()));
          } catch (NumberFormatException e) {
            throw new NumberFormatException(
                "Expected Long values for lowerBound and upperBound, received : " + e.getMessage());
          }
        }
        readIO = longTypeReadIO;
      } else if ("datetime".equals(partitionColumnType)) {
        JdbcIO.ReadWithPartitions<TableRow, DateTime> dateTimeReadIO =
            JdbcIO.<TableRow, DateTime>readWithPartitions(TypeDescriptor.of(DateTime.class))
                .withDataSourceConfiguration(dataSourceConfiguration)
                .withTable(options.getTable())
                .withPartitionColumn(options.getPartitionColumn())
                .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
        if (options.getLowerBound() != null && options.getUpperBound() != null) {
          DateTimeFormatter dateFormatter =
              DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ").withOffsetParsed();
          // Check if lowerBound and upperBound are DateTime type.
          try {
            dateTimeReadIO =
                dateTimeReadIO
                    .withLowerBound(dateFormatter.parseDateTime(options.getLowerBound()))
                    .withUpperBound(dateFormatter.parseDateTime(options.getUpperBound()));
          } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Expected DateTime values in the format for lowerBound and upperBound, received : "
                    + e.getMessage());
          }
        }
        readIO = dateTimeReadIO;
      } else {
        throw new IllegalStateException("Received unsupported partitionColumnType.");
      }
      if (options.getNumPartitions() != null) {
        readIO = readIO.withNumPartitions(options.getNumPartitions());
      }
      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }

      rows = pipeline.apply("Read from JDBC with Partitions", readIO);
    } else {
      if (options.getQuery() == null) {
        throw new IllegalArgumentException(
            "Either 'query' or both 'table' AND 'PartitionColumn' must be specified to read from JDBC");
      }
      JdbcIO.Read<TableRow> readIO =
          JdbcIO.<TableRow>read()
              .withDataSourceConfiguration(dataSourceConfiguration)
              .withQuery(new GCSAwareValueProvider(options.getQuery()))
              .withCoder(TableRowJsonCoder.of())
              .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));

      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }

      rows = pipeline.apply("Read from JdbcIO", readIO);
    }

    /*
     * Step 2: Append TableRow to an existing BigQuery table
     */
    WriteResult writeResult = rows.apply("Write to BigQuery", writeToBQ);

    /*
     * Step 3.
     * If using Storage Write API, capture failed inserts and either
     *   a) write error rows to DLQ
     *   b) fail the pipeline
     */
    if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
      PCollection<BigQueryInsertError> insertErrors =
          BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options);

      if (!Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
        /*
         * Step 3a.
         * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
         */
        PCollection<FailsafeElement<String, String>> failedInserts =
            insertErrors
                .apply(
                    "WrapInsertionErrors",
                    MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                        .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
                .setCoder(FAILSAFE_ELEMENT_CODER);

        /*
         * Step 3a Contd.
         * Insert records that failed insert into deadletter table
         */
        failedInserts.apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(options.getOutputDeadletterTable())
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .setUseWindowedTimestamp(false)
                .build());
      } else {
        /*
         * Step 3b.
         * Fail pipeline upon write errors if no DLQ was specified
         */
        insertErrors.apply(ParDo.of(new ThrowWriteErrorsDoFn()));
      }
    }

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  static class ThrowWriteErrorsDoFn extends DoFn<BigQueryInsertError, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      BigQueryInsertError insertError = Objects.requireNonNull(c.element());
      List<String> errorMessages =
          insertError.getError().getErrors().stream()
              .map(ErrorProto::getMessage)
              .collect(Collectors.toList());
      String stackTrace = String.join("\nCaused by:", errorMessages);

      throw new IllegalStateException(
          String.format(
              "Failed to insert row %s.\nCaused by: %s", insertError.getRow(), stackTrace));
    }
  }

  /**
   * Create the {@link Write} transform that outputs the collection to BigQuery as per input option.
   */
  @VisibleForTesting
  static Write<TableRow> writeToBQTransform(JdbcToBigQueryOptions options) {
    // Needed for loading GCS filesystem before Pipeline.Create call
    FileSystems.setDefaultPipelineOptions(options);
    Write<TableRow> write =
        BigQueryIO.writeTableRows()
            .withoutValidation()
            .withCreateDisposition(Write.CreateDisposition.valueOf(options.getCreateDisposition()))
            .withWriteDisposition(
                options.getIsTruncate()
                    ? Write.WriteDisposition.WRITE_TRUNCATE
                    : Write.WriteDisposition.WRITE_APPEND)
            .withCustomGcsTempLocation(
                StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory()))
            .withExtendedErrorInfo()
            .to(options.getOutputTable());

    if (Write.CreateDisposition.valueOf(options.getCreateDisposition())
        != Write.CreateDisposition.CREATE_NEVER) {
      write = write.withJsonSchema(getGcsFileAsString(options.getBigQuerySchemaPath()));
    }
    if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
      write = write.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());
    }

    return write;
  }

  /**
   * Retrieves a secret value from SecretManagerUtils if the input string matches the specified
   * pattern.
   *
   * @param secret The input string representing a potential secret.
   * @return The secret value if the input matches the pattern and the secret is found, otherwise
   *     the original input string.
   */
  private static String maybeParseSecret(String secret) {
    // Check if the input string is not null.
    if (secret != null) {
      // Check if the input string matches the pattern for secrets stored in SecretManagerUtils.
      if (secret.matches("projects/.*/secrets/.*/versions/.*")) { // Use .* to match any characters
        // Retrieve the secret value from SecretManagerUtils.
        return SecretManagerUtils.getSecret(secret);
      }
    }
    // If the input is null or doesn't match the pattern, return the original input.
    return secret;
  }
}

Nächste Schritte