Plantilla de SQL Server a BigQuery

La plantilla de SQL Server a BigQuery es una canalización por lotes que copia los datos de una tabla de SQL Server a una tabla de BigQuery existente. Esta canalización utiliza JDBC para conectarse a SQL Server. Para obtener una capa adicional de protección, también puedes pasar una clave de Cloud KMS junto con un nombre de usuario, contraseña y parámetros de string de conexión codificados en Base64 encriptados con la clave de Cloud KMS. Para obtener más información sobre la encriptación de tus parámetros de nombre de usuario, contraseña y string de conexión, consulta el extremo de encriptación de la API de Cloud KMS.

Requisitos de la canalización

  • La tabla de BigQuery debe existir antes de ejecutar la canalización.
  • La tabla de BigQuery debe tener un esquema compatible.
  • La base de datos relacional debe ser accesible desde la subred en la que se ejecuta Dataflow.

Parámetros de la plantilla

Parámetros obligatorios

  • driverJars: Es la lista separada por comas de los archivos JAR del controlador. Por ejemplo, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar
  • driverClassName: Es el nombre de la clase del controlador de JDBC. Por ejemplo, com.mysql.jdbc.Driver
  • connectionURL: La cadena de URL de conexión de JDBC. Por ejemplo, jdbc:mysql://some-host:3306/sampledb Puedes pasar este valor como una cadena encriptada con una clave de Cloud KMS y, luego, codificada en Base64. Quita los caracteres de espacio en blanco de la cadena codificada en base64. Ten en cuenta la diferencia entre una cadena de conexión de base de datos de Oracle que no es de RAC (jdbc:oracle:thin:@some-host:<port>:<sid>) y una cadena de conexión de base de datos de Oracle RAC (jdbc:oracle:thin:@//some-host[:<port>]/<service_name>). Por ejemplo, jdbc:mysql://some-host:3306/sampledb.
  • outputTable: La ubicación de la tabla de salida de BigQuery. Por ejemplo, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
  • bigQueryLoadingTemporaryDirectory: Es el directorio temporal para el proceso de carga de BigQuery. Por ejemplo, gs://your-bucket/your-files/temp_dir

Parámetros opcionales

  • connectionProperties: La cadena de propiedades para usar en la conexión de JDBC. El formato de la cadena debe ser [propertyName=property;]*. Para obtener más información, consulta Propiedades de la configuración (https://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html) en la documentación de MySQL. Por ejemplo, unicode=true;characterEncoding=UTF-8.
  • username: El nombre de usuario que se utilizará en la conexión de JDBC. Se puede pasar como una cadena encriptada con una clave de Cloud KMS o como un secreto de Secret Manager con el formato projects/{project}/secrets/{secret}/versions/{secret_version}.
  • password: La contraseña para usar en la conexión de JDBC. Se puede pasar como una cadena encriptada con una clave de Cloud KMS o como un secreto de Secret Manager con el formato projects/{project}/secrets/{secret}/versions/{secret_version}.
  • query: La consulta que se ejecutará en la fuente para extraer los datos. Ten en cuenta que, aunque algunos tipos de SQL de JDBC y BigQuery comparten el mismo nombre, tienen algunas diferencias. Estas son algunas asignaciones de tipos importantes de SQL -> BigQuery que se deben tener en cuenta: DATETIME --> TIMESTAMP. Es posible que se requiera la conversión de tipos si tus esquemas no coinciden. Por ejemplo, select * from sampledb.sample_table.
  • KMSEncryptionKey: La clave de encriptación de Cloud KMS que se usa para desencriptar el nombre de usuario, la contraseña y la cadena de conexión. Si pasas una clave de Cloud KMS, también debes encriptar el nombre de usuario, la contraseña y la cadena de conexión. Por ejemplo, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
  • useColumnAlias: Si se configura como true, la canalización usa el alias de la columna (AS) en lugar del nombre de la columna para asignar las filas a BigQuery. La configuración predeterminada es false.
  • isTruncate: Si se configura como true, la canalización se trunca antes de cargar los datos en BigQuery. El valor predeterminado es false, lo que hace que la canalización agregue datos.
  • partitionColumn: Si se proporciona este parámetro con el nombre de table definido como parámetro opcional, JdbcIO lee la tabla en paralelo ejecutando varias instancias de la consulta en la misma tabla (subconsulta) usando rangos. Por el momento, solo admite columnas de partición Long.
  • table: Es la tabla desde la que se debe leer cuando se usan particiones. Este parámetro también acepta una subconsulta entre paréntesis. Por ejemplo, (select id, name from Person) as subq
  • numPartitions: Es la cantidad de particiones. Con el límite inferior y superior, este valor forma particiones de segmentación para las expresiones de cláusula WHERE generadas que se usan para dividir la columna de partición de manera uniforme. Cuando la entrada es menor que 1, se establece el número en 1.
  • lowerBound: Es el límite inferior que se usará en el esquema de partición. Si no se proporciona, Apache Beam infiere este valor de manera automática para los tipos compatibles.
  • upperBound: El límite superior que se usará en el esquema de partición. Si no se proporciona, Apache Beam infiere este valor de manera automática para los tipos compatibles.
  • fetchSize: Es la cantidad de filas que se recuperarán de la base de datos a la vez. No se usa para lecturas particionadas. La configuración predeterminada es 50,000.
  • createDisposition: Es la CreateDisposition de BigQuery que se usará. Por ejemplo, CREATE_IF_NEEDED o CREATE_NEVER. Configuración predeterminada: CREATE_NEVER.
  • bigQuerySchemaPath: La ruta de Cloud Storage para el esquema JSON de BigQuery. Si createDisposition se establece en CREATE_IF_NEEDED, se debe especificar este parámetro. Por ejemplo, gs://your-bucket/your-schema.json.
  • outputDeadletterTable: La tabla de BigQuery que se usará para los mensajes que no llegaron a la tabla de salida, con el formato "PROJECT_ID:DATASET_NAME.TABLE_NAME". Si la tabla no existe, se crea cuando se ejecuta la canalización. Si no se especifica este parámetro, la canalización fallará en los errores de escritura.Este parámetro solo se puede especificar si useStorageWriteApi o useStorageWriteApiAtLeastOnce se establece en verdadero.
  • disabledAlgorithms: Algoritmos separados por comas que se deben inhabilitar. Si este valor se establece como none, no se inhabilita ningún algoritmo. Ten cuidado con este parámetro, ya que los algoritmos inhabilitados de forma predeterminada podrían tener vulnerabilidades o problemas de rendimiento. Por ejemplo, SSLv3, RC4.
  • extraFilesToStage: Rutas de Cloud Storage separadas por comas o secretos de Secret Manager para los archivos que se deben almacenar en etapa intermedia en el trabajador. Estos archivos se guardan en el directorio /extra_files en cada trabajador. Por ejemplo, gs://<BUCKET_NAME>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>
  • useStorageWriteApi: Si es true, la canalización usa la API de BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es false. Para obtener más información, consulta Usa la API de Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: Cuando usas la API de Storage Write, se especifica la semántica de escritura. Para usar una semántica de al menos una vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), configura el parámetro en true. Para usar una semántica de una y solo una vez, configura el parámetro en false. Este parámetro se aplica solo cuando useStorageWriteApi es true. El valor predeterminado es false.

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the SQL Server to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/SQLServer_to_BigQuery \
    --parameters \
connectionURL=JDBC_CONNECTION_URL,\
query=SOURCE_SQL_QUERY,\
outputTable=PROJECT_ID:DATASET.TABLE_NAME,
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
connectionProperties=CONNECTION_PROPERTIES,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • JDBC_CONNECTION_URL: Es la URL de conexión de JDBC
  • SOURCE_SQL_QUERY: La consulta en SQL que se ejecutará en la base de datos de origen.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.
  • CONNECTION_PROPERTIES: Las propiedades de conexión de JDBC, si es necesario.
  • CONNECTION_USERNAME: Es el nombre de usuario de la conexión de JDBC.
  • CONNECTION_PASSWORD: Es la contraseña de la conexión de JDBC.
  • KMS_ENCRYPTION_KEY: La clave de encriptación de Cloud KMS.

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/SQLServer_to_BigQuery",
    "parameters": {
      "connectionURL": "JDBC_CONNECTION_URL",
      "query": "SOURCE_SQL_QUERY",
      "outputTable": "PROJECT_ID:DATASET.TABLE_NAME",
      "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
      "connectionProperties": "CONNECTION_PROPERTIES",
      "username": "CONNECTION_USERNAME",
      "password": "CONNECTION_PASSWORD",
      "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
    },
    "environment": { "zone": "us-central1-f" }
  }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • JDBC_CONNECTION_URL: Es la URL de conexión de JDBC
  • SOURCE_SQL_QUERY: La consulta en SQL que se ejecutará en la base de datos de origen.
  • DATASET: Es el conjunto de datos de BigQuery.
  • TABLE_NAME: Es el nombre de la tabla de BigQuery.
  • PATH_TO_TEMP_DIR_ON_GCS: Es la ruta de acceso de Cloud Storage al directorio temporal.
  • CONNECTION_PROPERTIES: Las propiedades de conexión de JDBC, si es necesario.
  • CONNECTION_USERNAME: Es el nombre de usuario de la conexión de JDBC.
  • CONNECTION_PASSWORD: Es la contraseña de la conexión de JDBC.
  • KMS_ENCRYPTION_KEY: La clave de encriptación de Cloud KMS.
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.transforms.BigQueryConverters.wrapBigQueryInsertError;
import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.JdbcToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSAwareValueProvider;
import com.google.cloud.teleport.v2.utils.JdbcConverters;
import com.google.cloud.teleport.v2.utils.ResourceUtils;
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/**
 * A template that copies data from a relational database using JDBC to an existing BigQuery table.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jdbc-to-googlecloud/README_Jdbc_to_BigQuery_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Jdbc_to_BigQuery_Flex",
    category = TemplateCategory.BATCH,
    displayName = "JDBC to BigQuery with BigQuery Storage API support",
    description = {
      "The JDBC to BigQuery template is a batch pipeline that copies data from a relational database table into an existing BigQuery table. "
          + "This pipeline uses JDBC to connect to the relational database. You can use this template to copy data from any relational database with available JDBC drivers into BigQuery.",
      "For an extra layer of protection, you can also pass in a Cloud KMS key along with a Base64-encoded username, password, and connection string parameters encrypted with the Cloud KMS key. "
          + "See the <a href=\"https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt\">Cloud KMS API encryption endpoint</a> for additional details on encrypting your username, password, and connection string parameters."
    },
    optionsClass = JdbcToBigQueryOptions.class,
    flexContainerName = "jdbc-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/jdbc-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC drivers for the relational database must be available.",
      "If BigQuery table already exist before pipeline execution, it must have a compatible schema.",
      "The relational database must be accessible from the subnet where Dataflow runs."
    })
public class JdbcToBigQuery {

  /** Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /**
   * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
   * blocking execution is required, use the {@link JdbcToBigQuery#run} method to start the pipeline
   * and invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line
    JdbcToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(JdbcToBigQueryOptions.class);

    run(options, writeToBQTransform(options));
  }

  /**
   * Create the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @param writeToBQ the transform that outputs {@link TableRow}s to BigQuery.
   * @return The result of the pipeline execution.
   */
  @VisibleForTesting
  static PipelineResult run(JdbcToBigQueryOptions options, Write<TableRow> writeToBQ) {
    // Validate BQ STORAGE_WRITE_API options
    BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);
    if (!options.getUseStorageWriteApi()
        && !options.getUseStorageWriteApiAtLeastOnce()
        && !Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
      throw new IllegalArgumentException(
          "outputDeadletterTable can only be specified if BigQuery Storage Write API is enabled either with useStorageWriteApi or useStorageWriteApiAtLeastOnce.");
    }

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
     *        2) Append TableRow to BigQuery via BigQueryIO
     */
    JdbcIO.DataSourceConfiguration dataSourceConfiguration =
        JdbcIO.DataSourceConfiguration.create(
                StaticValueProvider.of(options.getDriverClassName()),
                maybeDecrypt(
                    maybeParseSecret(options.getConnectionURL()), options.getKMSEncryptionKey()))
            .withUsername(
                maybeDecrypt(
                    maybeParseSecret(options.getUsername()), options.getKMSEncryptionKey()))
            .withPassword(
                maybeDecrypt(
                    maybeParseSecret(options.getPassword()), options.getKMSEncryptionKey()));

    if (options.getDriverJars() != null) {
      dataSourceConfiguration = dataSourceConfiguration.withDriverJars(options.getDriverJars());
    }

    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    /*
     * Step 1: Read records via JDBC and convert to TableRow
     *         via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
     */
    PCollection<TableRow> rows;
    if (options.getPartitionColumn() != null && options.getTable() != null) {
      // Read with Partitions
      JdbcIO.ReadWithPartitions<TableRow, ?> readIO = null;
      final String partitionColumnType = options.getPartitionColumnType();
      if (partitionColumnType == null || "long".equals(partitionColumnType)) {
        JdbcIO.ReadWithPartitions<TableRow, Long> longTypeReadIO =
            JdbcIO.<TableRow, Long>readWithPartitions(TypeDescriptors.longs())
                .withDataSourceConfiguration(dataSourceConfiguration)
                .withTable(options.getTable())
                .withPartitionColumn(options.getPartitionColumn())
                .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
        if (options.getLowerBound() != null && options.getUpperBound() != null) {
          // Check if lower bound and upper bound are long type.
          try {
            longTypeReadIO =
                longTypeReadIO
                    .withLowerBound(Long.valueOf(options.getLowerBound()))
                    .withUpperBound(Long.valueOf(options.getUpperBound()));
          } catch (NumberFormatException e) {
            throw new NumberFormatException(
                "Expected Long values for lowerBound and upperBound, received : " + e.getMessage());
          }
        }
        readIO = longTypeReadIO;
      } else if ("datetime".equals(partitionColumnType)) {
        JdbcIO.ReadWithPartitions<TableRow, DateTime> dateTimeReadIO =
            JdbcIO.<TableRow, DateTime>readWithPartitions(TypeDescriptor.of(DateTime.class))
                .withDataSourceConfiguration(dataSourceConfiguration)
                .withTable(options.getTable())
                .withPartitionColumn(options.getPartitionColumn())
                .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
        if (options.getLowerBound() != null && options.getUpperBound() != null) {
          DateTimeFormatter dateFormatter =
              DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ").withOffsetParsed();
          // Check if lowerBound and upperBound are DateTime type.
          try {
            dateTimeReadIO =
                dateTimeReadIO
                    .withLowerBound(dateFormatter.parseDateTime(options.getLowerBound()))
                    .withUpperBound(dateFormatter.parseDateTime(options.getUpperBound()));
          } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Expected DateTime values in the format for lowerBound and upperBound, received : "
                    + e.getMessage());
          }
        }
        readIO = dateTimeReadIO;
      } else {
        throw new IllegalStateException("Received unsupported partitionColumnType.");
      }
      if (options.getNumPartitions() != null) {
        readIO = readIO.withNumPartitions(options.getNumPartitions());
      }
      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }

      rows = pipeline.apply("Read from JDBC with Partitions", readIO);
    } else {
      if (options.getQuery() == null) {
        throw new IllegalArgumentException(
            "Either 'query' or both 'table' AND 'PartitionColumn' must be specified to read from JDBC");
      }
      JdbcIO.Read<TableRow> readIO =
          JdbcIO.<TableRow>read()
              .withDataSourceConfiguration(dataSourceConfiguration)
              .withQuery(new GCSAwareValueProvider(options.getQuery()))
              .withCoder(TableRowJsonCoder.of())
              .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));

      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }

      rows = pipeline.apply("Read from JdbcIO", readIO);
    }

    /*
     * Step 2: Append TableRow to an existing BigQuery table
     */
    WriteResult writeResult = rows.apply("Write to BigQuery", writeToBQ);

    /*
     * Step 3.
     * If using Storage Write API, capture failed inserts and either
     *   a) write error rows to DLQ
     *   b) fail the pipeline
     */
    if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
      PCollection<BigQueryInsertError> insertErrors =
          BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options);

      if (!Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
        /*
         * Step 3a.
         * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
         */
        PCollection<FailsafeElement<String, String>> failedInserts =
            insertErrors
                .apply(
                    "WrapInsertionErrors",
                    MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                        .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
                .setCoder(FAILSAFE_ELEMENT_CODER);

        /*
         * Step 3a Contd.
         * Insert records that failed insert into deadletter table
         */
        failedInserts.apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(options.getOutputDeadletterTable())
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .setUseWindowedTimestamp(false)
                .build());
      } else {
        /*
         * Step 3b.
         * Fail pipeline upon write errors if no DLQ was specified
         */
        insertErrors.apply(ParDo.of(new ThrowWriteErrorsDoFn()));
      }
    }

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  static class ThrowWriteErrorsDoFn extends DoFn<BigQueryInsertError, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      BigQueryInsertError insertError = Objects.requireNonNull(c.element());
      List<String> errorMessages =
          insertError.getError().getErrors().stream()
              .map(ErrorProto::getMessage)
              .collect(Collectors.toList());
      String stackTrace = String.join("\nCaused by:", errorMessages);

      throw new IllegalStateException(
          String.format(
              "Failed to insert row %s.\nCaused by: %s", insertError.getRow(), stackTrace));
    }
  }

  /**
   * Create the {@link Write} transform that outputs the collection to BigQuery as per input option.
   */
  @VisibleForTesting
  static Write<TableRow> writeToBQTransform(JdbcToBigQueryOptions options) {
    // Needed for loading GCS filesystem before Pipeline.Create call
    FileSystems.setDefaultPipelineOptions(options);
    Write<TableRow> write =
        BigQueryIO.writeTableRows()
            .withoutValidation()
            .withCreateDisposition(Write.CreateDisposition.valueOf(options.getCreateDisposition()))
            .withWriteDisposition(
                options.getIsTruncate()
                    ? Write.WriteDisposition.WRITE_TRUNCATE
                    : Write.WriteDisposition.WRITE_APPEND)
            .withCustomGcsTempLocation(
                StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory()))
            .withExtendedErrorInfo()
            .to(options.getOutputTable());

    if (Write.CreateDisposition.valueOf(options.getCreateDisposition())
        != Write.CreateDisposition.CREATE_NEVER) {
      write = write.withJsonSchema(getGcsFileAsString(options.getBigQuerySchemaPath()));
    }
    if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
      write = write.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());
    }

    return write;
  }

  /**
   * Retrieves a secret value from SecretManagerUtils if the input string matches the specified
   * pattern.
   *
   * @param secret The input string representing a potential secret.
   * @return The secret value if the input matches the pattern and the secret is found, otherwise
   *     the original input string.
   */
  private static String maybeParseSecret(String secret) {
    // Check if the input string is not null.
    if (secret != null) {
      // Check if the input string matches the pattern for secrets stored in SecretManagerUtils.
      if (secret.matches("projects/.*/secrets/.*/versions/.*")) { // Use .* to match any characters
        // Retrieve the secret value from SecretManagerUtils.
        return SecretManagerUtils.getSecret(secret);
      }
    }
    // If the input is null or doesn't match the pattern, return the original input.
    return secret;
  }
}

¿Qué sigue?