Modelo do Oracle para BigQuery

O modelo Oracle para BigQuery é um pipeline em lote que copia dados de uma tabela da Oracle para uma tabela atual do BigQuery. Este pipeline usa JDBC para se conectar à Oracle. Para ter uma camada extra de proteção, é possível transmitir uma chave do Cloud KMS com um nome de usuário, senha e parâmetros da string de conexão criptografados em Base64 com a chave do Cloud KMS. Para mais informações sobre como criptografar o nome de usuário, a senha e os parâmetros da string de conexão, consulte o endpoint de criptografia da API Cloud KMS.

Requisitos de pipeline

  • A tabela do BigQuery precisa existir antes da execução do pipeline.
  • A tabela do BigQuery precisa ter um esquema compatível.
  • O banco de dados relacional precisa estar acessível na sub-rede em que o Dataflow é executado.

Parâmetros do modelo

Parâmetros obrigatórios

  • driverJars: a lista separada por vírgulas de arquivos JAR do driver. Por exemplo, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar.
  • driverClassName: o nome da classe do driver do JDBC. Por exemplo, com.mysql.jdbc.Driver.
  • connectionURL: a string do URL de conexão do JDBC. Por exemplo, jdbc:mysql://some-host:3306/sampledb. É possível transmitir esse valor como uma string criptografada com uma chave do Cloud KMS e, em seguida, codificada em Base64. Remova os caracteres de espaço em branco da string codificada em Base64. Observe a diferença entre uma string de conexão de banco de dados Oracle não RAC (jdbc:oracle:thin:@some-host:<port>:<sid>) e uma string de conexão de banco de dados Oracle RAC (jdbc:oracle:thin:@//some-host[:<port>]/<service_name>). Por exemplo, jdbc:mysql://some-host:3306/sampledb.
  • outputTable: o local da tabela de saída do BigQuery. Por exemplo, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • bigQueryLoadingTemporaryDirectory: o diretório temporário do processo de carregamento do BigQuery. Por exemplo, gs://your-bucket/your-files/temp_dir.

Parâmetros opcionais

  • connectionProperties: a string de propriedades a ser usada para a conexão JDBC. O formato da string precisa ser [propertyName=property;]*.Para mais informações, consulte as propriedades de configuração (https://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html) na documentação do MySQL. Por exemplo, unicode=true;characterEncoding=UTF-8.
  • username: o nome de usuário a ser usado para a conexão JDBC. Pode ser transmitida como uma string criptografada com uma chave do Cloud KMS ou como um secret do Secret Manager no formato projects/{project}/secrets/{secret}/versions/{secret_version}.
  • password: a senha a ser usada para a conexão JDBC. Pode ser transmitida como uma string criptografada com uma chave do Cloud KMS ou como um secret do Secret Manager no formato projects/{project}/secrets/{secret}/versions/{secret_version}.
  • query: a consulta a ser executada na origem para extrair os dados. Alguns tipos JDBC e BigQuery, embora compartilhem o mesmo nome, têm algumas diferenças. Alguns mapeamentos de tipo importantes de SQL -> BigQuery a serem lembrados são DATETIME --> TIMESTAMP. Talvez seja necessário transmitir o tipo se os esquemas não corresponderem. Por exemplo, select * from sampledb.sample_table.
  • KMSEncryptionKey: a chave de criptografia do Cloud KMS a ser usada para descriptografar o nome de usuário, a senha e a string de conexão. Se você transmitir uma chave do Cloud KMS, também precisará criptografar o nome de usuário, a senha e a string de conexão. Por exemplo, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • useColumnAlias: se definido como true, o pipeline usa o alias de coluna (AS) em vez do nome da coluna para mapear as linhas para o BigQuery. O padrão é false.
  • isTruncate: se definido como true, o pipeline será truncado antes de carregar dados no BigQuery. O padrão é false, o que faz com que o pipeline adicione dados ao final.
  • partitionColumn: se esse parâmetro for fornecido com o nome do table definido como um parâmetro opcional, o JdbcIO lerá a tabela em paralelo executando várias instâncias da consulta na mesma tabela (subconsulta) usando intervalos. No momento, ele só é compatível com colunas de partição Long.
  • table: a tabela a ser lida ao usar partições. Esse parâmetro também aceita uma subconsulta entre parênteses. Por exemplo, (select id, name from Person) as subq.
  • numPartitions: o número de partições. Com os limites inferior e superior, esse valor forma saltos de partição para expressões de cláusula WHERE geradas que são usadas para dividir a coluna de partição de maneira uniforme. Quando a entrada for menor que 1, o número será definido como 1.
  • lowerBound: o limite inferior a ser usado no esquema de partição. Se não for fornecido, esse valor será inferido automaticamente pelo Apache Beam para os tipos compatíveis.
  • upperBound: o limite superior a ser usado no esquema de partição. Se não for fornecido, esse valor será inferido automaticamente pelo Apache Beam para os tipos compatíveis.
  • fetchSize: o número de linhas a serem buscadas no banco de dados de cada vez. Não é usado para leituras particionadas. O padrão é 50000.
  • createDisposition: o CreateDisposition do BigQuery a ser usado. Por exemplo, CREATE_IF_NEEDED ou CREATE_NEVER. O padrão é: CREATE_NEVER.
  • bigQuerySchemaPath: o caminho do Cloud Storage para o esquema JSON do BigQuery. Se createDisposition estiver definido como CREATE_IF_NEEDED, esse parâmetro precisará ser especificado. Por exemplo, gs://your-bucket/your-schema.json.
  • outputDeadletterTable: a tabela do BigQuery a ser usada para mensagens que não alcançaram a tabela de saída, formatada como "PROJECT_ID:DATASET_NAME.TABLE_NAME". Se a tabela não existir, ela será criada quando o pipeline for executado. Se esse parâmetro não for especificado, o pipeline vai falhar em erros de gravação.Esse parâmetro só pode ser especificado se useStorageWriteApi ou useStorageWriteApiAtLeastOnce for definido como verdadeiro.
  • disabledAlgorithms: algoritmos separados por vírgula a serem desativados. Se esse valor for definido como none, nenhum algoritmo será desativado. Use esse parâmetro com cuidado, porque os algoritmos desativados por padrão podem ter vulnerabilidades ou problemas de desempenho. Por exemplo, SSLv3, RC4.
  • extraFilesToStage: caminhos do Cloud Storage separados por vírgulas ou secrets do Secret Manager para que os arquivos sejam organizados no worker. Esses arquivos são salvos no diretório /extra_files em cada worker. Por exemplo, gs://<BUCKET_NAME>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>.
  • useStorageWriteApi: se true, o pipeline usa a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: ao usar a API Storage Write, especifica a semântica de gravação. Para usar a semântica pelo menos uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como true. Para usar semântica exatamente uma vez, defina o parâmetro como false. Esse parâmetro se aplica apenas quando useStorageWriteApi é true. O valor padrão é false.

Executar o modelo

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Oracle to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Oracle_to_BigQuery \
    --parameters \
connectionURL=JDBC_CONNECTION_URL,\
query=SOURCE_SQL_QUERY,\
outputTable=PROJECT_ID:DATASET.TABLE_NAME,
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
connectionProperties=CONNECTION_PROPERTIES,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • JDBC_CONNECTION_URL: o URL de conexão de JDBC
  • SOURCE_SQL_QUERY: a consulta SQL a ser executada no banco de dados de origem.
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário
  • CONNECTION_PROPERTIES: as propriedades de conexão do JDBC, se necessário
  • CONNECTION_USERNAME: o nome de usuário da conexão JDBC.
  • CONNECTION_PASSWORD: a senha de conexão JDBC
  • KMS_ENCRYPTION_KEY: a chave de criptografia do Cloud KMS

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "jobName": "JOB_NAME",
   "parameters": {
       "connectionURL": "JDBC_CONNECTION_URL",
       "query": "SOURCE_SQL_QUERY",
       "outputTable": "PROJECT_ID:DATASET.TABLE_NAME",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • JDBC_CONNECTION_URL: o URL de conexão de JDBC
  • SOURCE_SQL_QUERY: a consulta SQL a ser executada no banco de dados de origem.
  • DATASET: o conjunto de dados do BigQuery
  • TABLE_NAME: o nome da tabela do BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário
  • CONNECTION_PROPERTIES: as propriedades de conexão do JDBC, se necessário
  • CONNECTION_USERNAME: o nome de usuário da conexão JDBC.
  • CONNECTION_PASSWORD: a senha de conexão JDBC
  • KMS_ENCRYPTION_KEY: a chave de criptografia do Cloud KMS
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.transforms.BigQueryConverters.wrapBigQueryInsertError;
import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.JdbcToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSAwareValueProvider;
import com.google.cloud.teleport.v2.utils.JdbcConverters;
import com.google.cloud.teleport.v2.utils.ResourceUtils;
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

/**
 * A template that copies data from a relational database using JDBC to an existing BigQuery table.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jdbc-to-googlecloud/README_Jdbc_to_BigQuery_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Jdbc_to_BigQuery_Flex",
    category = TemplateCategory.BATCH,
    displayName = "JDBC to BigQuery with BigQuery Storage API support",
    description = {
      "The JDBC to BigQuery template is a batch pipeline that copies data from a relational database table into an existing BigQuery table. "
          + "This pipeline uses JDBC to connect to the relational database. You can use this template to copy data from any relational database with available JDBC drivers into BigQuery.",
      "For an extra layer of protection, you can also pass in a Cloud KMS key along with a Base64-encoded username, password, and connection string parameters encrypted with the Cloud KMS key. "
          + "See the <a href=\"https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt\">Cloud KMS API encryption endpoint</a> for additional details on encrypting your username, password, and connection string parameters."
    },
    optionsClass = JdbcToBigQueryOptions.class,
    flexContainerName = "jdbc-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/jdbc-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC drivers for the relational database must be available.",
      "If BigQuery table already exist before pipeline execution, it must have a compatible schema.",
      "The relational database must be accessible from the subnet where Dataflow runs."
    })
public class JdbcToBigQuery {

  /** Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /**
   * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
   * blocking execution is required, use the {@link JdbcToBigQuery#run} method to start the pipeline
   * and invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line
    JdbcToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(JdbcToBigQueryOptions.class);

    run(options, writeToBQTransform(options));
  }

  /**
   * Create the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @param writeToBQ the transform that outputs {@link TableRow}s to BigQuery.
   * @return The result of the pipeline execution.
   */
  @VisibleForTesting
  static PipelineResult run(JdbcToBigQueryOptions options, Write<TableRow> writeToBQ) {
    // Validate BQ STORAGE_WRITE_API options
    BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);
    if (!options.getUseStorageWriteApi()
        && !options.getUseStorageWriteApiAtLeastOnce()
        && !Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
      throw new IllegalArgumentException(
          "outputDeadletterTable can only be specified if BigQuery Storage Write API is enabled either with useStorageWriteApi or useStorageWriteApiAtLeastOnce.");
    }

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
     *        2) Append TableRow to BigQuery via BigQueryIO
     */
    JdbcIO.DataSourceConfiguration dataSourceConfiguration =
        JdbcIO.DataSourceConfiguration.create(
                StaticValueProvider.of(options.getDriverClassName()),
                maybeDecrypt(
                    maybeParseSecret(options.getConnectionURL()), options.getKMSEncryptionKey()))
            .withUsername(
                maybeDecrypt(
                    maybeParseSecret(options.getUsername()), options.getKMSEncryptionKey()))
            .withPassword(
                maybeDecrypt(
                    maybeParseSecret(options.getPassword()), options.getKMSEncryptionKey()));

    if (options.getDriverJars() != null) {
      dataSourceConfiguration = dataSourceConfiguration.withDriverJars(options.getDriverJars());
    }

    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    /*
     * Step 1: Read records via JDBC and convert to TableRow
     *         via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
     */
    PCollection<TableRow> rows;
    if (options.getPartitionColumn() != null && options.getTable() != null) {
      // Read with Partitions
      // TODO(pranavbhandari): Support readWithPartitions for other data types.
      JdbcIO.ReadWithPartitions<TableRow, Long> readIO =
          JdbcIO.<TableRow>readWithPartitions()
              .withDataSourceConfiguration(dataSourceConfiguration)
              .withTable(options.getTable())
              .withPartitionColumn(options.getPartitionColumn())
              .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
      if (options.getNumPartitions() != null) {
        readIO = readIO.withNumPartitions(options.getNumPartitions());
      }
      if (options.getLowerBound() != null && options.getUpperBound() != null) {
        readIO =
            readIO.withLowerBound(options.getLowerBound()).withUpperBound(options.getUpperBound());
      }

      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }

      rows = pipeline.apply("Read from JDBC with Partitions", readIO);
    } else {
      if (options.getQuery() == null) {
        throw new IllegalArgumentException(
            "Either 'query' or both 'table' AND 'PartitionColumn' must be specified to read from JDBC");
      }
      JdbcIO.Read<TableRow> readIO =
          JdbcIO.<TableRow>read()
              .withDataSourceConfiguration(dataSourceConfiguration)
              .withQuery(new GCSAwareValueProvider(options.getQuery()))
              .withCoder(TableRowJsonCoder.of())
              .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));

      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }

      rows = pipeline.apply("Read from JdbcIO", readIO);
    }

    /*
     * Step 2: Append TableRow to an existing BigQuery table
     */
    WriteResult writeResult = rows.apply("Write to BigQuery", writeToBQ);

    /*
     * Step 3.
     * If using Storage Write API, capture failed inserts and either
     *   a) write error rows to DLQ
     *   b) fail the pipeline
     */
    if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
      PCollection<BigQueryInsertError> insertErrors =
          BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options);

      if (!Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
        /*
         * Step 3a.
         * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
         */
        PCollection<FailsafeElement<String, String>> failedInserts =
            insertErrors
                .apply(
                    "WrapInsertionErrors",
                    MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                        .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
                .setCoder(FAILSAFE_ELEMENT_CODER);

        /*
         * Step 3a Contd.
         * Insert records that failed insert into deadletter table
         */
        failedInserts.apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(options.getOutputDeadletterTable())
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .setUseWindowedTimestamp(false)
                .build());
      } else {
        /*
         * Step 3b.
         * Fail pipeline upon write errors if no DLQ was specified
         */
        insertErrors.apply(ParDo.of(new ThrowWriteErrorsDoFn()));
      }
    }

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  static class ThrowWriteErrorsDoFn extends DoFn<BigQueryInsertError, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      BigQueryInsertError insertError = Objects.requireNonNull(c.element());
      List<String> errorMessages =
          insertError.getError().getErrors().stream()
              .map(ErrorProto::getMessage)
              .collect(Collectors.toList());
      String stackTrace = String.join("\nCaused by:", errorMessages);

      throw new IllegalStateException(
          String.format(
              "Failed to insert row %s.\nCaused by: %s", insertError.getRow(), stackTrace));
    }
  }

  /**
   * Create the {@link Write} transform that outputs the collection to BigQuery as per input option.
   */
  @VisibleForTesting
  static Write<TableRow> writeToBQTransform(JdbcToBigQueryOptions options) {
    // Needed for loading GCS filesystem before Pipeline.Create call
    FileSystems.setDefaultPipelineOptions(options);
    Write<TableRow> write =
        BigQueryIO.writeTableRows()
            .withoutValidation()
            .withCreateDisposition(Write.CreateDisposition.valueOf(options.getCreateDisposition()))
            .withWriteDisposition(
                options.getIsTruncate()
                    ? Write.WriteDisposition.WRITE_TRUNCATE
                    : Write.WriteDisposition.WRITE_APPEND)
            .withCustomGcsTempLocation(
                StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory()))
            .withExtendedErrorInfo()
            .to(options.getOutputTable());

    if (Write.CreateDisposition.valueOf(options.getCreateDisposition())
        != Write.CreateDisposition.CREATE_NEVER) {
      write = write.withJsonSchema(getGcsFileAsString(options.getBigQuerySchemaPath()));
    }
    if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
      write = write.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());
    }

    return write;
  }

  /**
   * Retrieves a secret value from SecretManagerUtils if the input string matches the specified
   * pattern.
   *
   * @param secret The input string representing a potential secret.
   * @return The secret value if the input matches the pattern and the secret is found, otherwise
   *     the original input string.
   */
  private static String maybeParseSecret(String secret) {
    // Check if the input string is not null.
    if (secret != null) {
      // Check if the input string matches the pattern for secrets stored in SecretManagerUtils.
      if (secret.matches("projects/.*/secrets/.*/versions/.*")) { // Use .* to match any characters
        // Retrieve the secret value from SecretManagerUtils.
        return SecretManagerUtils.getSecret(secret);
      }
    }
    // If the input is null or doesn't match the pattern, return the original input.
    return secret;
  }
}